Avoid global variables for unpicklable shared state among multiprocessing.Pool workers


This was surprisingly tricky. The key here is to preserve read access to variables that are available at fork time, without serialization. Most solutions to sharing memory in multiprocessing end up serializing. I tried using a weakref.proxy to pass in a classifier without serialization, but that didn't work because both dill and pickle will try to follow and serialize the referent. However, a reference to a module works, because a module-level function is pickled by its qualified name rather than by value.

This organization gets us close:

    import multiprocessing as mp
    import csv


    def classify(classifier, data_file):
        with open(data_file, "rt") as fp, mp.Pool() as pool:
            rd = csv.DictReader(fp)
            yield from pool.imap_unordered(classifier.classify, rd)


    def orchestrate(classifier_spec, data_file):
        # construct a classifier from the spec; note that we can
        # even dynamically import modules here, using config values
        # from the spec
        import classifier_module
        classifier_module.init(classifier_spec)
        return classify(classifier_module, data_file)


    if __name__ == '__main__':
        list(orchestrate(None, 'data.txt'))

A few changes to note here:

  • we add an orchestrate function for dependency injection; orchestrate decides how to construct and initialize a classifier, then hands it to classify, decoupling the two
  • classify only needs to assume that the classifier parameter has a classify method; it doesn't care if it's an instance or a module

For this Proof of Concept, we provide a Classifier that is obviously not serializable:

    # classifier_module.py

    def _create_classifier(spec):
        # obviously not pickle-able because it's inside a function
        class Classifier():
            def __init__(self, spec):
                pass

            def classify(self, x):
                print(x)
                return x

        return Classifier(spec)


    def init(spec):
        global __classifier
        __classifier = _create_classifier(spec)


    def classify(x):
        return __classifier.classify(x)

Unfortunately, there's still a global in here, but it's now nicely encapsulated inside a module as a private variable, and the module exports a tight interface composed of the classify and init functions.
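To see the mechanism in isolation, here is a minimal single-file sketch of the same idea. It assumes the fork start method (so a POSIX platform) and uses a threading.Lock as a stand-in for unpicklable classifier state; the _state variable and the run helper are illustrative names, not part of the code above.

```python
import multiprocessing as mp
import threading

_state = None  # module-private; set in the parent before the pool forks


def init(spec):
    """Build the unpicklable state in the parent process."""
    global _state
    # A threading.Lock cannot be pickled; it stands in for a real classifier.
    _state = {"spec": spec, "lock": threading.Lock()}


def classify(x):
    """Runs in a worker and reads the state inherited at fork time."""
    with _state["lock"]:
        return (_state["spec"], x)


def run(spec, rows):
    init(spec)
    ctx = mp.get_context("fork")  # assumption: a platform that supports fork
    with ctx.Pool(2) as pool:
        # classify is sent to the workers by name and each row by value;
        # _state itself is never pickled.
        return sorted(pool.imap_unordered(classify, rows))
```

Calling run("spec1", [3, 1, 2]) should return the rows paired with the spec. Under the spawn start method the workers would re-import the module and see _state as None, which is why the sketch asks for fork explicitly.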

This design unlocks some possibilities:

  • orchestrate can import and init different classifier modules, based on what it sees in classifier_spec
  • one could also pass an instance of some Classifier class to classify, as long as this instance is serializable and has a classify method of the same signature
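The interface point can be illustrated without a pool at all. The following is only a sketch: classify_all, upper_module and LengthClassifier are hypothetical names, and a plain loop replaces imap_unordered to keep the focus on the fact that the caller only relies on a classify attribute.

```python
import types


def classify_all(classifier, rows):
    """Only assumes that `classifier` exposes a callable `classify`."""
    return [classifier.classify(row) for row in rows]


def _upper(row):
    return row.upper()


# A module-like object built at runtime, standing in for an imported module.
upper_module = types.ModuleType("upper_module")
upper_module.classify = _upper


class LengthClassifier:
    """An ordinary instance with the same classify signature."""

    def classify(self, row):
        return len(row)
```

Both classify_all(upper_module, rows) and classify_all(LengthClassifier(), rows) work, since neither call site cares whether it was handed a module or an instance.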


If you want to use forking, I don't see a way around using a global. But I also don't see a reason to feel bad about using one in this case: you're not mutating a global list from multiple threads or anything like that.

It's possible to cope with the ugliness in your example, though. You want to pass classifier.classify directly, but the Classifier object contains objects which cannot be pickled.

    import os
    import csv
    import uuid
    from threading import Lock
    from multiprocessing import Pool
    from weakref import WeakValueDictionary


    class Classifier:

        def __init__(self, spec):
            self.lock = Lock()  # unpickleable
            self.spec = spec

        def classify(self, row):
            return f'classified by pid: {os.getpid()} with spec: {self.spec}', row
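The reason a bound method cannot be handed to the pool is that pickling it also pickles the instance it is bound to. A small sketch of that failure, using hypothetical names (LockedClassifier, is_picklable):

```python
import pickle
import threading


class LockedClassifier:
    """Hypothetical stand-in for a classifier holding unpicklable state."""

    def __init__(self):
        self.lock = threading.Lock()

    def classify(self, row):
        return row


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
    except (TypeError, AttributeError, pickle.PicklingError):
        return False
    return True
```

Here is_picklable(LockedClassifier().classify) is False, because serializing the bound method drags in the instance and with it the lock.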

I suggest we subclass Classifier and define __getstate__ and __setstate__ to enable pickling. Since you're using forking anyway, the only state it has to pickle is the information needed to get a reference to a forked global instance. Then we just update the unpickled object's __dict__ with the __dict__ of the forked instance (which hasn't gone through the reduction of pickling), and your instance is complete again.
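The core of that trick can be sketched in a single process, with no fork involved. This is an illustration under simplifying assumptions: a plain module-level dict (_REGISTRY) stands in for the forked globals, and Handle is a hypothetical class.

```python
import pickle
import threading

_REGISTRY = {}  # stand-in for state that workers would inherit at fork time


class Handle:
    """Pickles as a key only and rebuilds itself from the registry."""

    def __init__(self, key, payload):
        self.key = key
        self.payload = payload
        self.lock = threading.Lock()  # unpicklable on its own
        _REGISTRY[key] = self

    def __getstate__(self):
        # Only the lookup key crosses the pickle boundary.
        return {"key": self.key}

    def __setstate__(self, state):
        # Copy the attributes of the registered original into this object.
        self.__dict__.update(_REGISTRY[state["key"]].__dict__)
```

After clone = pickle.loads(pickle.dumps(handle)), the clone is a distinct object whose lock and payload are the very objects held by the original, since they were looked up rather than serialized.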

To achieve this without additional boilerplate, the subclassed Classifier instance has to generate a name for itself and register it as a global variable. This first reference is a weak reference, so the instance can be garbage collected when the user expects it. The second reference is created by the user when they assign classifier = Classifier(classifier_spec). This one doesn't have to be global.
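The effect of registering only a weak reference can be seen in a short sketch. Resource, registry, register and is_registered are hypothetical names chosen for illustration.

```python
import gc
import weakref


class Resource:
    """Hypothetical object we want to find by name without keeping it alive."""


registry = weakref.WeakValueDictionary()


def register(name, obj):
    # The dictionary holds only a weak reference to obj.
    registry[name] = obj


def is_registered(name):
    return name in registry
```

As long as the caller holds a strong reference, is_registered reports True; once that last strong reference is dropped and the object is collected, the entry disappears from the registry by itself.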

The name in the example below is generated with the help of the standard library's uuid module. A UUID is converted to a string and edited into a valid identifier (it wouldn't have to be, but it's convenient for debugging in interactive mode).
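As a quick check of the naming step on its own, the following sketch wraps it in a hypothetical make_identifier helper; the prefix guarantees the name starts with a letter, and replacing the hyphens leaves only characters that are legal in an identifier.

```python
import uuid


def make_identifier(prefix="uuid_"):
    """Return a random name that is also a valid Python identifier."""
    return prefix + str(uuid.uuid4()).replace("-", "_")
```

The result can be verified with str.isidentifier(), which is handy if the name is later typed into an interactive session.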

    class SubClassifier(Classifier):

        def __init__(self, spec):
            super().__init__(spec)
            self.uuid = self._generate_uuid_string()
            self.pid = os.getpid()
            self._register_global()

        def __getstate__(self):
            """Define pickled content."""
            return {'uuid': self.uuid}

        def __setstate__(self, state):
            """Set state in child process."""
            self.__dict__ = state
            self.__dict__.update(self._get_instance().__dict__)

        def _get_instance(self):
            """Get reference to instance."""
            return globals()[self.uuid][self.uuid]

        @staticmethod
        def _generate_uuid_string():
            """Generate id as valid identifier."""
            # return 'uuid_' + '123' # for testing
            return 'uuid_' + str(uuid.uuid4()).replace('-', '_')

        def _register_global(self):
            """Register global reference to instance."""
            weakd = WeakValueDictionary({self.uuid: self})
            globals().update({self.uuid: weakd})

        def __del__(self):
            """Clean up globals when deleted in parent."""
            if os.getpid() == self.pid:
                globals().pop(self.uuid)

The sweet thing here is that the boilerplate is totally gone. You don't have to declare and delete globals manually, since the instance manages everything itself in the background:

    def classify(classifier_spec, data_file, n_workers):
        classifier = SubClassifier(classifier_spec)
        # assert globals()['uuid_123']['uuid_123'] # for testing
        with open(data_file, "rt") as fh, Pool(n_workers) as pool:
            rd = csv.DictReader(fh)
            yield from pool.imap_unordered(classifier.classify, rd)


    if __name__ == '__main__':

        PATHFILE = 'data.csv'
        N_WORKERS = 4

        g = classify(classifier_spec='spec1', data_file=PATHFILE, n_workers=N_WORKERS)
        for record in g:
            print(record)

        # assert 'uuid_123' not in globals() # no reference left


The multiprocessing.sharedctypes module provides functions for allocating ctypes objects from shared memory which can be inherited by child processes, i.e., parent and children can access the shared memory.

You could use:
1. multiprocessing.sharedctypes.RawArray to allocate a ctypes array from shared memory.
2. multiprocessing.sharedctypes.RawValue to allocate a single ctypes object from shared memory.
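A minimal sketch of both calls follows, assuming the fork start method on a POSIX platform; fill_squares and demo are hypothetical helper names. The child writes into the shared objects and the parent reads the results back after joining.

```python
import ctypes
import multiprocessing as mp
from multiprocessing.sharedctypes import RawArray, RawValue


def fill_squares(shared, count):
    """Runs in the child: writes into memory the parent can also see."""
    for i in range(len(shared)):
        shared[i] = i * i
    count.value = len(shared)


def demo(n=5):
    shared = RawArray(ctypes.c_int, n)  # zero-initialized shared array
    count = RawValue(ctypes.c_int, 0)   # single shared integer
    ctx = mp.get_context("fork")        # assumption: fork is available
    proc = ctx.Process(target=fill_squares, args=(shared, count))
    proc.start()
    proc.join()
    return list(shared), count.value
```

Note that the Raw variants carry no lock, so this is only safe here because a single child writes and the parent reads after join(); concurrent writers would need their own synchronization.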

Dr Mianzhi Wang has written a very detailed document on this. You could share multiple multiprocessing.sharedctypes objects.

You may find the solution here useful to you.