Celery stop execution of a chain Celery stop execution of a chain python python

Celery stop execution of a chain


In my opinion this is a common use-case that doesn't get enough love in the documentation.

Assuming you want to abort a chain mid-way while still reporting SUCCESS as status of the completed tasks, and not sending any error log or whatnot (else you can just raise an exception) then a way to accomplish this is:

@app.task(bind=True)  # Note that we need bind=True for self to workdef task1(self, other_args):    #do_stuff    if end_chain:        self.request.callbacks = None        return    #Other stuff to do if end_chain is False

So in your example:

@app.task(ignore_result=True, bind=True)def is_room_open(self, args_sub_1):    #something time consuming    if http_req_and_parse(args_sub_1):        # go on and do the notify task        return True    else:        self.request.callbacks = None

Will work. Note that instead of ignore_result=True and subtask() you can use the shortcut .si() as stated by @abbasov-alexander

Edited to work with EAGER mode, as suggested by @PhilipGarnero in the comments.


It's unbelievable as a so common case isn't treated in any official documentation. I had to cope with the same issue (but using shared_tasks with bind option, so we have visibility of self object), so I wrote a custom decorator that handles automatically the revocation:

def revoke_chain_authority(a_shared_task):    """    @see: https://gist.github.com/bloudermilk/2173940    @param a_shared_task: a @shared_task(bind=True) celery function.    @return:    """    @wraps(a_shared_task)    def inner(self, *args, **kwargs):        try:            return a_shared_task(self, *args, **kwargs)        except RevokeChainRequested, e:            # Drop subsequent tasks in chain (if not EAGER mode)            if self.request.callbacks:                self.request.callbacks[:] = []            return e.return_value    return inner

You can use it as follows:

@shared_task(bind=True)@revoke_chain_authoritydef apply_fetching_decision(self, latitude, longitude):    #...    if condition:        raise RevokeChainRequested(False)

See the full explanation here.Hope it helps!


Firstly, it seems if into the function exists exception ignore_result don't help you.

Secondly, you use immutable=True It means that next function (in our case is notify) does not take additional arguments. You should use notify.subtask((args_sub_2, ), immutable=False) of course if it suitable for your decision.

Third, you can use shortcuts:

notify.si(args_sub_2) instead notify.subtask((args_sub_2, ), immutable=True)

and

is_room_open.s(args_sub_1) instead is_room_open.subtask((args_sub_1, ))

Try use it code:

@taskdef check_orders():    # check all the orders and send out appropriate notifications    grouped_subs = []    for thingy in things:       ...        grouped_subs.append(chain(is_room_open.s(args_sub_1),                                   notify.s(args_sub_2)))    res = group(grouped_subs).apply_async()    res.join()         #[1]    logger.info('Done checking orders at %s' % current_task.request.id))@taskdef is_room_open(args_sub_1):    #something time consuming    if http_req_and_parse(args_sub_1):        # go on and do the notify task        return True    else:        # [2]        # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?        # None of the following things work:        # is_room_open.update_state(state='FAILURE')        # raise celery.exceptions.Ignore()        # raise Exception('spam', 'eggs')        # current_task.request.callbacks[:] = []        return False@taskdef notify(result, args_sub_2):    if result:        # something else time consuming, only do this if the first part of the chain         # passed a test (the chained tasks before this were 'successful'        notify_user(args_sub_2)        return True    return False

If you want catch exceptions you must use callback as so

is_room_open.s(args_sub_1, link_error=log_error.s())

from proj.celery import celery@celery.taskdef log_error(task_id):    result = celery.AsyncResult(task_id)    result.get(propagate=False)  # make sure result written.    with open(os.path.join('/var/errors', task_id), 'a') as fh:        fh.write('--\n\n%s %s %s' % (            task_id, result.result, result.traceback))