
How to create a conditional task in Airflow


Airflow has a BranchPythonOperator that can be used to express the branching dependency more directly.

The docs describe its use:

The BranchPythonOperator is much like the PythonOperator except that it expects a python_callable that returns a task_id. The task_id returned is followed, and all of the other paths are skipped. The task_id returned by the Python function has to be referencing a task directly downstream from the BranchPythonOperator task.

...

If you want to skip some tasks, keep in mind that you can’t have an empty path, if so make a dummy task.

Code Example

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

# `dag` is assumed to be an already-defined DAG object.
# The callable returns the task_id of the branch to follow.
def dummy_test():
    return 'branch_a'

A_task = DummyOperator(task_id='branch_a', dag=dag)
B_task = DummyOperator(task_id='branch_false', dag=dag)

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=dummy_test,
    dag=dag,
)

branch_task >> A_task
branch_task >> B_task
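If the branches need to converge again downstream, keep in mind that a join task left on the default all_success trigger rule would be skipped along with the untaken branch. A minimal sketch of one way to handle that, reusing the dag, A_task and B_task objects from the example above (the join task itself is hypothetical):

# Hypothetical join task: 'one_success' fires once either branch has
# succeeded, so the join is not skipped along with the untaken branch.
join_task = DummyOperator(
    task_id='join',
    trigger_rule='one_success',
    dag=dag,
)

A_task >> join_task
B_task >> join_task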

EDIT:

If you're on an Airflow version >= 1.10.3, you can also return a list of task ids, allowing you to skip multiple downstream paths in a single operator and avoiding the need for a dummy task before joining.
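A minimal sketch of that pattern, assuming Airflow >= 1.10.3 and reusing the dag, A_task and B_task objects from the example above (the callable and task names here are hypothetical):

# Returning a list of task_ids follows every task in the list and
# skips the remaining downstream paths.
def pick_branches():
    return ['branch_a', 'branch_false']

multi_branch_task = BranchPythonOperator(
    task_id='multi_branching',
    python_callable=pick_branches,
    dag=dag,
)

multi_branch_task >> A_task
multi_branch_task >> B_task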


You have to use Airflow trigger rules

All operators have a trigger_rule argument which defines the rule by which the generated task gets triggered.

The trigger rule possibilities:

ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'

Here is the idea to solve your problem:

from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook

sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)

task_1 = SSHExecuteOperator(
    task_id='task_1',
    bash_command=<YOUR COMMAND>,
    ssh_hook=sshHook,
    dag=dag)

task_2 = SSHExecuteOperator(
    task_id='conditional_task',
    bash_command=<YOUR COMMAND>,
    ssh_hook=sshHook,
    dag=dag)

task_2a = SSHExecuteOperator(
    task_id='task_2a',
    bash_command=<YOUR COMMAND>,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    ssh_hook=sshHook,
    dag=dag)

task_2b = SSHExecuteOperator(
    task_id='task_2b',
    bash_command=<YOUR COMMAND>,
    trigger_rule=TriggerRule.ALL_FAILED,
    ssh_hook=sshHook,
    dag=dag)

task_3 = SSHExecuteOperator(
    task_id='task_3',
    bash_command=<YOUR COMMAND>,
    trigger_rule=TriggerRule.ONE_SUCCESS,
    ssh_hook=sshHook,
    dag=dag)

task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)
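With this wiring, task_2a runs only when conditional_task succeeds (ALL_SUCCESS), task_2b runs only when it fails (ALL_FAILED), and task_3 joins the two paths back together: ONE_SUCCESS lets it run as soon as either task_2a or task_2b has succeeded.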