
Making an asynchronous task in Flask


I would use Celery to handle the asynchronous task for you. You'll need to install a broker to serve as your task queue (RabbitMQ and Redis are recommended).
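For example, on a Debian-based system you might install RabbitMQ and Celery like this (package names vary by platform; Homebrew users would run brew install rabbitmq instead):

$ sudo apt-get install rabbitmq-server
$ pip install celery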

app.py:

from flask import Flask, Response, request
from celery import Celery
import json

broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue

app = Flask(__name__)
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your Celery configuration lives in celeryconfig.py


@celery.task(bind=True)
def some_long_task(self, x, y):
    # Do some long task
    ...


@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    some_long_task.delay(x, y)                 # Call your async task and pass whatever variables are necessary
    return Response(
        mimetype='application/json',
        status=200
    )
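The config_from_object call above imports a module named celeryconfig from your Python path. A minimal sketch of celeryconfig.py might look like the following; the result_backend value is an assumption (the RPC result backend on top of the RabbitMQ broker), so substitute whatever backend you actually run:

# celeryconfig.py -- minimal sketch; adjust values to your setup
result_backend = 'rpc://'        # assumption: RPC result backend over RabbitMQ
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']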

Run your Flask app, then start a separate process for your Celery worker:

$ celery -A app.celery worker --loglevel=debug

(Celery 5 requires the -A option to come before the worker subcommand; on Celery 4.x and earlier, celery worker -A app.celery --loglevel=debug also works.)
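If the client needs to poll for the outcome, you can expose a small status endpoint built on Celery's AsyncResult API. This is only a sketch: the route and payload shape are illustrations, and a result backend must be configured for it to work. The task id is the .id attribute of the object returned by .delay():

from flask import jsonify

@app.route('/status/<task_id>')
def task_status(task_id):
    task = some_long_task.AsyncResult(task_id)   # look the task up by its id
    if task.state == 'FAILURE':
        payload = {'state': task.state, 'error': str(task.info)}  # task.info holds the exception
    else:
        payload = {'state': task.state, 'result': task.result}    # result is None until the task finishes
    return jsonify(payload)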

I would also refer you to Miguel Grinberg's write-up for a more in-depth guide to using Celery with Flask.


Threading is another possible solution. Although the Celery-based solution is better for applications at scale, if you are not expecting too much traffic on the endpoint in question, threading is a viable alternative.
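In its simplest form, the idea is just to start a thread from the request handler and return immediately. Here is a bare-bones sketch of that (the route and function names are only illustrations); the fuller version with status polling follows below:

import threading
from flask import Flask

app = Flask(__name__)


def long_task():
    ...  # the slow work goes here


@app.route('/start', methods=['POST'])
def start():
    threading.Thread(target=long_task).start()  # returns without waiting for the work
    return 'accepted', 202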

This solution is based on Miguel Grinberg's PyCon 2016 Flask at Scale presentation, specifically slide 41 in his slide deck. His code is also available on GitHub for those interested in the original source.

From a user perspective the code works as follows:

  1. You make a call to the endpoint that performs the long running task.
  2. This endpoint returns 202 Accepted with a link to check on the task status.
  3. Calls to the status link return 202 while the task is still running, and return 200 (and the result) when the task is complete (a client-side sketch of this polling loop follows the list).
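Here is what that loop could look like from the client side, assuming the requests library and the endpoints defined in the example below:

import time
import requests

BASE = 'http://localhost:5000'                  # assumption: local dev server

resp = requests.get(BASE + '/some/work')        # kick off the long-running task
status_url = BASE + resp.headers['Location']    # the 202 response carries the status link

while True:
    resp = requests.get(status_url)
    if resp.status_code == 200:                 # task finished; the body holds the result
        print(resp.text)
        break
    time.sleep(1)                               # still 202 Accepted; wait and poll again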

To convert an API call into a background task, simply add the @async_api decorator.

Here is a fully self-contained example:

from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid

tasks = {}

app = Flask(__name__)
api = Api(app)


@app.before_first_request  # note: removed in Flask 2.3; on newer Flask, start this thread at startup instead
def before_first_request():
    """Start a background thread that cleans up old tasks."""
    def clean_old_tasks():
        """
        This function cleans up old tasks from our in-memory data structure.
        """
        global tasks
        while True:
            # Only keep tasks that are running or that finished less than 5
            # minutes ago.
            five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
            tasks = {task_id: task for task_id, task in tasks.items()
                     if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
            time.sleep(60)

    if not current_app.config['TESTING']:
        thread = threading.Thread(target=clean_old_tasks)
        thread.start()


def async_api(wrapped_function):
    @wraps(wrapped_function)
    def new_function(*args, **kwargs):
        def task_call(flask_app, environ):
            # Create a request context similar to that of the original request
            # so that the task can have access to flask.g, flask.request, etc.
            with flask_app.request_context(environ):
                try:
                    tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['return_value'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened, so reraise
                        raise
                finally:
                    # We record the time of the response, to help in garbage
                    # collecting old tasks
                    tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())

                    # close the database session (if any)

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task_thread': threading.Thread(
            target=task_call, args=(current_app._get_current_object(),
                                    request.environ))}
        tasks[task_id]['task_thread'].start()

        # Return a 202 response, with a link that the client can use to
        # obtain task status
        print(url_for('gettaskstatus', task_id=task_id))
        return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}

    return new_function


class GetTaskStatus(Resource):
    def get(self, task_id):
        """
        Return status about an asynchronous task. If this request returns a 202
        status code, it means that the task hasn't finished yet. Otherwise, the
        response from the task is returned.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'return_value' not in task:
            return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
        return task['return_value']


class CatchAll(Resource):
    @async_api
    def get(self, path=''):
        # perform some intensive processing
        print("starting processing task, path: '%s'" % path)
        time.sleep(10)
        print("completed processing task, path: '%s'" % path)
        return f'The answer is: {path}'


api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')


if __name__ == '__main__':
    app.run(debug=True)


You can also try using multiprocessing.Process with daemon=True; the process.start() method does not block and you can return a response/status immediately to the caller while your expensive function executes in the background.

I experienced a similar problem while working with the Falcon framework, and using a daemon process helped.

You'd need to do the following:

import time
from multiprocessing import Process

from flask import Flask, Response

app = Flask(__name__)


@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    heavy_process = Process(  # Create a daemonic process with the heavy "my_func"
        target=my_func,
        daemon=True
    )
    heavy_process.start()
    return Response(
        mimetype='application/json',
        status=200
    )


# Define some heavy function
def my_func():
    time.sleep(10)
    print("Process finished")

You should get a response immediately, and after 10 seconds you should see the printed message in the console.

NOTE: Keep in mind that daemonic processes are not allowed to spawn any child processes.
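If your background function itself needs to start child processes (a multiprocessing.Pool, for example), drop daemon=True: Python raises an error when a daemonic process tries to have children, while non-daemonic processes are joined automatically at interpreter exit. A sketch of the non-daemonic variant, reusing my_func from above:

heavy_process = Process(target=my_func)  # daemon defaults to False, so my_func may spawn children
heavy_process.start()                    # still returns immediately; joined automatically at exit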