Running "unique" tasks with celery


Based on MattH's answer, you could use a decorator like this:

    import functools

    from django.core.cache import cache  # assumes Django's cache framework


    def single_instance_task(timeout):
        def task_exc(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                lock_id = "celery-single-instance-" + func.__name__
                # cache.add() only stores the key if it is absent, so it works as a lock
                acquire_lock = lambda: cache.add(lock_id, "true", timeout)
                release_lock = lambda: cache.delete(lock_id)
                if acquire_lock():
                    try:
                        func(*args, **kwargs)
                    finally:
                        release_lock()
            return wrapper
        return task_exc

Then use it like so:

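    from datetime import timedelta
    from celery.task import periodic_task  # location in Celery versions before 5.0

    @periodic_task(run_every=timedelta(minutes=1))
    @single_instance_task(60*10)
    def fetch_articles():
        ...  # yada yada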
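To see what the lock actually does without a Celery worker or a Django cache, here is a minimal sketch. `FakeCache` is a hypothetical in-memory stand-in that only mimics the `add`/`delete` behaviour the decorator relies on (it ignores the timeout), and this version of the wrapper returns the task's result, which the decorator above does not.

```python
import functools


class FakeCache:
    """Hypothetical in-memory stand-in for a cache backend (not Django's)."""

    def __init__(self):
        self._data = {}

    def add(self, key, value, timeout=None):
        # "Set if absent": returns True only when the key was not there yet.
        # The timeout is ignored in this sketch.
        if key in self._data:
            return False
        self._data[key] = value
        return True

    def delete(self, key):
        self._data.pop(key, None)


cache = FakeCache()


def single_instance_task(timeout):
    def task_exc(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__
            if cache.add(lock_id, "true", timeout):
                try:
                    return func(*args, **kwargs)
                finally:
                    cache.delete(lock_id)
            return None  # another run holds the lock, so this one is skipped
        return wrapper
    return task_exc


calls = []


@single_instance_task(60 * 10)
def fetch_articles():
    calls.append("start")
    # Calling the task again while the lock is held simulates an overlapping run.
    nested = fetch_articles()
    calls.append("nested skipped" if nested is None else "nested ran")
    return "fetched"


result = fetch_articles()
```

Two things to keep in mind with the real decorator: the cache backend has to be shared between workers (memcached or Redis, not a per-process in-memory cache), and if a run takes longer than the timeout the lock expires, so a second run can start while the first is still going.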


Using celery_once (https://pypi.python.org/pypi/celery_once) seems to do the job really nicely, including reporting errors and checking uniqueness against only the parameters you choose.

You can do things like:

    from celery_once import QueueOnce
    from myapp.celery import app
    from time import sleep

    @app.task(base=QueueOnce, once=dict(keys=('customer_id',)))
    def start_billing(customer_id, year, month):
        sleep(30)
        return "Done!"

which just needs the following settings in your project:

    ONCE_REDIS_URL = 'redis://localhost:6379/0'
    ONCE_DEFAULT_TIMEOUT = 60 * 60  # remove lock after 1 hour in case it was stale
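The `keys=('customer_id',)` option means only `customer_id` counts toward uniqueness, so two billing runs for the same customer collide even when `year` and `month` differ. A rough sketch of that idea follows; `lock_key` is a hypothetical helper written for illustration, and the key format is an assumption, not celery_once's actual implementation.

```python
import inspect


def lock_key(func, keys, args, kwargs):
    """Build a lock name from the task name and the selected arguments.

    Hypothetical helper; celery_once's real key format may differ.
    """
    # Bind positional and keyword arguments to parameter names so that
    # start_billing(42, ...) and start_billing(customer_id=42, ...) match.
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    parts = ["{}-{}".format(name, bound.arguments[name]) for name in sorted(keys)]
    return "lock_{}_{}".format(func.__name__, "_".join(parts))


def start_billing(customer_id, year, month):
    return "Done!"


keys = ("customer_id",)
# Same customer, different periods: the keys are identical, so the runs collide.
key_a = lock_key(start_billing, keys, (42, 2015, 1), {})
key_b = lock_key(start_billing, keys, (), {"customer_id": 42, "year": 2016, "month": 7})
# Different customer: a different key, so this run is allowed.
key_c = lock_key(start_billing, keys, (7, 2015, 1), {})
```

If billing for the same customer should be allowed to run in parallel for different months, the `keys` tuple would need to include `year` and `month` as well.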