Check if in celery task
Here is one way to do it, using celery.current_task. Here is the code to be used by the task:
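    def notification():
        # Imported here, as in the original snippet.
        from celery import current_task
        if not current_task:
            # No task is executing at all.
            print("directly called")
        elif current_task.request.id is None:
            # Inside a task, but the task was called like a plain function.
            print("called synchronously")
        else:
            # Inside a task that a worker picked up.
            print("dispatched")

    @app.task
    def notify():
        notification()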
This is code you can run to exercise the above:
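    from core.tasks import notify, notification

    print("DIRECT")
    notification()          # plain function call, no task involved
    print("NOT DISPATCHED")
    notify()                # task body runs in this process
    print("DISPATCHED")
    notify.delay().get()    # task is sent to a worker; wait for the result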
My task code in the first snippet was in a module named core.tasks. I put the code from the last snippet in a custom Django management command. This tests 3 cases:
1. Calling notification directly.
2. Calling notification through a task executed synchronously. That is, this task is not dispatched through Celery to a worker. The code of the task executes in the same process that calls notify.
3. Calling notification through a task run by a worker. The code of the task executes in a different process from the process that started it.
The output was:
    NOT DISPATCHED
    called synchronously
    DISPATCHED
    DIRECT
    directly called
There is no line from the print in the task on the output after DISPATCHED because that line ends up in the worker log:
[2015-12-17 07:23:57,527: WARNING/Worker-4] dispatched
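If you need to branch on the result rather than print it, the same three checks can be wrapped in a function that returns a label. This is only a sketch: execution_context and its return values are names made up here, and the function takes the task object as a parameter so it can be tried with stand-ins and without a running worker. In real code you would pass celery.current_task.

```python
def execution_context(task):
    """Classify the caller's context from celery.current_task (or a stand-in).

    Hypothetical helper: the name and the returned labels are illustrative.
    """
    if not task:
        # Falsy: no task is executing, so the code was called directly.
        return "direct"
    if task.request.id is None:
        # A task is executing but has no id: it was called synchronously.
        return "synchronous"
    # A task with an id: it was dispatched and is running in a worker.
    return "dispatched"
```

Inside notification you would then call it as execution_context(current_task), after importing current_task from celery as in the first snippet.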
Important note: I was initially using if current_task is None in the first test but it did not work. I checked and rechecked. Somehow Celery sets current_task to an object which looks like None (if you use repr on it, you get None) but is not None. Unsure what is going on there. Using if not current_task works.
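One plausible explanation is that celery.current_task is a lazy proxy object rather than the task itself. A proxy of this kind forwards repr and truth-testing to whatever it currently wraps, so it prints as None and is falsy, yet an identity check against None fails because the proxy is its own object. The sketch below reproduces that effect without Celery. LazyProxy and get_current are made-up names for illustration, not Celery's actual code.

```python
class LazyProxy(object):
    """Hypothetical stand-in for a lazy proxy; not Celery's real class."""

    def __init__(self, getter):
        self._getter = getter

    def __repr__(self):
        # Show the repr of the wrapped object, not of the proxy.
        return repr(self._getter())

    def __bool__(self):
        # Truthiness follows the wrapped object.
        return bool(self._getter())

    __nonzero__ = __bool__  # Python 2 name for the same hook


def get_current():
    # Pretend that no task is currently running.
    return None


current = LazyProxy(get_current)

is_none = current is None   # identity check sees the proxy object itself
shown = repr(current)       # forwarded to the wrapped None
falsy = not current         # forwarded to the wrapped None
```

Here is_none comes out False while shown is the string "None" and falsy is True, which matches the behaviour described above and is why the truthiness test is the one to use.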
Also, I've tested the code above in a Django application but I've not used it in production. There may be gotchas I don't know.