How to trigger a DAG on the success of another DAG in Airflow using Python?
The answer is already in this thread. Below is demo code:
Parent dag:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2020, 4, 29),
    }

    dag = DAG('Parent_dag', default_args=default_args, schedule_interval='@daily')

    leave_work = DummyOperator(
        task_id='leave_work',
        dag=dag,
    )
    cook_dinner = DummyOperator(
        task_id='cook_dinner',
        dag=dag,
    )

    leave_work >> cook_dinner

Child dag:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    # Sensors live under airflow.sensors in Airflow 1.10+
    from airflow.sensors.external_task_sensor import ExternalTaskSensor

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2020, 4, 29),
    }

    dag = DAG('Child_dag', default_args=default_args, schedule_interval='@daily')

    # Use ExternalTaskSensor to watch the cook_dinner task of Parent_dag.
    # Child_dag still starts on its own schedule; its downstream tasks run
    # only after cook_dinner has succeeded for the matching execution date.
    wait_for_dinner = ExternalTaskSensor(
        task_id='wait_for_dinner',
        external_dag_id='Parent_dag',
        external_task_id='cook_dinner',
        start_date=datetime(2020, 4, 29),
        # No execution_delta here: both DAGs run @daily from the same start
        # date, so the sensor should match the parent run with the same
        # execution date. Set execution_delta only if the schedules are offset.
        timeout=3600,
        dag=dag,
    )

    have_dinner = DummyOperator(
        task_id='have_dinner',
        dag=dag,
    )
    play_with_food = DummyOperator(
        task_id='play_with_food',
        dag=dag,
    )

    wait_for_dinner >> have_dinner
    wait_for_dinner >> play_with_food

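ExternalTaskSensor pairs runs by execution date: it looks for the parent task instance whose execution date equals the child's execution date minus `execution_delta`, or the same date when no delta is given. The sketch below mimics that lookup in plain Python with no Airflow import, so it only illustrates the date arithmetic. `parent_date_to_poke` and `daily_runs` are hypothetical helper names, not part of Airflow.

```python
from datetime import datetime, timedelta
from typing import Optional, Set


def parent_date_to_poke(child_execution_date: datetime,
                        execution_delta: Optional[timedelta] = None) -> datetime:
    """Return the parent execution date the sensor would look for (illustrative)."""
    if execution_delta is None:
        return child_execution_date
    return child_execution_date - execution_delta


def daily_runs(start: datetime, count: int) -> Set[datetime]:
    """Execution dates of a hypothetical @daily DAG that starts at `start`."""
    return {start + timedelta(days=i) for i in range(count)}


parent_runs = daily_runs(datetime(2020, 4, 29), 3)
child_date = datetime(2020, 4, 30)

# Same schedule, no delta: the parent run with the same date exists.
same_schedule = parent_date_to_poke(child_date)
print(same_schedule in parent_runs)  # True

# One-hour delta: 2020-04-29 23:00 is not a run of a @daily parent.
shifted = parent_date_to_poke(child_date, timedelta(hours=1))
print(shifted in parent_runs)  # False
```

When both DAGs run `@daily` from the same start date, a one-hour delta points at a parent run that never exists, so the sensor would keep poking until it hits its timeout. A delta is only needed when the two schedules are offset from each other.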
As requested by @pankaj, I'm adding a snippet that shows reactive triggering using TriggerDagRunOperator (as opposed to the poll-based triggering of ExternalTaskSensor):

    from typing import List

    from airflow.models.baseoperator import BaseOperator
    from airflow.models.dag import DAG
    from airflow.operators.dagrun_operator import TriggerDagRunOperator
    from airflow.utils.trigger_rule import TriggerRule

    # DAG object
    my_dag: DAG = DAG(dag_id='my_dag', start_date=..)
    ..
    # a list of 'tail' tasks: tasks that have no downstream tasks
    tail_tasks_of_first_dag: List[BaseOperator] = my_magic_function_that_determines_all_tail_tasks(..)
    ..
    # our trigger task; trigger_dag_id names the DAG to be triggered
    my_trigger_task: TriggerDagRunOperator = TriggerDagRunOperator(
        dag=my_dag,
        task_id='my_trigger_task',
        trigger_rule=TriggerRule.ALL_SUCCESS,
        trigger_dag_id='id_of_dag_to_be_triggered',
    )
    # our trigger task should run when all 'tail' tasks have completed / succeeded
    tail_tasks_of_first_dag >> my_trigger_task

Note that this snippet is for reference purposes only; it has NOT been tested.
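For the "tail tasks" step, one possible approach is to keep every task whose downstream list is empty. The sketch below is a hypothetical stand-in for `my_magic_function_that_determines_all_tail_tasks`; it assumes each task exposes a `downstream_list` attribute, as Airflow operators do. `FakeTask` is a made-up class used only so the example runs without Airflow installed.

```python
from typing import Iterable, List


def find_tail_tasks(tasks: Iterable) -> List:
    """Return the tasks that have no downstream tasks, in input order."""
    return [task for task in tasks if not task.downstream_list]


class FakeTask:
    """Minimal stand-in for an operator (hypothetical, for illustration only)."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream_list = []

    def __rshift__(self, other: "FakeTask") -> "FakeTask":
        # Mimic `a >> b`: register b as downstream of a and return b.
        self.downstream_list.append(other)
        return other


extract = FakeTask('extract')
transform = FakeTask('transform')
load = FakeTask('load')
report = FakeTask('report')

extract >> transform
transform >> load
transform >> report

all_tasks = [extract, transform, load, report]
print([task.task_id for task in find_tail_tasks(all_tasks)])  # ['load', 'report']
```

With a real DAG, the same function could presumably be called on `my_dag.tasks`, provided it runs after all task dependencies have been declared.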
I believe you are looking for the SubDagOperator, which runs a DAG inside a bigger DAG. Note that creating many subdags like in the example below gets messy pretty quickly, so I recommend putting each subdag in its own file and importing them into a main file.
The SubDagOperator is simple to use: you need to give it an ID, a subdag (the child), and a dag (the parent).

    subdag_2 = SubDagOperator(
        task_id="just_some_id",
        subdag=child_subdag,  # this must be a DAG
        dag=parent_dag,       # this must be a DAG
    )

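One constraint worth keeping in mind: SubDagOperator expects the child DAG's `dag_id` to be the parent's `dag_id` and the operator's `task_id` joined by a dot. A minimal sketch of that naming rule in plain Python follows; `subdag_id` and `is_valid_subdag_id` are hypothetical helpers, not Airflow functions.

```python
def subdag_id(parent_dag_id: str, task_id: str) -> str:
    """Build the dag_id a child DAG needs for a given SubDagOperator task."""
    return f"{parent_dag_id}.{task_id}"


def is_valid_subdag_id(child_dag_id: str, parent_dag_id: str, task_id: str) -> bool:
    """Check that a child dag_id follows the '<parent>.<task_id>' convention."""
    return child_dag_id == subdag_id(parent_dag_id, task_id)


expected = subdag_id('parent_dag', 'just_some_id')
print(expected)  # parent_dag.just_some_id
print(is_valid_subdag_id('child_dag', 'parent_dag', 'just_some_id'))  # False
```

Building the child's `dag_id` from the same two strings that are passed to the operator keeps the two from drifting apart.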
From the Airflow examples repo:

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.subdag_operator import SubDagOperator
    from airflow.utils.dates import days_ago


    def subdag(parent_dag_name, child_dag_name, args):
        # The child dag_id is '<parent dag_id>.<task_id of the SubDagOperator>'
        dag_subdag = DAG(
            dag_id='%s.%s' % (parent_dag_name, child_dag_name),
            default_args=args,
            schedule_interval="@daily",
        )

        for i in range(5):
            DummyOperator(
                task_id='%s-task-%s' % (child_dag_name, i + 1),
                default_args=args,
                dag=dag_subdag,
            )

        return dag_subdag


    DAG_NAME = 'example_subdag_operator'

    args = {
        'owner': 'airflow',
        'start_date': days_ago(2),
    }

    dag = DAG(
        dag_id=DAG_NAME,
        default_args=args,
        schedule_interval="@once",
        tags=['example'],
    )

    start = DummyOperator(
        task_id='start-of-main-job',
        dag=dag,
    )
    some_other_task = DummyOperator(
        task_id='some-other-task',
        dag=dag,
    )
    end = DummyOperator(
        task_id='end-of-main-job',
        dag=dag,
    )

    # Named subdag_task so it does not shadow the subdag() factory above
    subdag_task = SubDagOperator(
        task_id='run-this-dag-after-previous-steps',
        subdag=subdag(DAG_NAME, 'run-this-dag-after-previous-steps', args),
        dag=dag,
    )

    start >> some_other_task >> end >> subdag_task
