How to programmatically set up Airflow 1.10 logging with localstack s3 endpoint?
I think you're supposed to use localhost, not localstack, for the endpoint, e.g. host = http://localhost:4572.
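If you want to sanity-check that localstack's S3 API is actually answering on that address before touching Airflow, a short boto3 snippet like the one below should do it. This is just my own sketch: the bucket name and dummy credentials are placeholders, and 4572 is the legacy localstack S3 port (newer localstack versions expose everything on the edge port 4566).

import boto3

# Point boto3 at the localstack endpoint instead of real AWS.
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:4572",  # legacy localstack S3 port
    aws_access_key_id="test",              # localstack accepts dummy creds
    aws_secret_access_key="test",
    region_name="us-east-1",
)
s3.create_bucket(Bucket="airflow-logs")    # placeholder bucket name
print(s3.list_buckets()["Buckets"])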
In Airflow 1.10 you can override the endpoint on a per-connection basis, but unfortunately it only supports one endpoint at a time, so you'd be changing it for all AWS hooks that use the connection. To override it, edit the relevant connection and, in the "Extra" field, put:
{"host": "http://localhost:4572"}
I believe this should fix it.
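For the logging part of the question, you also need to enable remote logging in airflow.cfg and point it at the connection that carries the endpoint override. Something along these lines should work in 1.10; the bucket name is a placeholder of mine, and remote_log_conn_id should be whatever conn_id holds your credentials and endpoint:

[core]
remote_logging = True
remote_base_log_folder = s3://airflow-logs
remote_log_conn_id = s3_con
encrypt_s3_logs = False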
I managed to make this work by referring to this guide. Basically, you need to create a connection using the Connection class and pass in the credentials you need; in my case that was AWS_SESSION_TOKEN, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and REGION_NAME. Use the function below as the python_callable in a PythonOperator, which should be the first task of the DAG.
import os
import json

from airflow import settings
from airflow.models.connection import Connection
from airflow.exceptions import AirflowFailException


def _create_connection(**context):
    """
    Sets the connection information about the environment using the Connection
    class instead of doing it manually in the Airflow UI
    """
    AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
    AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
    AWS_SESSION_TOKEN = os.getenv("AWS_SESSION_TOKEN")
    REGION_NAME = os.getenv("REGION_NAME")
    credentials = [
        AWS_SESSION_TOKEN,
        AWS_ACCESS_KEY_ID,
        AWS_SECRET_ACCESS_KEY,
        REGION_NAME,
    ]
    if not credentials or any(not credential for credential in credentials):
        raise AirflowFailException("Environment variables were not passed")

    extras = json.dumps(
        dict(
            aws_session_token=AWS_SESSION_TOKEN,
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            region_name=REGION_NAME,
        ),
    )
    try:
        conn = Connection(
            conn_id="s3_con",
            conn_type="S3",
            extra=extras,
        )
        # Building the Connection object is not enough on its own; it has to be
        # persisted to the metadata DB so that hooks can look it up by conn_id.
        session = settings.Session()
        session.add(conn)
        session.commit()
    except Exception as e:
        raise AirflowFailException(
            f"Error creating connection to Airflow: {e!r}",
        )
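For completeness, this is roughly how I'd wire it into a DAG; the dag_id, start date and downstream tasks are placeholders, the point is just that _create_connection runs before anything that needs the S3 connection:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

with DAG(
    dag_id="s3_logging_setup",        # placeholder dag_id
    start_date=datetime(2021, 1, 1),  # placeholder start date
    schedule_interval=None,
) as dag:
    create_connection = PythonOperator(
        task_id="create_connection",
        python_callable=_create_connection,
        provide_context=True,  # Airflow 1.10 needs this to pass **context
    )
    # Make every other task depend on it, e.g.:
    # create_connection >> rest_of_the_dag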