
Python script scheduling in airflow


You should probably use the PythonOperator to call your function. If you want to define the function somewhere else, you can simply import it from a module as long as it's accessible in your PYTHONPATH.

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from my_script import my_python_function

    dag = DAG('tutorial', default_args=default_args)

    PythonOperator(dag=dag,
                   task_id='my_task_powered_by_python',
                   provide_context=False,
                   python_callable=my_python_function,
                   op_args=['arguments_passed_to_callable'],
                   op_kwargs={'keyword_argument': 'which will be passed to function'})
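Note that the snippet above assumes a default_args dictionary has already been defined (see the tutorial link below). A minimal sketch of what it might look like:

    from datetime import datetime, timedelta

    # Hypothetical minimal default_args; adjust the values for your deployment.
    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2021, 1, 1),
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }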

If your function my_python_function lives in a script file such as /path/to/my/scripts/dir/my_script.py, then before starting Airflow you can add that directory to your PYTHONPATH like so:

    export PYTHONPATH=/path/to/my/scripts/dir/:$PYTHONPATH
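For illustration, my_script.py only needs to expose a plain Python function; the body below is just an assumed example matching the names used above:

    # /path/to/my/scripts/dir/my_script.py
    def my_python_function(*op_args, **op_kwargs):
        # Whatever your task should do; op_args/op_kwargs are passed in by the PythonOperator.
        print('positional args:', op_args)
        print('keyword args:', op_kwargs)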

More information here: https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html

Default args and other considerations are covered in the tutorial: https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html


You can also use the BashOperator to execute Python scripts in Airflow. You can put your scripts in a folder inside the DAG folder; if they live somewhere else, just pass the full path to the script.

    from airflow import DAG
    from airflow.operators import BashOperator, PythonOperator
    from datetime import datetime, timedelta

    seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                      datetime.min.time())

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': seven_days_ago,
        'email': ['airflow@airflow.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG('simple', default_args=default_args)

    t1 = BashOperator(
        task_id='testairflow',
        bash_command='python /home/airflow/airflow/dags/scripts/file1.py',
        dag=dag)
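The script invoked by the BashOperator is an ordinary Python file and does not need to define a DAG itself. A minimal sketch of what file1.py might contain (its contents are an assumption here):

    # /home/airflow/airflow/dags/scripts/file1.py -- run by the BashOperator via `python file1.py`
    def main():
        print('Running my task logic')

    if __name__ == '__main__':
        main()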


Airflow parses all Python files in $AIRFLOW_HOME/dags (in your case /home/amit/airflow/dags), and each of those files should produce a DAG object at module level, as shown in the answer from "postrational". If your DAG is reported as missing, there is most likely an error in the Python code and Airflow could not load it. Check the Airflow webserver or scheduler logs for details, since stderr and stdout go there.
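As a sanity check, a DAG file that Airflow can pick up needs the DAG object defined at module level, roughly like this (a minimal sketch reusing the example names from above; the file name is an assumption):

    # $AIRFLOW_HOME/dags/my_dag.py
    from datetime import datetime
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    from my_script import my_python_function  # must be importable via PYTHONPATH

    # The DAG object must live at module level so the scheduler can discover it.
    dag = DAG('tutorial',
              start_date=datetime(2021, 1, 1),
              schedule_interval=None)

    task = PythonOperator(task_id='my_task_powered_by_python',
                          python_callable=my_python_function,
                          dag=dag)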