Check if in celery task Check if in celery task flask flask

Check if in celery task


Here is one way to do it by using celery.current_task. Here is the code to be used by the task:

def notification():    from celery import current_task    if not current_task:        print "directly called"    elif current_task.request.id is None:        print "called synchronously"    else:        print "dispatched"@app.taskdef notify():    notification()

This is code you can run to exercise the above:

        from core.tasks import notify, notification        print "DIRECT"        notification()        print "NOT DISPATCHED"        notify()        print "DISPATCHED"        notify.delay().get()

My task code in the first snippet was in a module named core.tasks. And I shoved the code in the last snippet in a custom Django management command. This tests 3 cases:

  • Calling notification directly.

  • Calling notification through a task executed synchronously. That is, this task is not dispatched through Celery to a worker. The code of the task executes in the same process that calls notify.

  • Calling notification through a task run by a worker. The code of the task executes in a different process from the process that started it.

The output was:

NOT DISPATCHEDcalled synchronouslyDISPATCHEDDIRECTdirectly called

There is no line from the print in the task on the output after DISPATCHED because that line ends up in the worker log:

[2015-12-17 07:23:57,527: WARNING/Worker-4] dispatched

Important note: I initially was using if current_task is None in the first test but it did not work. I checked and rechecked. Somehow Celery sets current_task to an object which looks like None (if you use repr on it, you get None) but is not None. Unsure what is going on there. Using if not current_task works.

Also, I've tested the code above in a Django application but I've not used it in production. There may be gotchas I don't know.