How to create a conditional task in Airflow
Airflow has a BranchPythonOperator that can be used to express the branching dependency more directly.
The docs describe its use:
The BranchPythonOperator is much like the PythonOperator except that it expects a python_callable that returns a task_id. The task_id returned is followed, and all of the other paths are skipped. The task_id returned by the Python function has to be referencing a task directly downstream from the BranchPythonOperator task.
...
If you want to skip some tasks, keep in mind that you can’t have an empty path, if so make a dummy task.
Code Example
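    # Import paths for Airflow 1.10; `dag` is assumed to be defined earlier.
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import BranchPythonOperator

    def dummy_test():
        # Always follow the task whose task_id is 'branch_a'.
        return 'branch_a'

    A_task = DummyOperator(task_id='branch_a', dag=dag)
    B_task = DummyOperator(task_id='branch_false', dag=dag)

    branch_task = BranchPythonOperator(
        task_id='branching',
        python_callable=dummy_test,
        dag=dag,
    )

    # Both tasks sit directly downstream of the branching task.
    branch_task >> A_task
    branch_task >> B_task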
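In practice the callable usually inspects something before it picks a path. A minimal sketch, assuming a made-up rule (weekdays follow 'branch_a', weekends follow 'branch_false') and assuming the callable receives the run's execution_date as a keyword argument, which on Airflow 1.10 requires provide_context=True. The name choose_branch is hypothetical:

```python
from datetime import datetime


def choose_branch(execution_date, **kwargs):
    """Return the task_id of the branch to follow.

    Hypothetical rule for illustration: weekdays follow 'branch_a',
    weekends follow 'branch_false'. Both ids must belong to tasks
    directly downstream of the BranchPythonOperator.
    """
    if execution_date.weekday() < 5:  # Monday=0 ... Friday=4
        return 'branch_a'
    return 'branch_false'


# Standalone check of the decision logic, no Airflow required.
print(choose_branch(execution_date=datetime(2019, 4, 1)))  # a Monday
print(choose_branch(execution_date=datetime(2019, 4, 6)))  # a Saturday
```

It would be passed as python_callable=choose_branch, together with provide_context=True, in place of dummy_test.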
EDIT:
If you're on Airflow >=1.10.3, the callable can also return a list of task ids. That lets a single operator follow several downstream paths and skip the rest, without needing a dummy task before joining.
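A rough sketch of a callable that returns a list, assuming Airflow >=1.10.3. The function name, the row_count parameter, and the three task ids are all hypothetical; how row_count reaches the callable (for example through op_kwargs) is left out:

```python
def choose_branches(row_count):
    """Return one task_id or a list of task_ids to follow.

    All ids here are made up for illustration; each must name a task
    directly downstream of the BranchPythonOperator.
    """
    if row_count == 0:
        # Nothing to process: follow a single path.
        return 'notify_empty'
    # Follow two downstream tasks at once; every other direct
    # downstream task of the branching operator is skipped.
    return ['load_warehouse', 'refresh_report']
```

Any directly downstream task whose id is not returned gets skipped, so the list form avoids chaining several branching operators.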
You have to use Airflow trigger rules.
All operators have a trigger_rule argument, which defines the rule by which the generated task gets triggered.
The trigger rule possibilities:
    ALL_SUCCESS = 'all_success'
    ALL_FAILED = 'all_failed'
    ALL_DONE = 'all_done'
    ONE_SUCCESS = 'one_success'
    ONE_FAILED = 'one_failed'
    DUMMY = 'dummy'
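To make the rules concrete, here is a simplified model of how each one could be evaluated once all direct upstream tasks have finished. This is a sketch for illustration, not Airflow's implementation: should_trigger is a hypothetical helper, and the real scheduler can fire one_success and one_failed as soon as a single upstream task qualifies, without waiting for the others.

```python
def should_trigger(rule, upstream_states):
    """Simplified model of a trigger rule, not Airflow's own code.

    upstream_states holds the final state of each direct upstream task:
    'success', 'failed', 'upstream_failed' or 'skipped'.
    """
    succeeded = sum(s == 'success' for s in upstream_states)
    failed = sum(s in ('failed', 'upstream_failed') for s in upstream_states)
    total = len(upstream_states)

    if rule == 'all_success':
        return succeeded == total
    if rule == 'all_failed':
        return failed == total
    if rule == 'all_done':
        return True  # every upstream task has finished, whatever the outcome
    if rule == 'one_success':
        return succeeded >= 1
    if rule == 'one_failed':
        return failed >= 1
    if rule == 'dummy':
        return True  # dependencies are ignored
    raise ValueError('unknown trigger rule: %s' % rule)


# A join task whose two upstream branches are mutually exclusive:
branches = ['success', 'skipped']
print(should_trigger('all_success', branches))  # False
print(should_trigger('one_success', branches))  # True
```

The last two lines show why a task that joins two mutually exclusive branches needs something other than the default all_success: one of its upstream tasks never succeeds.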
Here is the idea to solve your problem:
    from airflow.operators.ssh_execute_operator import SSHExecuteOperator
    from airflow.utils.trigger_rule import TriggerRule
    from airflow.contrib.hooks import SSHHook

    sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)

    task_1 = SSHExecuteOperator(
        task_id='task_1',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

    task_2 = SSHExecuteOperator(
        task_id='conditional_task',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

    task_2a = SSHExecuteOperator(
        task_id='task_2a',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)

    task_2b = SSHExecuteOperator(
        task_id='task_2b',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_FAILED,
        ssh_hook=sshHook,
        dag=dag)

    task_3 = SSHExecuteOperator(
        task_id='task_3',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ONE_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)

    task_2.set_upstream(task_1)
    task_2a.set_upstream(task_2)
    task_2b.set_upstream(task_2)
    task_3.set_upstream(task_2a)
    task_3.set_upstream(task_2b)