celery - chaining groups and subtasks -> out of order execution
So as it turns out, in celery you cannot chain two groups together.
I suspect this is because a group chained with a task is automatically upgraded to a chord.
--> Celery docs: http://docs.celeryproject.org/en/latest/userguide/canvas.html
Chaining a group together with another task will automatically upgrade it to be a chord:
Groups return a parent task. When two groups are chained together, I suspect that once the first group completes, the chord starts its callback "task", and that this "task" is actually the "parent task" of the second group. I further suspect that this parent task completes as soon as it has finished kicking off all the subtasks within the group, so the next item after the second group is executed without waiting for those subtasks to finish.
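A minimal toy model of that suspicion, using only the Python standard library rather than Celery (the thread pool, the sleep times and the helper name `dispatch_group` are illustrative assumptions, not Celery internals). The "parent task" only submits the group's subtasks and returns, so the next step in the chain (`st3` here) runs and finishes while those subtasks are still sleeping:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

events = []                # ordered log of what happened
lock = threading.Lock()    # protects `events` across worker threads


def log(msg):
    with lock:
        events.append(msg)


def subtask(name, seconds):
    """Stand-in for a Celery task: log begin/end around a sleep."""
    log(f"{name} begin")
    time.sleep(seconds)
    log(f"{name} end")


def dispatch_group(pool, names):
    """Hypothetical 'parent task' of a group: it only *submits* the
    subtasks and returns immediately, without waiting for them."""
    return [pool.submit(subtask, name, 0.2) for name in names]


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = dispatch_group(pool, ["st_arr2_a", "st_arr2_b"])
    # The parent task has already returned, so the next step runs right away.
    subtask("st3", 0)
    for f in futures:
        f.result()         # re-raise any exception a subtask hit

print(events)
```

In the printed log, `st3 end` appears before either `st_arr2_* end`, which is the same out-of-order pattern described above.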
To demonstrate this, here is some sample code. You'll need a celery worker already running to execute it.
```python
# celery_experiment.py
import logging
import random
import time

# shared_task replaces the old `celery.task` decorator (removed in Celery 5);
# the unused chain/chord/task_sent imports were dropped (task_sent no longer exists)
from celery import shared_task, group
from celery.signals import task_prerun, task_postrun

random.seed()
logging.basicConfig(level=logging.DEBUG)


### HANDLERS ###
@task_prerun.connect
def task_starting_handler(sender=None, task_id=None, task=None, args=None,
                          kwargs=None, **kwds):
    # Internal tasks (e.g. the chord machinery) have no 'id' kwarg; skip them.
    name = (kwargs or {}).get('id')
    if name is not None:
        logging.info('[%s] starting', name)


@task_postrun.connect
def task_finished_handler(sender=None, task_id=None, task=None, args=None,
                          kwargs=None, retval=None, state=None, **kwds):
    name = (kwargs or {}).get('id')
    if name is not None:
        logging.info('[%s] finished', name)


def random_sleep(id):
    slp = random.randint(1, 3)
    logging.info('[%s] sleep for %ssecs', id, slp)
    time.sleep(slp)


@shared_task
def thing(id):
    logging.info('[%s] begin', id)
    random_sleep(id)
    logging.info('[%s] end', id)


def exec_exp():
    st = thing.si(id='st')
    st_arr = [thing.si(id='st_arr1_a'), thing.si(id='st_arr1_b'), thing.si(id='st_arr1_c')]
    st_arr2 = [thing.si(id='st_arr2_a'), thing.si(id='st_arr2_b')]
    st2 = thing.si(id='st2')
    st3 = thing.si(id='st3')
    st4 = thing.si(id='st4')
    grp1 = group(st_arr)
    grp2 = group(st_arr2)

    # chn can chain two groups together because they are separated by a single subtask
    chn = (st | grp1 | st2 | grp2 | st3 | st4)

    # In chn2 you can't chain two groups together: st3 will start before grp2 finishes
    # chn2 = (st | st2 | grp1 | grp2 | st3 | st4)

    r = chn.apply_async()  # send the chain to the workers
    # r2 = chn2.apply_async()
    return r
```
I have the same issue with celery: I'm trying to build a workflow whose first step is "spawn a million tasks". I tried groups of groups and subtasks, but every time my step2 kicks off before step1 is over.
Long story short, I might have found a solution using chords and a dummy finisher task:
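```python
# assumes `celery` is your Celery app instance
@celery.task
def chordfinisher(*args, **kwargs):
    # Does nothing; it only exists so the group can be wrapped in a chord.
    return "OK"
```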
It does nothing much, but it lets me do this:
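```python
from celery import chain, chord, group

# one immutable signature per id
tasks = []
for id in ids:
    tasks.append(mytask.si(id))

# the chord body (chordfinisher) only runs once every task in the group is done
step1 = chord(group(tasks), chordfinisher.si())
step2 = ...
workflow = chain(step1, step2)
```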
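The same idea, again as a standard-library toy model rather than real Celery (the thread pool, the sleep times and the helper `run_chord` are illustrative assumptions). A chord's body cannot start until every task in its header has finished, so a do-nothing finisher in the body turns step1 into a single unit that the chain waits on. Because the finisher is added with `.si()` (an immutable signature), it does not receive the header results, which is fine since it ignores its arguments anyway:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

events = []                # ordered log of what happened
lock = threading.Lock()    # protects `events` across worker threads


def log(msg):
    with lock:
        events.append(msg)


def mytask(item_id):
    """Stand-in for the real `mytask`; sleeps a little to simulate work."""
    log(f"task {item_id} begin")
    time.sleep(0.05 * (item_id % 3 + 1))
    log(f"task {item_id} end")


def chordfinisher(*args, **kwargs):
    """Does nothing; its only job is to run after the whole header."""
    log("finisher")
    return "OK"


def run_chord(pool, header, body):
    """Hypothetical toy chord: submit every header task, block until all
    of them complete, then run the body once."""
    futures = [pool.submit(fn, *args) for fn, args in header]
    for f in futures:
        f.result()         # blocks until done; re-raises any header failure
    return body()          # like .si(): the body gets no header results


def step2():
    log("step2")


ids = range(6)
with ThreadPoolExecutor(max_workers=4) as pool:
    # step1 = chord(group(tasks), chordfinisher.si())
    run_chord(pool, [(mytask, (i,)) for i in ids], chordfinisher)
    # workflow = chain(step1, step2): step2 starts only after step1 returns
    step2()

print(events)
```

Every `task N end` entry appears before `finisher`, and `step2` comes last, which is the ordering the chord-plus-finisher trick is meant to guarantee.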
Originally I wanted to run step1 inside a subtask, but for the reason suspected above, once that subtask has finished calling the group it is considered finished, and my workflow moves on before the group's tasks are done...
If someone has something better, I'm interested!