
How to programmatically set up Airflow 1.10 logging with localstack s3 endpoint?


I think you're supposed to use localhost, not localstack, for the endpoint, e.g. host = http://localhost:4572.
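To sanity-check that the endpoint is reachable before wiring it into Airflow, here is a minimal boto3 sketch (assuming LocalStack's usual "test"/"test" dummy credentials and the legacy S3 port 4572; adjust if your setup differs):

import boto3

# Point the client at the LocalStack S3 endpoint instead of real AWS.
# "test"/"test" are LocalStack's conventional dummy credentials (an assumption).
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:4572",
    aws_access_key_id="test",
    aws_secret_access_key="test",
    region_name="us-east-1",
)

print(s3.list_buckets()["Buckets"])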

In Airflow 1.10 you can override the endpoint on a per-connection basis, but unfortunately it only supports one endpoint at a time, so you'd be changing it for all AWS hooks using that connection. To override it, edit the relevant connection and in the "Extra" field put:

{"host": "http://localhost:4572"}

I believe this should fix it.
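For completeness, remote logging itself is switched on through the [core] options in airflow.cfg, which can also be set programmatically as environment variables before the scheduler, webserver, and workers start. A sketch, assuming the connection above is saved under the id s3_con and s3://my-airflow-logs is a placeholder bucket:

import os

# Environment-variable overrides for airflow.cfg's [core] section
# (AIRFLOW__<SECTION>__<KEY> is Airflow's standard override pattern).
os.environ["AIRFLOW__CORE__REMOTE_LOGGING"] = "True"
os.environ["AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER"] = "s3://my-airflow-logs"  # placeholder bucket
os.environ["AIRFLOW__CORE__REMOTE_LOG_CONN_ID"] = "s3_con"

These have to be visible to every Airflow process that writes or reads logs, not just the one that creates the connection.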


I managed to make this work by referring to this guide. Basically you need to create a connection using the Connection class and pass in the credentials you need; in my case that was AWS_SESSION_TOKEN, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and REGION_NAME. Use this function as a python_callable in a PythonOperator, which should be the first task of the DAG (a wiring sketch follows the code below).

import os
import json

from airflow import settings
from airflow.exceptions import AirflowFailException
from airflow.models.connection import Connection


def _create_connection(**context):
    """
    Sets the connection information about the environment using the Connection
    class instead of doing it manually in the Airflow UI.
    """
    AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
    AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
    AWS_SESSION_TOKEN = os.getenv("AWS_SESSION_TOKEN")
    REGION_NAME = os.getenv("REGION_NAME")
    credentials = [
        AWS_SESSION_TOKEN,
        AWS_ACCESS_KEY_ID,
        AWS_SECRET_ACCESS_KEY,
        REGION_NAME,
    ]
    # Fail fast if any of the required environment variables is missing.
    if any(not credential for credential in credentials):
        raise AirflowFailException("Environment variables were not passed")
    extras = json.dumps(
        dict(
            aws_session_token=AWS_SESSION_TOKEN,
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            region_name=REGION_NAME,
        ),
    )
    try:
        conn = Connection(
            conn_id="s3_con",
            conn_type="s3",
            extra=extras,
        )
        # Instantiating Connection is not enough on its own: the object has
        # to be persisted to the metadata database so that hooks (including
        # the remote logging S3 hook) can look it up by conn_id.
        session = settings.Session()
        session.add(conn)
        session.commit()
    except Exception as e:
        raise AirflowFailException(
            f"Error creating connection to Airflow: {e!r}",
        )
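And a sketch of wiring the callable in as the first task of the DAG (example_dag and the task id are hypothetical names; provide_context=True is what makes Airflow 1.10 pass **context into the callable):

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

with DAG(
    dag_id="example_dag",  # hypothetical DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
) as dag:
    create_connection = PythonOperator(
        task_id="create_connection",
        python_callable=_create_connection,
        provide_context=True,  # Airflow 1.10 needs this to pass **context
    )
    # Downstream tasks go after this, e.g. create_connection >> first_real_task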