How to dynamically iterate over the output of an upstream task to create parallel tasks in airflow?
Per @Juan Riza's suggestion I checked out this link: Proper way to create dynamic workflows in Airflow. This was pretty much the answer, although I was able to simplify the solution enough that I thought I would offer my own modified version of the implementation here:
    from datetime import datetime
    import os
    import sys

    from airflow.models import DAG
    from airflow.operators.python_operator import PythonOperator

    import ds_dependencies

    SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')

    if SCRIPT_PATH:
        sys.path.insert(0, SCRIPT_PATH)
        import dash_workers
    else:
        print('Define DASH_PREPROC_PATH value in environmental variables')
        sys.exit(1)

    ENV = os.environ

    default_args = {
        # 'start_date': datetime.now(),
        'start_date': datetime(2017, 7, 18)
    }

    DAG = DAG(
        dag_id='dash_preproc',
        default_args=default_args
    )

    # Wipes the tables that the preprocessing tasks will repopulate
    clear_tables = PythonOperator(
        task_id='clear_tables',
        python_callable=dash_workers.clear_db,
        dag=DAG)

    def id_worker(uid):
        # One preprocessing task per user ID; the ID doubles as the task_id
        return PythonOperator(
            task_id=uid,
            python_callable=dash_workers.main_preprocess,
            op_args=[uid],
            dag=DAG)

    for uid in dash_workers.get_id_creds():
        clear_tables >> id_worker(uid)
clear_tables cleans out the database tables that will be re-built as a result of the process. id_worker is a function that dynamically generates new preprocessing tasks, one per ID value in the array returned by get_id_creds. The task ID is just the corresponding user ID, though it could just as easily have been an index, i, as in the example mentioned above.
NOTE: The direction of that bitshift operator (>>) looked backwards to me at first, since the clear_tables task needs to come first, but it is what works in this case: a >> b sets a as the upstream task.
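For reference, here is a quick sketch (my own illustration, not part of the DAG above; the worker variable is hypothetical) of the equivalent ways to declare that same dependency; the two bitshift operators are mirror images of each other:

    # ">>" makes the left-hand task the upstream one
    worker = id_worker(uid)
    clear_tables >> worker              # clear_tables runs first, then worker
    # the same dependency can also be written as:
    # worker << clear_tables
    # clear_tables.set_downstream(worker)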
Keep in mind that Apache Airflow is a workflow management tool, i.e. it orchestrates the dependencies between tasks that the user defines, in contrast with (for example) Apache NiFi, which is a dataflow management tool, i.e. the dependencies there are the data transferred through the tasks.
That said, I think your approach is quite right (my comment is based on the code posted), but Airflow also offers a concept called XCom. It allows tasks to "cross-communicate" by passing small pieces of data between them. How big can the passed data be? That is up to you to test, but generally it should not be very large. XComs are stored as key/value pairs in the Airflow metadata database, so you can't pass files, for example, but a list of IDs could work.
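To make that concrete, here is a minimal sketch of my own (not from the posts above; the DAG id, task names, and ID list are made up) of two old-style PythonOperator tasks passing a small list of IDs through XCom:

    from datetime import datetime

    from airflow.models import DAG
    from airflow.operators.python_operator import PythonOperator

    dag = DAG(
        dag_id='xcom_example',                      # hypothetical DAG id
        default_args={'start_date': datetime(2017, 7, 18)},
        schedule_interval=None,
    )

    def fetch_ids(**context):
        # Returning a value pushes it to XCom under the key 'return_value'
        return ['user_1', 'user_2', 'user_3']       # stand-in for real IDs

    def process_ids(**context):
        # Pull the list that the upstream task returned
        ids = context['ti'].xcom_pull(task_ids='fetch_ids')
        for uid in ids:
            print('processing', uid)

    fetch = PythonOperator(
        task_id='fetch_ids',
        python_callable=fetch_ids,
        provide_context=True,                       # old-style context passing
        dag=dag,
    )

    process = PythonOperator(
        task_id='process_ids',
        python_callable=process_ids,
        provide_context=True,
        dag=dag,
    )

    fetch >> process

In the terms of the original question, process_ids is where you would loop over (or fan out from) the IDs produced by the upstream task.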
Like I said, you should test it yourself; I would be very happy to hear about your experience. Here is an example DAG which demonstrates the use of XCom, and here is the necessary documentation. Cheers!