How to handle different task intervals on a single DAG in Airflow? [python-3.x]

How to handle different task intervals on a single DAG in Airflow?


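You can use a ShortCircuitOperator to do this. It runs a Python callable and, when the callable returns a false value, skips every task downstream of it. In the example below, tasks a, b and c run on every DAG run, while d runs only when the run's execution date falls on a Monday.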

import airflow
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG

# Defaults handed to every operator attached to the DAG.
args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

# schedule_interval is an argument of the DAG itself, not of its tasks.
# Placing it inside default_args has no effect on scheduling, because
# default_args is only forwarded to operator constructors; it therefore has
# to be passed to DAG() directly for the cron expression to be honoured.
dag = DAG(
    dag_id='example',
    default_args=args,
    schedule_interval='0 10 * * *',  # every day at 10:00
)

# Tasks a -> b -> c run on every DAG run.
a = DummyOperator(task_id='a', dag=dag)
b = DummyOperator(task_id='b', dag=dag)
c = DummyOperator(task_id='c', dag=dag)
# Task d sits behind a gate and only runs on selected dates.
d = DummyOperator(task_id='d', dag=dag)


def check_trigger(execution_date, **kwargs):
    """Return True when the run's execution date is a Monday.

    Used as the callable of a ShortCircuitOperator: a false return value
    makes Airflow skip the tasks downstream of that operator.

    Args:
        execution_date: Logical date of the DAG run, supplied by Airflow
            through the task context.
        **kwargs: Remaining context entries; unused.

    Returns:
        bool: True if ``execution_date.weekday()`` is 0 (Monday).
    """
    # NOTE: execution_date is the logical date of the run (the start of the
    # schedule period), which can differ from the wall-clock day on which
    # the run actually executes.
    return execution_date.weekday() == 0


check_trigger_d = ShortCircuitOperator(
    task_id='check_trigger_d',
    python_callable=check_trigger,
    # Needed on Airflow 1.x so that execution_date is passed to the callable.
    provide_context=True,
    dag=dag,
)

a.set_downstream(b)
b.set_downstream(c)
a.set_downstream(check_trigger_d)
# Perform D only if trigger function returns a true value
check_trigger_d.set_downstream(d)