
Proper way to create dynamic workflows in Airflow


Here is how I did it with a similar request without any subdags:

First, create a method that returns whatever values you want:

def values_function():
    return values

Next, create a method that will generate the jobs dynamically:

def group(number, **kwargs):
    # load the values if needed in the command you plan to execute
    dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
    return BashOperator(
        task_id='JOB_NAME_{}'.format(number),
        bash_command='script.sh {} {}'.format(dyn_value, number),
        dag=dag)

And then combine them:

push_func = PythonOperator(
    task_id='push_func',
    provide_context=True,
    python_callable=values_function,
    dag=dag)

complete = DummyOperator(
    task_id='All_jobs_completed',
    dag=dag)

for i in values_function():
    push_func >> group(i) >> complete
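For reference, here is a minimal, self-contained sketch of how those pieces fit together; the DAG name, schedule, and the hard-coded list inside values_function are just assumptions for illustration:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# Assumed DAG definition; adjust the name, schedule and start_date to taste
dag = DAG('dynamic_jobs_example',
          schedule_interval='@once',
          start_date=datetime(2018, 1, 1))


def values_function(**kwargs):
    # In practice this could query a database, list files, etc.
    # The return value is automatically pushed to XCom by the PythonOperator.
    return [0, 1, 2]


def group(number, **kwargs):
    # Pull the pushed value at runtime inside the command to execute
    dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
    return BashOperator(
        task_id='JOB_NAME_{}'.format(number),
        bash_command='script.sh {} {}'.format(dyn_value, number),
        dag=dag)


push_func = PythonOperator(
    task_id='push_func',
    provide_context=True,
    python_callable=values_function,
    dag=dag)

complete = DummyOperator(task_id='All_jobs_completed', dag=dag)

for i in values_function():
    push_func >> group(i) >> complete

Note that the tasks are created at parse time from whatever values_function returns, while the XCom pull happens at run time inside each bash command.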


Yes, this is possible. I've created an example DAG that demonstrates it.

import airflow
from airflow.operators.python_operator import PythonOperator
import os
from airflow.models import Variable
import logging
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator

main_dag_id = 'DynamicWorkflow2'

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}

dag = DAG(
    main_dag_id,
    schedule_interval="@once",
    default_args=args)


def start(*args, **kwargs):
    value = Variable.get("DynamicWorkflow_Group1")
    logging.info("Current DynamicWorkflow_Group1 value is " + str(value))


def resetTasksStatus(task_id, execution_date):
    logging.info("Resetting: " + task_id + " " + execution_date)

    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags[main_dag_id]
    session = settings.Session()

    my_task = check_dag.get_task(task_id)
    ti = TaskInstance(my_task, execution_date)
    state = ti.current_state()
    logging.info("Current state of " + task_id + " is " + str(state))
    ti.set_state(None, session)
    state = ti.current_state()
    logging.info("Updated state of " + task_id + " is " + str(state))


def bridge1(*args, **kwargs):
    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 2

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date']))


def bridge2(*args, **kwargs):
    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 3

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date']))


def end(*args, **kwargs):
    logging.info("Ending")


def doSomeWork(name, index, *args, **kwargs):
    # Do whatever work you need to do
    # Here I will just create a new file
    os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt')


starting_task = PythonOperator(
    task_id='start',
    dag=dag,
    provide_context=True,
    python_callable=start,
    op_args=[])

# Used to connect the stream in the event that the range is zero
bridge1_task = PythonOperator(
    task_id='bridge1',
    dag=dag,
    provide_context=True,
    python_callable=bridge1,
    op_args=[])

DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))

for index in range(int(DynamicWorkflow_Group1)):
    dynamicTask = PythonOperator(
        task_id='firstGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['firstGroup', index])

    starting_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge1_task)

# Used to connect the stream in the event that the range is zero
bridge2_task = PythonOperator(
    task_id='bridge2',
    dag=dag,
    provide_context=True,
    python_callable=bridge2,
    op_args=[])

DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2))

for index in range(int(DynamicWorkflow_Group2)):
    dynamicTask = PythonOperator(
        task_id='secondGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['secondGroup', index])

    bridge1_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge2_task)

ending_task = PythonOperator(
    task_id='end',
    dag=dag,
    provide_context=True,
    python_callable=end,
    op_args=[])

DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3))

for index in range(int(DynamicWorkflow_Group3)):
    # You can make this logic anything you'd like
    # I chose to use the PythonOperator for all tasks
    # except the last task, which uses the BashOperator
    if index < (int(DynamicWorkflow_Group3) - 1):
        dynamicTask = PythonOperator(
            task_id='thirdGroup_' + str(index),
            dag=dag,
            provide_context=True,
            python_callable=doSomeWork,
            op_args=['thirdGroup', index])
    else:
        dynamicTask = BashOperator(
            task_id='thirdGroup_' + str(index),
            bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt',
            dag=dag)

    bridge2_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(ending_task)

# If you do not connect these then, in the event that your range is ever zero, you will have a disconnection in your stream
# and your tasks will run simultaneously instead of in your desired stream order.
starting_task.set_downstream(bridge1_task)
bridge1_task.set_downstream(bridge2_task)
bridge2_task.set_downstream(ending_task)

Before you run the DAG, create these three Airflow Variables:

airflow variables --set DynamicWorkflow_Group1 1
airflow variables --set DynamicWorkflow_Group2 0
airflow variables --set DynamicWorkflow_Group3 0

You'll see that the DAG goes from this

[screenshot: the DAG before the run]

To this after it has run:

[screenshot: the DAG after the run, with the dynamically created tasks]

You can see more information on this DAG in my article on creating Dynamic Workflows On Airflow.


I have figured out a way to create workflows based on the result of previous tasks.
Basically what you want to do is have two subdags with the following:

  1. XCom-push a list (or whatever you need to create the dynamic workflow later) in the subdag that gets executed first (see return_list() in test1.py)
  2. Pass the main dag object as a parameter to your second subdag
  3. Now that you have the main dag object, you can use it to get a list of its task instances. From that list you can pick out a task of the current run, e.g. with parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]; one could probably add more filters here (see the sketch just after this list)
  4. With that task instance, you can use xcom_pull to get the value you need by setting the dag_id to that of the first subdag: dag_id='%s.%s' % (parent_dag_name, 'test1')
  5. Use the list/value to create your tasks dynamically
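In code, steps 3 and 4 boil down to something like this (a condensed sketch of what test2.py below does; parent_dag, parent_dag_name and the 'list' task come from the example files):

# Grab a task instance belonging to the latest active run of the parent DAG ...
last_run_date = parent_dag.get_active_runs()[-1]
ti = parent_dag.get_task_instances(settings.Session, start_date=last_run_date)[-1]

# ... and pull the list that the first subdag pushed to XCom
test_list = ti.xcom_pull(
    dag_id='%s.%s' % (parent_dag_name, 'test1'),
    task_ids='list')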

Now, I have tested this in my local Airflow installation and it works fine. I don't know whether the xcom_pull part will have any problems if more than one instance of the dag is running at the same time, but in that case you'd probably use a unique key or something similar to uniquely identify the XCom value you want. One could probably tighten step 3 to be 100% sure of getting a specific task of the current main dag, but for my use this performs well enough; I think one only needs one task_instance object to use xcom_pull.
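For example, one way to make the value unambiguous would be to push it under a run-specific key and pull it back with the same key. This is purely a hypothetical variation, not part of the example files below; it relies on the run_id that Airflow puts into the task context (so the pushing operator needs provide_context=True):

def return_list(**kwargs):
    # Hypothetical: key the XCom by run_id so concurrent runs cannot collide
    kwargs['ti'].xcom_push(key='list_' + kwargs['run_id'],
                           value=['test1', 'test2'])

The pulling side would then pass the same key='list_' + run_id to xcom_pull.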

Also, I clean the XComs for the first subdag before every execution (the clean_xcoms task in test1.py), just to make sure I don't accidentally get a wrong value.

I'm pretty bad at explaining, so I hope the following code will make everything clear:

test1.py

from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator

log = logging.getLogger(__name__)


def test1(parent_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.test1' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    def return_list():
        return ['test1', 'test2']

    list_extract_folder = PythonOperator(
        task_id='list',
        dag=dag,
        python_callable=return_list
    )

    clean_xcoms = PostgresOperator(
        task_id='clean_xcoms',
        postgres_conn_id='airflow_db',
        sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
        dag=dag)

    clean_xcoms >> list_extract_folder

    return dag

test2.py

from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator

log = logging.getLogger(__name__)


def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
    dag = DAG(
        '%s.test2' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date
    )

    if len(parent_dag.get_active_runs()) > 0:
        test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
            dag_id='%s.%s' % (parent_dag_name, 'test1'),
            task_ids='list')
        if test_list:
            for i in test_list:
                test = DummyOperator(
                    task_id=i,
                    dag=dag
                )

    return dag

and the main workflow:

test.py

from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator

from subdags.test1 import test1
from subdags.test2 import test2

DAG_NAME = 'test-dag'

dag = DAG(DAG_NAME,
          description='Test workflow',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 24))

test1 = SubDagOperator(
    subdag=test1(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval),
    task_id='test1',
    dag=dag)

test2 = SubDagOperator(
    subdag=test2(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval,
                 parent_dag=dag),
    task_id='test2',
    dag=dag)

test1 >> test2