celery task and customize decorator celery task and customize decorator django django

celery task and customize decorator


Not quite sure why passing arguments won't work?

if you use this example:

@task()def add(x, y):    return x + y

lets add some logging to the MyCoolTask:

from celery import taskfrom celery.registry import tasksimport loggingimport celerylogger = logging.getLogger(__name__)class MyCoolTask(celery.Task):    def __call__(self, *args, **kwargs):        """In celery task this function call the run method, here you can        set some environment variable before the run of the task"""        logger.info("Starting to run")        return self.run(*args, **kwargs)    def after_return(self, status, retval, task_id, args, kwargs, einfo):        #exit point of the task whatever is the state        logger.info("Ending run")        pass

and create an extended class (extending MyCoolTask, but now with arguments):

class AddTask(MyCoolTask):    def run(self,x,y):        if x and y:            result=add(x,y)            logger.info('result = %d' % result)            return result        else:            logger.error('No x or y in arguments')tasks.register(AddTask)

and make sure you pass the kwargs as json data:

{"x":8,"y":9}

I get the result:

[2013-03-05 17:30:25,853: INFO/MainProcess] Starting to run[2013-03-05 17:30:25,855: INFO/MainProcess] result = 17[2013-03-05 17:30:26,739: INFO/MainProcess] Ending run[2013-03-05 17:30:26,741: INFO/MainProcess] Task iamscheduler.tasks.AddTask[6a62641d-16a6-44b6-a1cf-7d4bdc8ea9e0] succeeded in 0.888684988022s: 17


Instead of use decorator why you don't create a base class that extend celery.Task ?

In this way all your tasks can extend your customized task class, where you can implement your personal behavior by using methods __call__ and after_return.You can also define common methods and object for all your task.

class MyCoolTask(celery.Task):    def __call__(self, *args, **kwargs):        """In celery task this function call the run method, here you can        set some environment variable before the run of the task"""        return self.run(*args, **kwargs)    def after_return(self, status, retval, task_id, args, kwargs, einfo):        #exit point of the task whatever is the state        pass