How to restart a failed task on Airflow



In the UI:

  1. Go to the DAG and open the DAG run you want to change
  2. Click on Graph View
  3. Click on task A
  4. Click "Clear"

This lets task A run again, and if it succeeds, task C should run. This works because when you clear a task's status, the scheduler treats it as if it had not run before in this DAG run.
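
To illustrate what "Clear" does, here is a toy model of a single DAG run, assuming (as in the question) that task C depends directly on task A. This is not Airflow's scheduler code: "failed" and "upstream_failed" are real Airflow task states, but clear, runnable and downstream_of are hypothetical helpers that only mimic the rule that a task with no state becomes eligible once all of its upstream tasks have succeeded.

```python
from typing import Dict, List, Optional

# Toy model of one DAG run (NOT Airflow's scheduler): task C depends on task A.
UPSTREAM: Dict[str, List[str]] = {"A": [], "C": ["A"]}


def downstream_of(task_id: str) -> List[str]:
    """Return task_id followed by every task that depends on it, transitively."""
    result = [task_id]
    for child, parents in UPSTREAM.items():
        if task_id in parents:
            for t in downstream_of(child):
                if t not in result:
                    result.append(t)
    return result


def clear(states: Dict[str, Optional[str]], task_id: str, downstream: bool = True) -> None:
    """Reset task states to None ("never ran"), optionally including downstream tasks."""
    targets = downstream_of(task_id) if downstream else [task_id]
    for t in targets:
        states[t] = None


def runnable(states: Dict[str, Optional[str]]) -> List[str]:
    """Tasks with no state whose upstream tasks have all succeeded."""
    return [
        t for t, parents in UPSTREAM.items()
        if states[t] is None and all(states[p] == "success" for p in parents)
    ]


states: Dict[str, Optional[str]] = {"A": "failed", "C": "upstream_failed"}
clear(states, "A")        # clears A and, downstream of it, C
print(runnable(states))   # ['A']
states["A"] = "success"   # A succeeds on its second try
print(runnable(states))   # ['C']
```

Calling clear(states, "A", downstream=False) instead leaves C in upstream_failed, so in this model C never becomes runnable even after A succeeds.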


Here's an alternative solution that clears and retries certain tasks automatically, using an on_failure_callback that calls the airflow tasks clear CLI. If you only want to clear a single task, leave out the -d (downstream) flag:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.dummy_operator import DummyOperator


    def clear_upstream_task(context):
        """on_failure_callback: clear t1 (and, via -d, everything downstream of it)
        for the failed run's execution date so the scheduler runs them again."""
        execution_date = context.get("execution_date")
        clear_tasks = BashOperator(
            task_id='clear_tasks',
            # -s/-e: date range to clear (only this run), -t: task-id regex,
            # -d: include downstream tasks, -y: skip the confirmation prompt
            bash_command=(
                f"airflow tasks clear -s '{execution_date}' -e '{execution_date}' "
                "-t t1 -d -y clear_upstream_task"
            ),
        )
        return clear_tasks.execute(context=context)


    # Default settings applied to all tasks
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(seconds=5),
    }

    with DAG('clear_upstream_task',
             start_date=datetime(2021, 1, 1),
             max_active_runs=3,
             schedule_interval=timedelta(minutes=5),
             default_args=default_args,
             catchup=False
             ) as dag:
        t0 = DummyOperator(task_id='t0')
        t1 = DummyOperator(task_id='t1')
        t2 = DummyOperator(task_id='t2')
        # t3 always exits non-zero to trigger the callback; because -d also
        # clears t3, this demo keeps clearing and re-running t1 -> t2 -> t3.
        t3 = BashOperator(
            task_id='t3',
            bash_command='exit 123',
            on_failure_callback=clear_upstream_task,
        )

        t0 >> t1 >> t2 >> t3
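
As a variation on the callback above, the sketch below builds the argument list itself and runs the CLI with subprocess instead of instantiating a BashOperator inside the callback. It assumes Airflow 2's airflow tasks clear flags (-s/-e for the start and end of the date range, -t for a task-id regex, -d for downstream, -y to skip the prompt); build_clear_command and clear_task_callback are hypothetical names. Passing the same date to -s and -e limits the clear to that one run, and anchoring the regex keeps "t1" from also matching a task such as "t10".

```python
import re
import shlex
import subprocess
from datetime import datetime, timezone
from typing import List


def build_clear_command(dag_id: str, task_id: str, run_date: datetime,
                        downstream: bool = True) -> List[str]:
    """Hypothetical helper: argv for `airflow tasks clear` scoped to one task and one date."""
    stamp = run_date.isoformat()
    cmd = [
        "airflow", "tasks", "clear",
        "-s", stamp,                      # start of the date range to clear
        "-e", stamp,                      # same end date, so only this run is cleared
        "-t", f"^{re.escape(task_id)}$",  # -t is a regex; anchor it to match task_id exactly
        "-y",                             # do not ask for confirmation
    ]
    if downstream:
        cmd.append("-d")                  # also clear everything downstream of the task
    cmd.append(dag_id)
    return cmd


def clear_task_callback(context) -> None:
    """Hypothetical on_failure_callback that shells out to the Airflow CLI."""
    cmd = build_clear_command("clear_upstream_task", "t1", context["execution_date"])
    subprocess.run(cmd, check=True)


if __name__ == "__main__":
    demo = build_clear_command("clear_upstream_task", "t1",
                               datetime(2021, 1, 1, 0, 5, tzinfo=timezone.utc))
    print(shlex.join(demo))  # shell-quoted command, handy for a manual test run
```

Running the module prints the assembled command, so you can try it by hand in a shell before attaching clear_task_callback as the on_failure_callback of a task. Passing a list to subprocess.run (no shell) also avoids quoting issues with the timestamp and the regex.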