
Show a progress bar for my multithreaded process


For interprocess communication you can use a multiprocessing.Queue: your workers can put_nowait tuples with progress information on it while doing their work, and your main process can update whatever your view_progress is reading until all results are ready.

A bit like in this example usage of a Queue, with a few adjustments:

In the writers (workers) I'd use put_nowait instead of put because working is more important than waiting to report that you are working (but perhaps you judge otherwise and decide that informing the user is part of the task and should never be skipped).

The example just puts strings on the queue; I'd use collections.namedtuple for more structured messages. On tasks with many steps, this lets you raise the resolution of your progress report and tell the user more.
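Putting those two adjustments together, here is a minimal sketch of the idea (the Progress message type, the worker body, and the counts are my own illustrative choices, not part of the original example):

import multiprocessing
import queue  # only for the queue.Empty / queue.Full exceptions
from collections import namedtuple

# Hypothetical message type; the field names are my choice.
Progress = namedtuple('Progress', ['worker_id', 'step', 'total_steps'])

def worker(worker_id, q, total_steps=5):
    for step in range(1, total_steps + 1):
        # ... do one unit of the real work here ...
        try:
            # put_nowait: never block the work just to report on it
            q.put_nowait(Progress(worker_id, step, total_steps))
        except queue.Full:
            pass  # skipping one report is fine; the work goes on

if __name__ == '__main__':
    q = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(i, q)) for i in range(4)]
    for p in procs:
        p.start()
    # The main process drains the queue and updates whatever view_progress reads.
    while any(p.is_alive() for p in procs) or not q.empty():
        try:
            msg = q.get(timeout=0.1)
            print('worker %d: step %d/%d' % (msg.worker_id, msg.step, msg.total_steps))
        except queue.Empty:
            pass
    for p in procs:
        p.join()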


In general the approach you are taking is okay; I do it in a similar way.

To calculate the progress you can use an auxiliary function that counts the completed tasks:

def get_progress(result_objs):
    done = 0
    errors = 0
    for r in result_objs:
        if r.ready():
            done += 1
            if not r.successful():
                errors += 1
    return (done, errors)

Note that as a bonus this function returns how many of the "done" tasks ended in errors.

The big problem is for the /api/v1.0/progress route to find the array of AsyncResult objects.

Unfortunately AsyncResult objects cannot be serialized into a session, so that option is out. If your application supports a single set of async tasks at a time, then you can just store this array as a global variable. If you need to support multiple clients, each with a different set of async tasks, then you will need to figure out a strategy to keep client session data on the server; one possible approach is sketched below.
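One hedged sketch of such a strategy (the route names, the tasks dict, and the task_id scheme are mine, not from the question; app, pool, do_work and get_progress are the same names used in the rest of this answer): keep a module-level dict of result lists, keyed by an id that is handed to the client and echoed back on each progress request:

import uuid

from flask import jsonify

tasks = {}  # task_id -> list of AsyncResult objects, one entry per client

@app.route('/start')
def start():
    task_id = uuid.uuid4().hex
    tasks[task_id] = [pool.apply_async(do_work) for n in range(20)]
    return jsonify({'task_id': task_id})

@app.route('/api/v1.0/progress/<task_id>')
def progress_for(task_id):
    results = tasks[task_id]
    done, errored = get_progress(results)
    return jsonify({'total': len(results), 'done': done, 'errored': errored})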

I implemented the single client solution as a quick test. My view functions are as follows:

results = None

@app.route('/')
def index():
    global results
    results = [pool.apply_async(do_work) for n in range(20)]
    return render_template('index.html')

@app.route('/api/v1.0/progress')
def progress():
    global results
    total = len(results)
    done, errored = get_progress(results)
    return jsonify({'total': total, 'done': done, 'errored': errored})
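For completeness: the snippet above assumes an app, a pool, and a do_work that are not shown. A plausible setup, purely as a sketch (the task body, pool size, and failure rate are invented):

import random
import time
from multiprocessing import Pool

from flask import Flask, jsonify, render_template

app = Flask(__name__)

def do_work():
    # Stand-in task: sleep a few seconds and fail ~10% of the time,
    # so that both the 'done' and 'errored' counters get exercised.
    time.sleep(random.uniform(1, 5))
    if random.random() < 0.1:
        raise RuntimeError('simulated failure')

# On platforms that spawn rather than fork (e.g. Windows), create the
# pool under an `if __name__ == '__main__':` guard instead.
pool = Pool(processes=4)

The page served by index.html then only has to poll /api/v1.0/progress periodically and render done out of total.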

I hope this helps!


I think you should be able to update the number of completed tasks using multiprocessing.Value and multiprocessing.Lock.

In your main code, use:

processes = multiprocessing.Value('i', 10)
lock = multiprocessing.Lock()

And then, when you call worker.dowork, pass a lock object and the value to it:

worker.dowork(lock, processes)

In your worker.dowork code, decrease "processes" by one when its work is finished:

lock.acquire()
processes.value -= 1
lock.release()

Now, "processes.value" should be accessible from your main code, and be equal to the number of remaining processes. Make sure you acquire the lock before acessing processes.value, and release the lock afterwards