How can I provide shared state to my Flask app with multiple workers without depending on additional software?
If your webserver's worker type is compatible with the multiprocessing module, you can use multiprocessing.managers.BaseManager to provide a shared state for Python objects. A simple wrapper could look like this:
```python
from multiprocessing import Lock
from multiprocessing.managers import AcquirerProxy, BaseManager, DictProxy


def get_shared_state(host, port, key):
    shared_dict = {}
    shared_lock = Lock()
    manager = BaseManager((host, port), key)
    manager.register("get_dict", lambda: shared_dict, DictProxy)
    manager.register("get_lock", lambda: shared_lock, AcquirerProxy)
    try:
        # The first process to get here can bind the address and starts the server.
        manager.get_server()
        manager.start()
    except OSError:  # Address already in use
        # Another process already runs the server, so connect to it instead.
        manager.connect()
    return manager.get_dict(), manager.get_lock()
```
You can assign your data to the shared_dict to make it accessible across processes:
```python
import numpy

HOST = "127.0.0.1"
PORT = 35791
KEY = b"secret"
shared_dict, shared_lock = get_shared_state(HOST, PORT, KEY)

shared_dict["number"] = 0
shared_dict["text"] = "Hello World"
shared_dict["array"] = numpy.array([1, 2, 3])
```
However, you should be aware of the following circumstances:

- Use shared_lock to protect against race conditions when overwriting values in shared_dict, in particular when the new value is computed from the current one (read-modify-write). (See Flask example below.)
- There is no data persistence. If you restart the app, or if the main (the first) BaseManager process dies, the shared state is gone.
- With this simple implementation of BaseManager, you cannot directly edit nested values in shared_dict. For example, shared_dict["array"][1] = 0 has no effect, because shared_dict["array"] returns a copy of the stored value. You will have to edit a copy and then reassign it to the dictionary key.
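The nested-value caveat can be reproduced with a DictProxy from multiprocessing's built-in SyncManager (manager.dict() returns the same proxy type that the wrapper registers as get_dict). This is a minimal sketch under two assumptions: a POSIX system where the "fork" start method is available, and a plain list standing in for numpy.array so no extra dependency is needed. nested_edit_demo is a hypothetical helper name.

```python
import multiprocessing


def nested_edit_demo():
    # Assumption: POSIX system; the "fork" context avoids re-importing this
    # module in the manager's server process.
    ctx = multiprocessing.get_context("fork")
    with ctx.Manager() as manager:
        shared_dict = manager.dict()
        shared_dict["array"] = [1, 2, 3]

        # __getitem__ returns an unpickled *copy*; editing it never reaches the server.
        shared_dict["array"][1] = 0
        after_direct_edit = shared_dict["array"]

        # Working pattern: fetch a copy, modify it locally, then reassign the key.
        array = shared_dict["array"]
        array[1] = 0
        shared_dict["array"] = array
        after_reassign = shared_dict["array"]

    return after_direct_edit, after_reassign


print(nested_edit_demo())  # ([1, 2, 3], [1, 0, 3])
```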
Flask example:
The following Flask app uses a global variable to store a counter:
```python
from flask import Flask

app = Flask(__name__)
number = 0


@app.route("/")
def counter():
    global number
    number += 1
    return str(number)
```
This works when using only one worker (gunicorn -w 1 server:app). When using multiple workers (gunicorn -w 4 server:app), it becomes apparent that number is not shared state but a separate copy in each worker process.
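A minimal sketch of the same effect without Flask or gunicorn, assuming a POSIX system with the "fork" start method (gunicorn also forks its workers). handle_request and simulate_workers are hypothetical stand-ins for a request handler and for worker processes; each child increments its own inherited copy of number, so the parent's value never changes:

```python
import multiprocessing

number = 0  # module-level global, like the counter in the Flask app


def handle_request(conn):
    """Stand-in for one request served by a worker process."""
    global number
    number += 1  # updates only this process's private copy
    conn.send(number)
    conn.close()


def simulate_workers(n_workers=4):
    # Assumption: POSIX "fork" start method.
    ctx = multiprocessing.get_context("fork")
    seen = []
    for _ in range(n_workers):
        parent_conn, child_conn = ctx.Pipe()
        worker = ctx.Process(target=handle_request, args=(child_conn,))
        worker.start()
        seen.append(parent_conn.recv())
        worker.join()
    return seen


print(simulate_workers())  # [1, 1, 1, 1]
print(number)              # 0
```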
Instead, with shared_dict, the app looks like this:
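```python
from flask import Flask

# get_shared_state() is the wrapper defined above.

app = Flask(__name__)

HOST = "127.0.0.1"
PORT = 35791
KEY = b"secret"
shared_dict, shared_lock = get_shared_state(HOST, PORT, KEY)
shared_dict["number"] = 0


@app.route("/")
def counter():
    with shared_lock:
        # Read, increment and store under the lock so no other worker can
        # interleave; keep the new value so the response matches this request.
        number = shared_dict["number"] + 1
        shared_dict["number"] = number
    return str(number)
```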
This works with any number of workers, like gunicorn -w 4 server:app.
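To see why the lock matters, the increment can be exercised from several processes at once. This sketch assumes a POSIX system with the "fork" start method and uses SyncManager's dict() and Lock(), which return the same DictProxy and AcquirerProxy types as the get_dict/get_lock registrations in the wrapper; worker and run_workers are hypothetical names. With the lock, no increments are lost:

```python
import multiprocessing


def worker(shared_dict, shared_lock, n_increments):
    for _ in range(n_increments):
        # += on a DictProxy is a get followed by a set (two round trips);
        # the lock keeps other workers from interleaving between them.
        with shared_lock:
            shared_dict["number"] += 1


def run_workers(n_workers=4, n_increments=100):
    # Assumption: POSIX "fork" start method.
    ctx = multiprocessing.get_context("fork")
    with ctx.Manager() as manager:
        shared_dict = manager.dict(number=0)
        shared_lock = manager.Lock()
        workers = [
            ctx.Process(target=worker, args=(shared_dict, shared_lock, n_increments))
            for _ in range(n_workers)
        ]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        return shared_dict["number"]


print(run_workers())  # 400
```

Without the lock, the final count can come out lower, because two workers may read the same value before either writes it back.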
Your example is a bit magic for me! I'd suggest reusing the magic already in the multiprocessing codebase in the form of a Namespace. I've attempted to make the following code compatible with the spawn start method (e.g. on MS Windows), but I only have access to Linux machines, so I can't test it there.
Start by pulling in dependencies, defining our custom Manager, and registering a method that returns a Namespace singleton:
```python
from multiprocessing.managers import BaseManager, Namespace, NamespaceProxy


class SharedState(BaseManager):
    _shared_state = Namespace(number=0)

    @classmethod
    def _get_shared_state(cls):
        return cls._shared_state


SharedState.register('state', SharedState._get_shared_state, NamespaceProxy)
```
This might need to be more complicated if creating the initial state is expensive and hence should only be done when it's needed. Note that the OP's version of initialising state during process startup will cause everything to reset if gunicorn starts a new worker process later, e.g. after killing one due to a timeout.
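One way the lazy version could look, sketched under assumptions: expensive_initial_state() is a hypothetical placeholder for the costly setup, and the threading.Lock guards creation because the manager's server handles each client connection in its own thread, so two workers could ask for the state at the same moment. Calling the classmethod directly, without starting a server, shows that the setup runs only once:

```python
import threading
from multiprocessing.managers import BaseManager, Namespace, NamespaceProxy

init_calls = 0  # counts how often the hypothetical setup actually runs


def expensive_initial_state():
    """Hypothetical placeholder for costly setup (loading files, models, ...)."""
    global init_calls
    init_calls += 1
    return {"number": 0}


class SharedState(BaseManager):
    _shared_state = None
    _init_lock = threading.Lock()

    @classmethod
    def _get_shared_state(cls):
        # Runs inside the manager's server process; the first caller builds
        # the Namespace and later callers reuse the same object.
        with cls._init_lock:
            if cls._shared_state is None:
                cls._shared_state = Namespace(**expensive_initial_state())
            return cls._shared_state


SharedState.register('state', SharedState._get_shared_state, NamespaceProxy)

first = SharedState._get_shared_state()
second = SharedState._get_shared_state()
print(first is second, first.number, init_calls)  # True 0 1
```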
Next, I define a function to get access to this shared state, similar to how the OP does it:
```python
def shared_state(address, authkey):
    manager = SharedState(address, authkey)
    try:
        manager.get_server()  # raises if another server started
        manager.start()
    except OSError:
        manager.connect()
    return manager.state()
```
Though I'm not sure I'd recommend doing things like this. When gunicorn starts, it spawns lots of processes that all race to run this code, and it wouldn't surprise me if this went wrong sometimes. Also, if gunicorn happens to kill off the server process (because of e.g. a timeout), every other process will start to fail.
That said, if we wanted to use this, we would do something like:
```python
ss = shared_state('server.sock', b'noauth')
ss.number += 1
```
This uses a Unix domain socket (passing a string rather than a tuple as the address) to lock things down a bit more.
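A self-contained sketch of two workers sharing state through a Unix domain socket. It assumes a POSIX system, pins the "fork" start method via the ctx argument of BaseManager so the result does not depend on the platform default, and places the socket in a temporary directory instead of the working directory; the two shared_state() calls stand in for two worker processes:

```python
import multiprocessing
import os
import tempfile
from multiprocessing.managers import BaseManager, Namespace, NamespaceProxy


class SharedState(BaseManager):
    _shared_state = Namespace(number=0)

    @classmethod
    def _get_shared_state(cls):
        return cls._shared_state


SharedState.register('state', SharedState._get_shared_state, NamespaceProxy)


def shared_state(address, authkey):
    # Assumption: POSIX system; "fork" is pinned explicitly for this sketch.
    manager = SharedState(address, authkey, ctx=multiprocessing.get_context("fork"))
    try:
        manager.get_server()  # raises OSError if the socket path is already bound
        manager.start()
    except OSError:
        manager.connect()
    return manager.state()


# A string address selects AF_UNIX; a (host, port) tuple would select TCP.
socket_path = os.path.join(tempfile.mkdtemp(), 'server.sock')
first = shared_state(socket_path, b'noauth')   # starts the server
second = shared_state(socket_path, b'noauth')  # connects to the running server
first.number += 1
print(second.number)  # 1
```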
Also note that ss.number += 1 has the same race condition as the OP's code: the current value is transferred to the worker's process, incremented there, and then sent back to the server. I'm not sure what the _lock is supposed to be protecting, but I don't think it'll do much.
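One possible way around the transfer-and-write-back race, which is not something either post above proposes, is to register a custom class so that the read-modify-write runs inside the server process in a single call. A minimal sketch, assuming a POSIX system with the "fork" start method; Counter, CounterManager and run_workers are hypothetical names:

```python
import multiprocessing
import threading
from multiprocessing.managers import BaseManager


class Counter:
    """Lives in the manager's server process; workers only hold proxies to it."""

    def __init__(self):
        self._value = 0
        # The server handles each client connection in its own thread.
        self._lock = threading.Lock()

    def increment(self):
        # Read, add and store happen server-side under one lock, in one round trip.
        with self._lock:
            self._value += 1
            return self._value

    def value(self):
        with self._lock:
            return self._value


_counter = Counter()  # hypothetical singleton, inherited by the forked server


def _get_counter():
    return _counter


class CounterManager(BaseManager):
    pass


# No proxytype given, so BaseManager uses an AutoProxy exposing the public methods.
CounterManager.register('counter', _get_counter)


def worker(counter, n_increments):
    for _ in range(n_increments):
        counter.increment()


def run_workers(n_workers=4, n_increments=100):
    # Assumption: POSIX "fork" start method.
    ctx = multiprocessing.get_context("fork")
    with CounterManager(ctx=ctx) as manager:  # __enter__ starts the server
        counter = manager.counter()
        workers = [
            ctx.Process(target=worker, args=(counter, n_increments))
            for _ in range(n_workers)
        ]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        return counter.value()


print(run_workers())  # 400
```

Because increment() executes in the server under the server-side lock, each call is a single request and no client-side lock is needed.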