Avoid global variables for unpicklable shared state among multiprocessing.Pool workers
This was surprisingly tricky. The key is to preserve read access to variables that are available at fork time, without serialization. Most approaches to sharing memory in multiprocessing end up serializing anyway. I tried using a `weakref.proxy` to pass in a classifier without serialization, but that didn't work, because both dill and pickle will try to follow and serialize the referent. However, a module reference works.
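The distinction can be demonstrated directly: a module-level function pickles by reference (only its qualified name travels), while a function defined inside another function has no importable path and cannot be pickled. A small sketch (the `factory` helper is just for illustration):

```python
import json
import pickle

# A module-level function pickles by reference: only the qualified
# name 'json.dumps' is serialized, not the function object itself.
blob = pickle.dumps(json.dumps)
assert pickle.loads(blob) is json.dumps

def factory():
    # Defined inside a function, so it has no importable module path.
    def inner(x):
        return x
    return inner

try:
    pickle.dumps(factory())
    raised = False
except (AttributeError, pickle.PicklingError):
    raised = True
assert raised  # pickle refuses the local function
```

This is why handing the module itself to `classify` works: the pool only ever pickles a reference to the module-level `classify` function, never the state hiding behind it.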
This organization gets us close:
```python
import multiprocessing as mp
import csv

def classify(classifier, data_file):
    with open(data_file, "rt") as fp, mp.Pool() as pool:
        rd = csv.DictReader(fp)
        yield from pool.imap_unordered(classifier.classify, rd)

def orchestrate(classifier_spec, data_file):
    # construct a classifier from the spec; note that we can
    # even dynamically import modules here, using config values
    # from the spec
    import classifier_module
    classifier_module.init(classifier_spec)
    return classify(classifier_module, data_file)

if __name__ == '__main__':
    list(orchestrate(None, 'data.txt'))
```
A few changes to note here:
- we add an `orchestrate` function for some DI goodness; `orchestrate` figures out how to construct/initialize a classifier, and hands it to `classify`, decoupling the two
- `classify` only needs to assume that the `classifier` parameter has a `classify` method; it doesn't care if it's an instance or a module
For this proof of concept, we provide a classifier that is obviously not serializable:
```python
# classifier_module.py

def _create_classifier(spec):
    # obviously not pickle-able because it's defined inside a function
    class Classifier():
        def __init__(self, spec):
            pass

        def classify(self, x):
            print(x)
            return x

    return Classifier(spec)

def init(spec):
    global __classifier
    __classifier = _create_classifier(spec)

def classify(x):
    return __classifier.classify(x)
```
Unfortunately, there's still a global in here, but it's now nicely encapsulated inside a module as a private variable, and the module exports a tight interface composed of the `classify` and `init` functions.
This design unlocks some possibilities:
- `orchestrate` can import and `init` different classifier modules, based on what it sees in `classifier_spec`
- one could also pass an instance of some `Classifier` class to `classify`, as long as this instance is serializable and has a `classify` method of the same signature
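To illustrate that second point, here is a minimal sketch of a picklable classifier instance (`PicklableClassifier` is a hypothetical class, not part of the original code): it survives a pickle round trip because all of its attributes are plain data.

```python
import pickle

class PicklableClassifier:
    """A classifier whose attributes are all plain, picklable data."""
    def __init__(self, spec):
        self.spec = spec

    def classify(self, x):
        # same signature as the module-level classify(x)
        return (self.spec, x)

clf = PicklableClassifier('spec1')
clone = pickle.loads(pickle.dumps(clf))
assert clone.classify('row') == ('spec1', 'row')
```

An instance like this can be handed straight to `classify` in place of the module, since `pool.imap_unordered` can pickle the bound `clf.classify` method along with the instance.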
If you want to use forking, I don't see a way around using a global. But I also don't see a reason why you would have to feel bad about using a global in this case; you're not manipulating a global list with multi-threading or anything like that.
It's possible to cope with the ugliness in your example, though. You want to pass `classifier.classify` directly, but the `Classifier` object contains objects which cannot be pickled.

```python
import os
import csv
import uuid
from threading import Lock
from multiprocessing import Pool
from weakref import WeakValueDictionary

class Classifier:

    def __init__(self, spec):
        self.lock = Lock()  # unpickleable
        self.spec = spec

    def classify(self, row):
        return f'classified by pid: {os.getpid()} with spec: {self.spec}', row
```
I suggest we subclass `Classifier` and define `__getstate__` and `__setstate__` to enable pickling. Since you're using forking anyway, all the state it has to pickle is information about how to get a reference to a forked global instance. Then we just update the pickled object's `__dict__` with the `__dict__` of the forked instance (which hasn't gone through the reduction of pickling) and your instance is complete again.
To achieve this without additional boilerplate, the subclassed `Classifier` instance has to generate a name for itself and register it as a global variable. This first reference will be a weak reference, so the instance can be garbage collected when the user expects it to be. The second reference is created when the user assigns `classifier = Classifier(classifier_spec)`; this one doesn't have to be global.
The generated name in the example below is created with the help of the standard library's `uuid` module. The UUID is converted to a string and edited into a valid identifier (it wouldn't have to be, but it's convenient for debugging in interactive mode).
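The conversion is just a prefix plus a hyphen-to-underscore substitution:

```python
import uuid

# hyphens are not allowed in Python identifiers, so replace them
name = 'uuid_' + str(uuid.uuid4()).replace('-', '_')
assert name.isidentifier()  # usable as a global variable name
```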
```python
class SubClassifier(Classifier):

    def __init__(self, spec):
        super().__init__(spec)
        self.uuid = self._generate_uuid_string()
        self.pid = os.getpid()
        self._register_global()

    def __getstate__(self):
        """Define pickled content."""
        return {'uuid': self.uuid}

    def __setstate__(self, state):
        """Set state in child process."""
        self.__dict__ = state
        self.__dict__.update(self._get_instance().__dict__)

    def _get_instance(self):
        """Get reference to instance."""
        return globals()[self.uuid][self.uuid]

    @staticmethod
    def _generate_uuid_string():
        """Generate id as valid identifier."""
        # return 'uuid_' + '123'  # for testing
        return 'uuid_' + str(uuid.uuid4()).replace('-', '_')

    def _register_global(self):
        """Register global reference to instance."""
        weakd = WeakValueDictionary({self.uuid: self})
        globals().update({self.uuid: weakd})

    def __del__(self):
        """Clean up globals when deleted in parent."""
        if os.getpid() == self.pid:
            globals().pop(self.uuid)
```
The sweet thing here is that the boilerplate is totally gone. You don't have to mess with declaring and deleting globals manually, since the instance manages everything itself in the background:
```python
def classify(classifier_spec, data_file, n_workers):
    classifier = SubClassifier(classifier_spec)
    # assert globals()['uuid_123']['uuid_123']  # for testing
    with open(data_file, "rt") as fh, Pool(n_workers) as pool:
        rd = csv.DictReader(fh)
        yield from pool.imap_unordered(classifier.classify, rd)

if __name__ == '__main__':

    PATHFILE = 'data.csv'
    N_WORKERS = 4

    g = classify(classifier_spec='spec1', data_file=PATHFILE, n_workers=N_WORKERS)

    for record in g:
        print(record)

    # assert 'uuid_123' not in globals()  # no reference left
```
The `multiprocessing.sharedctypes` module provides functions for allocating ctypes objects from shared memory which can be inherited by child processes, i.e., parent and children can access the shared memory.
You could use:

1. `multiprocessing.sharedctypes.RawArray` to allocate a ctypes array from shared memory.
2. `multiprocessing.sharedctypes.RawValue` to allocate a single ctypes object from shared memory.
Dr Mianzhi Wang has written a very detailed document on this. You can share multiple `multiprocessing.sharedctypes` objects.
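For example, a `RawArray` allocated before the pool is created is inherited by forked workers and can be read without any serialization. A minimal sketch, assuming the fork start method (the values and the doubling worker are illustrative):

```python
from multiprocessing import Pool
from multiprocessing.sharedctypes import RawArray

# allocated before the Pool is created, so forked workers inherit it
shared = RawArray('d', [0.5, 1.5, 2.5])

def worker(i):
    # read the inherited shared memory directly; nothing is pickled
    # except the index argument
    return shared[i] * 2

if __name__ == '__main__':
    with Pool(2) as pool:
        print(pool.map(worker, range(3)))  # [1.0, 3.0, 5.0]
```

Note that `RawArray` and `RawValue` carry no lock; if workers also write to the memory, use `multiprocessing.sharedctypes.Array`/`Value`, which wrap the raw object with a synchronization lock.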
You may find the solution here useful.