
Using Airflow macro outside of operators


Lucky for you, bucket_key is templated; just put the Jinja template inside it.

…bucket_key=getPath() + '{{ ds }}',…
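To illustrate what actually gets passed at DAG-parse time, here is a small sketch (with a hypothetical getPath helper standing in for the one in the question). The concatenation happens when the file is parsed, so the Jinja placeholder survives as a literal substring until Airflow renders the field at run time:

```python
# Hypothetical helper standing in for the getPath() used in the question.
def getPath():
    return "s3://my-bucket/data/"

# At parse time this is plain string concatenation; the Jinja placeholder
# is passed through untouched and only rendered when the task runs.
bucket_key = getPath() + '{{ ds }}'
print(bucket_key)  # s3://my-bucket/data/{{ ds }}
```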

Completely outside an operator you cannot use these macros, because the DAG file is parsed regularly by the scheduler, not just during a DAG run. What would the value of ds be when the DAG isn't running?

However, since you're unlikely to want to do anything with it outside the tasks, you could put it into a templated field. You can also extend another field to be templated.

class MySensor(S3KeySensor):
    template_fields = ('bucket_key', 'bucket_name', 'my_thing')

    def __init__(self, my_thing=None, *args, **kwargs):
        super(MySensor, self).__init__(*args, **kwargs)
        self.my_thing = my_thing

    def post_execute(self, context):
        logging.info(
            "I probably wanted to over-ride poke to use {}".format(self.my_thing))


scanner = MySensor(
    my_thing='{{ ds }}',
    task_id='scanner',
    poke_interval=60,
    timeout=24 * 60 * 60,
    soft_fail=False,
    wildcard_match=True,
    bucket_key=getPath() + '{{ ds }}',
    bucket_name=bucketName,
    dag=dag)

Edit: IIRC self.my_thing doesn't change after __init__; rather, the templated fields are rendered with the run's context just before (pre_execute and?) execute is called.
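As an illustration of that lifecycle (this is a toy sketch, not Airflow's actual rendering code), here is a minimal class where a render step rewrites the stored template string from the run context just before execute runs. A plain string substitution stands in for the Jinja engine:

```python
class FakeSensor(object):
    """Toy stand-in for an operator with one templated field."""
    template_fields = ('my_thing',)

    def __init__(self, my_thing=None):
        # At construction time the field holds the raw template string.
        self.my_thing = my_thing

    def render_template_fields(self, context):
        # Rough analogue of Airflow's rendering pass: each templated
        # field is re-assigned with the context value substituted in.
        for field in self.template_fields:
            value = getattr(self, field)
            setattr(self, field, value.replace('{{ ds }}', context['ds']))

    def execute(self, context):
        # By the time execute runs, self.my_thing has been rendered.
        return self.my_thing


sensor = FakeSensor(my_thing='{{ ds }}')
context = {'ds': '2019-01-01'}
sensor.render_template_fields(context)  # happens just before execute
print(sensor.execute(context))          # 2019-01-01
```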


Use double quotes.

datestamp = "{{ ds }}"
print datestamp
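Note that outside a templated field nothing ever renders that string, so (per the first answer above) at parse time it is just the literal text. A quick Python 3 sketch:

```python
# Outside a templated field the string is never rendered by Jinja,
# so it stays exactly as written in the DAG file.
datestamp = "{{ ds }}"
print(datestamp)  # {{ ds }}
```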