How to dynamically iterate over the output of an upstream task to create parallel tasks in airflow?

python-3.x


Per @Juan Riza's suggestion I checked out this link: Proper way to create dynamic workflows in Airflow. This was pretty much the answer, although I was able to simplify the solution enough that I thought I would offer my own modified version of the implementation here:

from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

# The worker module lives outside the DAGs folder, so add it to the path.
SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

ENV = os.environ

default_args = {
    # 'start_date': datetime.now(),
    'start_date': datetime(2017, 7, 18)
}

DAG = DAG(
    dag_id='dash_preproc',
    default_args=default_args
)

clear_tables = PythonOperator(
    task_id='clear_tables',
    python_callable=dash_workers.clear_db,
    dag=DAG)

def id_worker(uid):
    # Build one preprocessing task per user ID.
    return PythonOperator(
        task_id=uid,
        python_callable=dash_workers.main_preprocess,
        op_args=[uid],
        dag=DAG)

for uid in dash_workers.get_id_creds():
    clear_tables >> id_worker(uid)

clear_tables cleans the database that will be re-built as a result of the process. id_worker is a function that dynamically generates new preprocessing tasks, based on the array of ID values returned from get_id_creds. The task ID is just the corresponding user ID, though it could easily have been an index, i, as in the example mentioned above.

NOTE: the bit-shift operator here is >>, which reads left to right: clear_tables >> id_worker(uid) makes clear_tables the upstream task, so it runs before each id_worker task, which is the intended order.
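For clarity, here is a minimal stand-alone illustration of the bit-shift dependency syntax. It is purely a sketch, not part of the DAG above; the DummyOperator tasks and the dag_id are placeholders:

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(dag_id='bitshift_demo', start_date=datetime(2017, 7, 18))

t1 = DummyOperator(task_id='t1', dag=dag)
t2 = DummyOperator(task_id='t2', dag=dag)

# Both statements declare the same dependency: t1 runs before t2.
t1 >> t2                # t1 is upstream of t2
# t2 << t1              # equivalent, just read right-to-left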


Apache Airflow is a workflow management tool, i.e. it determines the dependencies between the tasks that the user defines, in contrast to (for example) Apache NiFi, which is a dataflow management tool, i.e. there the dependencies are data which are transferred between the tasks.

That said, I think your approach is quite right (my comment is based on the code posted), but Airflow also offers a concept called XCom. It allows tasks to "cross-communicate" by passing some data between them. How big can the passed data be? That is up to you to test, but generally it should not be very big. XComs are key/value pairs stored in the Airflow metadata database, i.e. you can't pass files, for example, but a list of IDs could work.
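To make that concrete, here is a minimal sketch of pushing a list of IDs from one task and pulling it in a downstream task via XCom. It uses the same Airflow 1.x-style imports as the code above; the dag_id, task names, and the hard-coded ID list are purely illustrative:

from datetime import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(dag_id='xcom_demo', start_date=datetime(2017, 7, 18), schedule_interval=None)

def push_ids(**context):
    # The return value is automatically pushed to XCom under the key 'return_value'.
    return [101, 102, 103]  # stand-in for something like get_id_creds()

def pull_ids(**context):
    # Pull the list pushed by the upstream task from the metadata database.
    ids = context['ti'].xcom_pull(task_ids='push_ids')
    print('received ids: %s' % ids)

push = PythonOperator(task_id='push_ids', python_callable=push_ids,
                      provide_context=True, dag=dag)

pull = PythonOperator(task_id='pull_ids', python_callable=pull_ids,
                      provide_context=True, dag=dag)

push >> pull

Keep in mind that XCom values are exchanged at run time, while the per-ID tasks in the DAG above are created at parse time, so XCom is best suited to passing data between tasks that are already defined.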

As I said, you should test it yourself; I would be very happy to hear about your experience. Here is an example DAG which demonstrates the use of XCom, and here is the necessary documentation. Cheers!