Python script scheduling in airflow
You should probably use the PythonOperator
to call your function. If you want to define the function somewhere else, you can simply import it from a module as long as it's accessible in your PYTHONPATH
.
from airflow import DAGfrom airflow.operators.python_operator import PythonOperatorfrom my_script import my_python_functiondag = DAG('tutorial', default_args=default_args)PythonOperator(dag=dag, task_id='my_task_powered_by_python', provide_context=False, python_callable=my_python_function, op_args=['arguments_passed_to_callable'], op_kwargs={'keyword_argument':'which will be passed to function'})
If your function my_python_function
was in a script file /path/to/my/scripts/dir/my_script.py
Then before starting Airflow, you could add the path to your scripts to the PYTHONPATH
like so:
export PYTHONPATH=/path/to/my/scripts/dir/:$PYTHONPATH
More information here:https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html
Default args and other considerations as in the tutorial: https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html
You can also use bashoperator to execute python scripts in Airflow. You can put your scripts in a folder in DAG folder. If your scripts are somewhere else, just give a path to those scripts.
from airflow import DAG from airflow.operators import BashOperator,PythonOperator from datetime import datetime, timedelta seven_days_ago = datetime.combine(datetime.today() - timedelta(7), datetime.min.time()) default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': seven_days_ago, 'email': ['airflow@airflow.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG('simple', default_args=default_args)t1 = BashOperator( task_id='testairflow', bash_command='python /home/airflow/airflow/dags/scripts/file1.py', dag=dag)
Airflow parses all Python files in $AIRFLOW_HOME/dags (in your case /home/amit/airflow/dags). And that python script should retrun a DAG object back as shown in answer from "postrational". Now when it is being reported as missing that means there is some issue in Python code and Airflow could not load it. Check airflow webserver or scheduler logs for more details, as stderr or stdout goes there.