Background tasks in Flask


Best practice

The best way to implement background tasks in Flask is with Celery, as explained in this SO post. Good starting points are the official Flask documentation and the Celery documentation.

Crazy way: Build your own decorator

As @MrLeeh pointed out in a comment, Miguel Grinberg presented a solution in his PyCon 2016 talk by implementing a decorator. I want to emphasize that I have the highest respect for his solution; he called it a "crazy solution" himself. The code below is a minor adaptation of his solution.

Warning!!!

Don't use this in production! The main reason is that this app leaks memory: entries in the global tasks dictionary are never removed, so it grows with every request. Even if you fix the memory leak, maintaining this sort of code is hard. If you just want to play around or use this in a private project, read on.
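
One way to bound the growth of such a dictionary is to timestamp each entry and drop finished entries once they are old enough. The following is only a minimal sketch under assumptions, not part of Miguel Grinberg's solution: the `created` field, the `register_task` and `prune_tasks` helpers, and the 300-second `max_age` default are hypothetical names and values chosen for illustration.

```python
import time

# Hypothetical stand-in for the global task registry used by the app.
tasks = {}


def register_task(task_id, now=None):
    """Record a task together with its creation time (an assumed extra field)."""
    tasks[task_id] = {'created': time.time() if now is None else now}


def prune_tasks(max_age=300, now=None):
    """Remove finished tasks older than max_age seconds and return their ids."""
    now = time.time() if now is None else now
    # Iterate over a snapshot so that entries added meanwhile do not
    # disturb the loop.
    stale = [task_id for task_id, task in list(tasks.items())
             if 'result' in task and now - task['created'] > max_age]
    for task_id in stale:
        del tasks[task_id]
    return stale
```

A helper like this could be called at the start of each request, for example from a function registered with `@app.before_request`. Unfinished tasks are kept on purpose, so a task that never completes would still stay in the dictionary.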

Minimal example

Assume you have a long-running function call in your /foo endpoint. I mock this with a 10-second sleep timer. If you call the endpoint three times in sequence, it will take 30 seconds to finish.

Miguel Grinberg's decorator solution is implemented in flask_async. It starts a new thread with a Flask request context built from the environ of the current request. Each task is issued a new task_id, and its result is saved in the global dictionary as tasks[task_id]['result'].
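
Stripped of the Flask specifics, the mechanism can be sketched with the standard library alone. This is a simplified illustration, not the decorator from the talk: the names `run_in_background`, `results` and `slow_add` are hypothetical, and there is no request context or error handling.

```python
import threading
import uuid
from functools import wraps

# Hypothetical registry, playing the role of the global tasks dictionary.
results = {}


def run_in_background(f):
    """Run f in a new thread and return a task id immediately."""
    @wraps(f)
    def wrapped(*args, **kwargs):
        task_id = uuid.uuid4().hex

        def task():
            # Store the return value once the function has finished.
            results[task_id]['result'] = f(*args, **kwargs)

        thread = threading.Thread(target=task)
        # Register the task before starting it, so the thread finds its entry.
        results[task_id] = {'task': thread}
        thread.start()
        return task_id
    return wrapped


@run_in_background
def slow_add(a, b):
    return a + b


task_id = slow_add(1, 2)           # returns at once with an id
results[task_id]['task'].join()    # wait here only for demonstration
print(results[task_id]['result'])  # prints 3
```

The caller gets the id back immediately and looks up the result later, which is the same contract the Flask version exposes over HTTP.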

With the decorator in place you only need to decorate the endpoint with @flask_async and the endpoint is asynchronous - just like that!

    import threading
    import time
    import uuid
    from functools import wraps

    from flask import Flask, current_app, request, abort
    from werkzeug.exceptions import HTTPException, InternalServerError

    app = Flask(__name__)
    tasks = {}


    def flask_async(f):
        """
        This decorator transforms a sync route to asynchronous by running it in a background thread.
        """
        @wraps(f)
        def wrapped(*args, **kwargs):
            def task(app, environ):
                # Create a request context similar to that of the original request
                with app.request_context(environ):
                    try:
                        # Run the route function and record the response
                        tasks[task_id]['result'] = f(*args, **kwargs)
                    except HTTPException as e:
                        tasks[task_id]['result'] = current_app.handle_http_exception(e)
                    except Exception as e:
                        # The function raised an exception, so we set a 500 error
                        tasks[task_id]['result'] = InternalServerError()
                        if current_app.debug:
                            # We want to find out if something happened so reraise
                            raise

            # Assign an id to the asynchronous task
            task_id = uuid.uuid4().hex

            # Record the task, and then launch it
            tasks[task_id] = {'task': threading.Thread(
                target=task, args=(current_app._get_current_object(), request.environ))}
            tasks[task_id]['task'].start()

            # Return a 202 response, with an id that the client can use to obtain task status
            return {'TaskId': task_id}, 202
        return wrapped


    @app.route('/foo')
    @flask_async
    def foo():
        time.sleep(10)
        return {'Result': True}


    @app.route('/foo/<task_id>', methods=['GET'])
    def foo_results(task_id):
        """
        Return results of asynchronous task.
        If this request returns a 202 status code, it means that task hasn't finished yet.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'result' not in task:
            return {'TaskID': task_id}, 202
        return task['result']


    if __name__ == '__main__':
        app.run(debug=True)

However, you need a little trick to get your results. The endpoint /foo only returns the HTTP status code 202 and the task id, but not the result. You need another endpoint, /foo/<task_id>, to get the result. Here is an example for localhost:

    import time
    import requests

    task_ids = [requests.get('http://127.0.0.1:5000/foo').json().get('TaskId')
                for _ in range(2)]
    time.sleep(11)
    results = [requests.get(f'http://127.0.0.1:5000/foo/{task_id}').json()
               for task_id in task_ids]
    # [{'Result': True}, {'Result': True}]
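
Instead of sleeping for a fixed 11 seconds, a client could poll the result endpoint until it stops answering with 202. The sketch below is one possible way to do that and is not part of the original solution: `poll_until_done` and its `fetch` parameter are hypothetical, with `fetch` assumed to be any callable that takes a task id and returns a `(status_code, body)` pair.

```python
import time


def poll_until_done(fetch, task_id, interval=1.0, timeout=30.0):
    """Call fetch(task_id) until its status code is no longer 202.

    fetch is a hypothetical callable returning (status_code, body).
    Raises TimeoutError if the task is still pending after timeout seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        status, body = fetch(task_id)
        if status != 202:
            # The task has finished, or the id is unknown (404).
            return status, body
        if time.monotonic() >= deadline:
            raise TimeoutError(f'task {task_id} still pending after {timeout}s')
        time.sleep(interval)
```

With requests, `fetch` could wrap a call to `requests.get` on /foo/<task_id> and return the response's `status_code` together with its parsed body. Passing the fetch function in as an argument keeps the polling logic independent of the HTTP library.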