Background tasks in Flask
Best practice
The best way to implement background tasks in Flask is with Celery, as explained in this SO post. Good starting points are the official Flask documentation and the Celery documentation.
Crazy way: Build your own decorator
As @MrLeeh pointed out in a comment, Miguel Grinberg presented a solution in his PyCon 2016 talk by implementing a decorator. I want to emphasize that I have the highest respect for his solution; he called it a "crazy solution" himself. The code below is a minor adaptation of his solution.
Warning!!!
Don't use this in production! The main reason is that this app leaks memory: the global tasks dictionary gains an entry for every request, and nothing ever removes those entries. Even if you fix the memory leak, maintaining this sort of code is hard. If you just want to play around or use this in a private project, read on.
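To make the leak concrete, here is a minimal sketch of one way to bound the dictionary by evicting old, finished entries. It assumes each finished entry carries a 'finished_at' timestamp; the code in this answer does not record one, and both that key and the helper name prune_finished_tasks are hypothetical. Treat it as an illustration of the problem, not a fix that makes the approach production-ready.

```python
import time


def prune_finished_tasks(tasks, max_age_seconds, now=None):
    """Delete finished tasks older than max_age_seconds and return their ids.

    Assumption: a finished entry has a 'finished_at' UNIX timestamp.
    Entries without that key are treated as still running and are kept.
    """
    now = time.time() if now is None else now
    # Snapshot the items so worker threads can add entries while we scan.
    expired = [
        task_id
        for task_id, entry in list(tasks.items())
        if 'finished_at' in entry and now - entry['finished_at'] > max_age_seconds
    ]
    for task_id in expired:
        del tasks[task_id]
    return expired
```

Something would still have to call this periodically, for example from a separate cleanup thread, which is one more piece of code to maintain.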
Minimal example
Assume you have a long-running function call in your /foo endpoint. I mock this with a 10-second sleep timer. If you call the endpoint three times in a row, it will take 30 seconds to finish.
Miguel Grinberg's decorator solution is implemented in flask_async. It runs the route function in a new thread, inside a request context built from the current request's WSGI environment, so the thread sees the same request data as the original call. Each task is issued a new task_id, and its result is saved in the global dictionary as tasks[task_id]['result'].
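Stripped of everything Flask-specific, the mechanism described above might be sketched like this. The names run_in_background and get_result are hypothetical and only serve the illustration; the actual decorator additionally sets up the request context and handles exceptions.

```python
import threading
import uuid

# Global registry: task_id -> {'thread': Thread, 'result': ...}
tasks = {}


def run_in_background(func, *args, **kwargs):
    """Start func in a new thread and return the id of the recorded task."""
    task_id = uuid.uuid4().hex

    def worker():
        # Store the return value once the function has finished.
        tasks[task_id]['result'] = func(*args, **kwargs)

    # Record the task first, then launch it, so worker() finds its entry.
    tasks[task_id] = {'thread': threading.Thread(target=worker)}
    tasks[task_id]['thread'].start()
    return task_id


def get_result(task_id):
    """Return (done, result); result is None while the task is running."""
    entry = tasks[task_id]
    if 'result' not in entry:
        return False, None
    return True, entry['result']
```

The caller gets the id back immediately and asks for the result later, which is the same contract the decorated endpoint exposes over HTTP.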
With the decorator in place, you only need to decorate the endpoint with @flask_async and the endpoint becomes asynchronous, just like that!

    import threading
    import time
    import uuid
    from functools import wraps

    from flask import Flask, current_app, request, abort
    from werkzeug.exceptions import HTTPException, InternalServerError

    app = Flask(__name__)
    tasks = {}


    def flask_async(f):
        """
        This decorator transforms a sync route to asynchronous by running it in a background thread.
        """
        @wraps(f)
        def wrapped(*args, **kwargs):
            def task(app, environ):
                # Create a request context similar to that of the original request
                with app.request_context(environ):
                    try:
                        # Run the route function and record the response
                        tasks[task_id]['result'] = f(*args, **kwargs)
                    except HTTPException as e:
                        tasks[task_id]['result'] = current_app.handle_http_exception(e)
                    except Exception:
                        # The function raised an exception, so we set a 500 error
                        tasks[task_id]['result'] = InternalServerError()
                        if current_app.debug:
                            # We want to find out if something happened so reraise
                            raise

            # Assign an id to the asynchronous task
            task_id = uuid.uuid4().hex

            # Record the task, and then launch it
            tasks[task_id] = {'task': threading.Thread(
                target=task, args=(current_app._get_current_object(), request.environ))}
            tasks[task_id]['task'].start()

            # Return a 202 response, with an id that the client can use to obtain task status
            return {'TaskId': task_id}, 202
        return wrapped


    @app.route('/foo')
    @flask_async
    def foo():
        time.sleep(10)
        return {'Result': True}


    @app.route('/foo/<task_id>', methods=['GET'])
    def foo_results(task_id):
        """
        Return results of asynchronous task.
        If this request returns a 202 status code, it means that task hasn't finished yet.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'result' not in task:
            # Same key as in the decorator's 202 response
            return {'TaskId': task_id}, 202
        return task['result']


    if __name__ == '__main__':
        app.run(debug=True)

However, you need a little trick to get your results. The endpoint /foo only returns the HTTP status code 202 and the task ID, not the result. You need another endpoint, /foo/<task_id>, to get the result. Here is an example for localhost:

    import time
    import requests

    task_ids = [requests.get('http://127.0.0.1:5000/foo').json().get('TaskId')
                for _ in range(2)]
    time.sleep(11)
    results = [requests.get(f'http://127.0.0.1:5000/foo/{task_id}').json()
               for task_id in task_ids]
    # [{'Result': True}, {'Result': True}]

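The fixed 11-second sleep works for this demo because the task duration is known. A client that does not know the duration could poll the results endpoint until the status is no longer 202. The helper below is a minimal sketch of that idea; poll_until_done and its fetch parameter are hypothetical names, and fetch is assumed to be a zero-argument callable that returns a (status_code, body) tuple.

```python
import time


def poll_until_done(fetch, interval=0.5, timeout=30.0,
                    sleep=time.sleep, clock=time.monotonic):
    """Call fetch() repeatedly until it reports a status other than 202.

    Assumption: fetch returns (status_code, body). Raises TimeoutError
    if the task is still pending once `timeout` seconds have passed.
    """
    deadline = clock() + timeout
    while True:
        status, body = fetch()
        if status != 202:
            # Finished (200), unknown task (404), or failed (500).
            return status, body
        if clock() >= deadline:
            raise TimeoutError('task did not finish in time')
        sleep(interval)
```

With requests, fetch could be a small function that calls requests.get(f'http://127.0.0.1:5000/foo/{task_id}') and returns the response's status_code together with response.json().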