How to Trigger a DAG on the success of another DAG in Airflow using Python?


The answer is already in this thread. Below is demo code:

Parent dag:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Parent_dag', default_args=default_args, schedule_interval='@daily')

leave_work = DummyOperator(
    task_id='leave_work',
    dag=dag,
)
cook_dinner = DummyOperator(
    task_id='cook_dinner',
    dag=dag,
)

leave_work >> cook_dinner

Child dag:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Child_dag', default_args=default_args, schedule_interval='@daily')

# Use ExternalTaskSensor to listen to the Parent_dag and cook_dinner task;
# once cook_dinner has succeeded, the downstream tasks of Child_dag run
wait_for_dinner = ExternalTaskSensor(
    task_id='wait_for_dinner',
    external_dag_id='Parent_dag',
    external_task_id='cook_dinner',
    start_date=datetime(2020, 4, 29),
    # The sensor looks for the parent run at (this execution date - execution_delta).
    # The delta must match the real offset between the two schedules; if both DAGs
    # run on the same schedule, omit it, otherwise the sensor waits until timeout.
    execution_delta=timedelta(hours=1),
    timeout=3600,
    dag=dag,
)
have_dinner = DummyOperator(
    task_id='have_dinner',
    dag=dag,
)
play_with_food = DummyOperator(
    task_id='play_with_food',
    dag=dag,
)

wait_for_dinner >> have_dinner
wait_for_dinner >> play_with_food
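The `execution_delta` argument is the part of the sensor that is easiest to get wrong. As a minimal sketch in plain Python (no Airflow import; the helper name `parent_execution_date` is hypothetical and only mirrors the documented behaviour), the sensor looks for the parent run whose execution date equals the child's execution date minus `execution_delta`:

```python
from datetime import datetime, timedelta
from typing import Optional


def parent_execution_date(child_execution_date: datetime,
                          execution_delta: Optional[timedelta] = None) -> datetime:
    """Hypothetical helper: which parent run would the sensor wait for?

    With no delta the sensor looks for the same execution date as the child;
    otherwise it subtracts the delta from the child's execution date.
    """
    if execution_delta is None:
        return child_execution_date
    return child_execution_date - execution_delta


child_run = datetime(2020, 4, 29)

# Both DAGs on the same '@daily' schedule: the parent run has the same date.
same_schedule = parent_execution_date(child_run)

# Child scheduled one hour after the parent: look one hour back.
one_hour_offset = parent_execution_date(child_run, timedelta(hours=1))
```

If the parent has no run at the computed date, the sensor keeps poking until `timeout` expires, so it is worth checking the computed date against the parent's actual schedule.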

Images (not preserved): Dags, Parent_dag, Child_dag


As requested by @pankaj, I'm hereby adding a snippet depicting reactive-triggering using TriggerDagRunOperator (as opposed to poll-based triggering of ExternalTaskSensor)

from typing import List

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule

# DAG object
my_dag: DAG = DAG(dag_id='my_dag',
                  start_date=..)
..
# a list of 'tail' tasks: tasks that have no downstream tasks
tail_tasks_of_first_dag: List[BaseOperator] = my_magic_function_that_determines_all_tail_tasks(..)
..
# our trigger task; trigger_dag_id names the DAG to be triggered
my_trigger_task: TriggerDagRunOperator = TriggerDagRunOperator(dag=my_dag,
                                                               task_id='my_trigger_task',
                                                               trigger_rule=TriggerRule.ALL_SUCCESS,
                                                               trigger_dag_id='id_of_dag_to_be_triggered')
# our trigger task should run only when all 'tail' tasks have succeeded
tail_tasks_of_first_dag >> my_trigger_task

Note that this snippet is for reference purposes only; it has NOT been tested.
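The snippet leaves the lookup of the 'tail' tasks open. One possible way to do it, sketched here on a plain dictionary rather than on a real DAG, is to keep every task that has no downstream tasks. The function name `find_tail_tasks` and the example graph are hypothetical; with Airflow, a similar map could presumably be built from each task's `downstream_task_ids`, which is an assumption and not something this sketch exercises.

```python
from typing import Dict, List, Set


def find_tail_tasks(downstream: Dict[str, Set[str]]) -> List[str]:
    """Return the ids of tasks that have no downstream tasks, sorted by id."""
    return sorted(task_id for task_id, children in downstream.items() if not children)


# Hypothetical dependency map: task id -> ids of its direct downstream tasks.
example_graph = {
    "extract": {"transform_a", "transform_b"},
    "transform_a": {"load"},
    "transform_b": set(),
    "load": set(),
}

tail_task_ids = find_tail_tasks(example_graph)
```

Every task returned this way would then be set upstream of the trigger task, so the trigger only fires once all branches of the first DAG have finished successfully.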




I believe you are looking for the SubDagOperator, which runs a DAG inside a bigger DAG. Note that creating many subdags, as in the example below, gets messy pretty quickly, so I recommend putting each subdag in its own file and importing them into a main file.

The SubDagOperator is simple to use: you need to give it an id, a subdag (the child) and a dag (the parent).

subdag_2 = SubDagOperator(
    task_id="just_some_id",
    subdag=child_subdag,  # this must be a DAG
    dag=parent_dag,  # this must be a DAG
)

It will look like this: (image not preserved)

From the Airflow examples repo:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.dates import days_ago


def subdag(parent_dag_name, child_dag_name, args):
    # The subdag's dag_id is '<parent dag_id>.<task_id of the SubDagOperator>'
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@daily",
    )
    for i in range(5):
        DummyOperator(
            task_id='%s-task-%s' % (child_dag_name, i + 1),
            default_args=args,
            dag=dag_subdag,
        )
    return dag_subdag


DAG_NAME = 'example_subdag_operator'

args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}

dag = DAG(
    dag_id=DAG_NAME,
    default_args=args,
    schedule_interval="@once",
    tags=['example'],
)

start = DummyOperator(
    task_id='start-of-main-job',
    dag=dag,
)
some_other_task = DummyOperator(
    task_id='some-other-task',
    dag=dag,
)
end = DummyOperator(
    task_id='end-of-main-job',
    dag=dag,
)
# Named run_subdag so the operator does not shadow the subdag() factory above
run_subdag = SubDagOperator(
    task_id='run-this-dag-after-previous-steps',
    subdag=subdag(DAG_NAME, 'run-this-dag-after-previous-steps', args),
    dag=dag,
)

start >> some_other_task >> end >> run_subdag
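The example depends on a naming rule that is easy to miss: the child DAG's `dag_id` is the parent's `dag_id`, a dot, and the `task_id` of the SubDagOperator that wraps it. As far as I know, the operator rejects a subdag whose id does not follow this pattern. A minimal plain-Python sketch of the rule, with hypothetical helper names and no Airflow import:

```python
def subdag_id(parent_dag_id: str, task_id: str) -> str:
    """Build the id a subdag is expected to have: '<parent dag_id>.<task_id>'."""
    return f"{parent_dag_id}.{task_id}"


def follows_subdag_naming(candidate_dag_id: str, parent_dag_id: str, task_id: str) -> bool:
    """Check a subdag id against the naming rule used in the example."""
    return candidate_dag_id == subdag_id(parent_dag_id, task_id)


expected_id = subdag_id("example_subdag_operator", "run-this-dag-after-previous-steps")
```

Building the id in one place and passing the same `task_id` to both the factory function and the operator, as the example does, keeps the two from drifting apart.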