How can I set up Celery to call a custom initialization function before running my tasks?

How can I set up Celery to call a custom initialization function before running my tasks?


You can either write a custom loader, or use the signals.

Loaders have the on_task_init method, which is called when a task is about to be executed, and the on_worker_init method, which is called by the celery+celerybeat main process.

Using signals is probably the easiest option. The signals available are:

0.8.x:

  • task_prerun(task_id, task, args, kwargs)

    Dispatched when a task is about to be executed by the worker (or locally, if using apply or if CELERY_ALWAYS_EAGER has been set).

  • task_postrun(task_id, task, args, kwargs, retval)

    Dispatched after a task has been executed, under the same conditions as above.

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    Called when a task is applied (not a good place for long-running operations).

Additional signals available in 0.9.x (current master branch on GitHub):

  • worker_init()

    Called when celeryd has started (before the task is initialized, so if on a system supporting fork, any memory changes would be copied to the child worker processes).

  • worker_ready()

    Called when celeryd is able to receive tasks.

  • worker_shutdown()

    Called when celeryd is shutting down.

Here's an example precalculating something the first time a task is run in the process:

    from celery.task import Task
    from celery.registry import tasks
    from celery.signals import task_prerun

    _precalc_table = {}


    class PowersOfTwo(Task):

        def run(self, x):
            if x in _precalc_table:
                return _precalc_table[x]
            else:
                return x ** 2

    tasks.register(PowersOfTwo)


    def _precalc_numbers(**kwargs):
        if not _precalc_table:  # it's empty, so haven't been generated yet
            for i in range(1024):
                _precalc_table[i] = i ** 2


    # need to use registered instance for sender argument.
    task_prerun.connect(_precalc_numbers, sender=tasks[PowersOfTwo.name])

If you want the function to be run for all tasks, just skip the sender argument.
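To make the effect of the sender argument concrete, here is a toy model of signal dispatch in plain Python. It is only a sketch of the idea, not Celery's implementation: ToySignal, toy_prerun, add_task and mul_task are all made-up names, and the real signal objects do more than this.

```python
class ToySignal:
    """Tiny stand-in for a signal object (hypothetical, not Celery's class)."""

    def __init__(self):
        self._receivers = []  # list of (handler, sender or None)

    def connect(self, handler, sender=None):
        # sender=None means "call this handler for every sender".
        self._receivers.append((handler, sender))

    def send(self, sender, **kwargs):
        # Arguments are passed by keyword, which is why handlers take **kwargs.
        for handler, wanted in self._receivers:
            if wanted is None or wanted is sender:
                handler(sender=sender, **kwargs)


toy_prerun = ToySignal()
calls = []


def for_one_task(**kwargs):
    calls.append(("one", kwargs["task_id"]))


def for_every_task(**kwargs):
    calls.append(("all", kwargs["task_id"]))


# Two placeholder objects standing in for registered task instances.
add_task = object()
mul_task = object()

toy_prerun.connect(for_one_task, sender=add_task)  # only fires for add_task
toy_prerun.connect(for_every_task)                 # no sender: fires for all

toy_prerun.send(sender=add_task, task_id="a1", task=add_task, args=(1, 2), kwargs={})
toy_prerun.send(sender=mul_task, task_id="m1", task=mul_task, args=(3, 4), kwargs={})
```

After both sends, for_one_task has run once (for add_task only) and for_every_task has run twice.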