Running "unique" tasks with celery
Based on MattH's answer, you could use a decorator like this:
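
    import functools

    # Assumption: `cache` is Django's cache, whose add() only sets a missing key.
    from django.core.cache import cache

    def single_instance_task(timeout):
        def task_exc(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                lock_id = "celery-single-instance-" + func.__name__
                acquire_lock = lambda: cache.add(lock_id, "true", timeout)
                release_lock = lambda: cache.delete(lock_id)
                if acquire_lock():
                    try:
                        # Return the result so the decorator does not swallow it.
                        return func(*args, **kwargs)
                    finally:
                        release_lock()
            return wrapper
        return task_exc
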
Then use it like so:

    @periodic_task(run_every=timedelta(minutes=1))
    @single_instance_task(60 * 10)
    def fetch_articles():
        ...  # yada yada

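
To see what the decorator does when a run overlaps with another, here is a minimal sketch that runs without Celery or Django. `InMemoryCache` is a hypothetical stand-in that only mimics the `add()`/`delete()` behaviour the decorator relies on (it ignores the timeout), and the lock is taken by hand to simulate a second worker. This sketch also returns the wrapped function's result, so a skipped run shows up as `None`.

```python
import functools


class InMemoryCache:
    """Hypothetical stand-in for a cache backend; not a real Django class."""

    def __init__(self):
        self._data = {}

    def add(self, key, value, timeout=None):
        # Store the value only if the key is absent and report whether we did.
        # The timeout is accepted but ignored in this sketch.
        if key in self._data:
            return False
        self._data[key] = value
        return True

    def delete(self, key):
        self._data.pop(key, None)


cache = InMemoryCache()


def single_instance_task(timeout):
    def task_exc(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__
            if cache.add(lock_id, "true", timeout):
                try:
                    return func(*args, **kwargs)
                finally:
                    cache.delete(lock_id)
            return None  # lock is held elsewhere, so this run is skipped
        return wrapper
    return task_exc


calls = []


@single_instance_task(60 * 10)
def fetch_articles():
    calls.append("ran")
    return "fetched"


first = fetch_articles()  # lock is free, so the body runs

# Simulate another worker that is still holding the lock.
cache.add("celery-single-instance-fetch_articles", "true", 60 * 10)
second = fetch_articles()  # skipped while the lock is held

cache.delete("celery-single-instance-fetch_articles")
third = fetch_articles()  # lock released, so the body runs again
```

Note that the lock name depends only on the function name, so calls with different arguments still exclude each other.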
Using https://pypi.python.org/pypi/celery_once seems to do the job really nicely, including reporting errors and checking uniqueness against only the task parameters you select.
You can do things like:

    from celery_once import QueueOnce
    from myapp.celery import app
    from time import sleep

    @app.task(base=QueueOnce, once=dict(keys=('customer_id',)))
    def start_billing(customer_id, year, month):
        sleep(30)
        return "Done!"

which just needs the following settings in your project:

    ONCE_REDIS_URL = 'redis://localhost:6379/0'
    ONCE_DEFAULT_TIMEOUT = 60 * 60  # remove lock after 1 hour in case it was stale
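
The `keys=('customer_id',)` option in the task above restricts the uniqueness check to that one argument. The sketch below illustrates the idea only: `lock_key` is a hypothetical helper, and the key format it produces is an assumption, not celery_once's actual implementation.

```python
import inspect


def lock_key(func, keys, args, kwargs):
    """Build a lock name from the function name and only the selected arguments."""
    # Map positional and keyword arguments onto parameter names.
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    parts = [f"{name}-{bound.arguments[name]}" for name in sorted(keys)]
    return "_".join([func.__name__] + parts)


def start_billing(customer_id, year, month):
    return "Done!"


# Same customer, different month: the keys collide, so only one would be queued.
key_jan = lock_key(start_billing, ("customer_id",), (42, 2015, 1), {})
key_feb = lock_key(start_billing, ("customer_id",), (42,), {"year": 2015, "month": 2})

# Different customer: a different key, so both could run side by side.
key_other = lock_key(start_billing, ("customer_id",), (7, 2015, 1), {})
```

With this scheme, two billing runs for the same customer share a lock regardless of `year` and `month`, which is the behaviour the `keys` option is meant to give.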