celery task and customize decorator
Not quite sure why passing arguments won't work?
If you use this example:
@task()
def add(x, y):
    return x + y
Let's add some logging to MyCoolTask:
from celery import task
from celery.registry import tasks
import logging
import celery

logger = logging.getLogger(__name__)


class MyCoolTask(celery.Task):

    def __call__(self, *args, **kwargs):
        """In a Celery task, this method calls run(); here you can set
        up anything the task needs before it runs."""
        logger.info("Starting to run")
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Exit point of the task, whatever its final state
        logger.info("Ending run")
Then create a subclass of MyCoolTask whose run method takes arguments:
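class AddTask(MyCoolTask):

    def run(self, x, y):
        # Compare against None so that 0 is still accepted as a valid argument
        if x is not None and y is not None:
            result = add(x, y)
            logger.info('result = %d', result)
            return result
        else:
            logger.error('No x or y in arguments')


tasks.register(AddTask)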
and make sure you pass the kwargs as JSON data:
{"x":8,"y":9}
I get the result:
[2013-03-05 17:30:25,853: INFO/MainProcess] Starting to run
[2013-03-05 17:30:25,855: INFO/MainProcess] result = 17
[2013-03-05 17:30:26,739: INFO/MainProcess] Ending run
[2013-03-05 17:30:26,741: INFO/MainProcess] Task iamscheduler.tasks.AddTask[6a62641d-16a6-44b6-a1cf-7d4bdc8ea9e0] succeeded in 0.888684988022s: 17
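If it helps to see why the JSON keys have to match the parameter names of run, here is a minimal sketch in plain Python with no Celery involved. The standalone run function is only a stand-in for AddTask.run, and the second payload with keys "a" and "b" is a made-up example of a mismatch.

```python
import json


def run(x, y):
    # Stand-in for AddTask.run (assumption: plain function, no Celery)
    return x + y


# The JSON object is decoded to a dict and unpacked as keyword arguments,
# so its keys must be exactly the parameter names of run.
kwargs = json.loads('{"x":8,"y":9}')
result = run(**kwargs)  # equivalent to run(x=8, y=9)
print(result)

# A payload whose keys do not match the signature fails with TypeError.
mismatch_error = None
try:
    run(**json.loads('{"a":8,"b":9}'))
except TypeError as exc:
    mismatch_error = exc
print(type(mismatch_error).__name__)
```

So when arguments appear "not to work", a mismatch between the payload keys and the run signature is one thing worth checking first.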
Instead of using a decorator, why not create a base class that extends celery.Task? That way all your tasks can extend your customized task class, where you can implement your own behavior in the __call__ and after_return methods. You can also define common methods and objects for all your tasks.
class MyCoolTask(celery.Task):

    def __call__(self, *args, **kwargs):
        """In a Celery task, this method calls run(); here you can set
        up anything the task needs before it runs."""
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Exit point of the task, whatever its final state
        pass
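To get a feel for the order in which these hooks fire without starting a worker, the pattern can be imitated in plain Python. This is only a rough sketch: BaseTaskSketch, AddSketch and execute are hypothetical names that are not part of Celery, and execute is a simplified stand-in for what a worker does around a task, not Celery's actual implementation.

```python
import logging

logger = logging.getLogger(__name__)


class BaseTaskSketch:
    """Plain-Python stand-in for a celery.Task base class (hypothetical)."""

    def __call__(self, *args, **kwargs):
        # Entry point: runs before the task body
        logger.info("Starting to run")
        return self.run(*args, **kwargs)

    def run(self, *args, **kwargs):
        raise NotImplementedError

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Exit point: runs whether the task succeeded or failed
        logger.info("Ending run (%s)", status)


class AddSketch(BaseTaskSketch):
    def run(self, x, y):
        return x + y


def execute(task, task_id, args=(), kwargs=None):
    """Hypothetical mini-worker: call the task, then its after_return hook."""
    kwargs = kwargs or {}
    try:
        retval = task(*args, **kwargs)
        status, einfo = "SUCCESS", None
    except Exception as exc:
        # On failure, pass the exception along in place of a return value
        retval, status, einfo = exc, "FAILURE", exc
    task.after_return(status, retval, task_id, args, kwargs, einfo)
    return status, retval


status, retval = execute(AddSketch(), "demo-id", kwargs={"x": 8, "y": 9})
failed_status, failed_retval = execute(AddSketch(), "demo-id", kwargs={"x": 8})
```

Every subclass only has to define run; the logging around it lives in one place, which is the main reason to prefer a base class over repeating a decorator on each task.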