
multiprocessing and garbage collection


In the end, I passed the pool reference around and terminated it manually once the pool.imap iterator was finished:

import multiprocessing

class Volatile(object):
    def do_stuff(self, ...):
        pool = multiprocessing.Pool()
        return pool, pool.imap(...)

    def call_stuff(self):
        pool, results = self.do_stuff()
        for result in results:
            pass  # lazy evaluation of the imap
        pool.terminate()

In case anyone stumbles upon this solution in the future: the chunksize parameter is very important in Pool.imap (as opposed to plain Pool.map, where it didn't matter). I set it manually so that each process receives 1 + len(input) / len(pool) jobs. Leaving it at the default chunksize=1 gave me the same performance as if I hadn't used parallel processing at all.
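A minimal sketch of that idea (work and the input size are made up; the default pool size equals multiprocessing.cpu_count(), which is why it can stand in for len(pool) here):

import multiprocessing

def work(x):
    return x * x  # placeholder task

if __name__ == "__main__":
    inputs = list(range(100000))
    pool = multiprocessing.Pool()
    # roughly one large chunk per worker process, as described above
    chunk = 1 + len(inputs) // multiprocessing.cpu_count()
    for result in pool.imap(work, inputs, chunksize=chunk):
        pass  # consume the iterator lazily
    pool.terminate()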

I guess there's no real benefit to using ordered imap vs. ordered map; I just personally like iterators better.


In Python, you basically have no guarantee of when things will be destroyed, and in any case this is not how multiprocessing pools are designed to be used.
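For instance, once a reference cycle is involved, nothing is destroyed until the cycle collector happens to run (a made-up example; the finalizer-in-a-cycle behavior shown requires Python 3.4+):

import gc

class Noisy(object):
    def __del__(self):
        print("collected")

a, b = Noisy(), Noisy()
a.partner, b.partner = b, a   # reference cycle defeats plain refcounting
del a, b                      # nothing printed yet
gc.collect()                  # only now do both finalizers run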

The right thing to do is to share a single pool across multiple calls to the function. The easiest way to do that is to store the pool as a class (or, maybe, instance) variable:

import multiprocessing

class Dispatcher:
    pool = multiprocessing.Pool()

    def do_stuff(self, ...):
        result = self.pool.map(...)
        return result
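A hypothetical usage sketch of that idea (work is a placeholder; the class-level Pool() is created exactly once, when the class body executes, which assumes a fork-based Unix start method):

import multiprocessing

def work(x):
    return x * x  # placeholder task

class Dispatcher:
    # created once, when the class body runs (fine with Unix fork)
    pool = multiprocessing.Pool()

    def do_stuff(self, items):
        return self.pool.map(work, items)

d1, d2 = Dispatcher(), Dispatcher()
d1.do_stuff(range(10))
d2.do_stuff(range(10))   # same workers are reused; no new processes spawned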


Indeed, even when all user references to the pool object are deleted, no tasks remain in the queue, and all garbage collection is done, the processes still stay as unusable zombies in the operating system - plus we have 3 zombie service threads from the Pool hanging around (Python 2.7 and 3.4):

>>> del pool
>>> gc.collect()
0
>>> gc.garbage
[]
>>> threading.enumerate()
[<_MainThread(MainThread, started 5632)>, <Thread(Thread-8, started daemon 5252)>,
 <Thread(Thread-9, started daemon 5260)>, <Thread(Thread-7, started daemon 7608)>]

And further Pool() creations will add more and more process and thread zombies, which stay around until the main process is terminated.

It requires a special poke to stop such a zombie pool - via its service thread _handle_workers:

>>> ths = threading.enumerate()
>>> for th in ths:
...     try: th.name, th._state, th._Thread__target
...     except AttributeError: pass
...
('MainThread', 1, None)
('Thread-8', 0, <function _handle_tasks at 0x01462A30>)
('Thread-9', 0, <function _handle_results at 0x014629F0>)
('Thread-7', 0, <function _handle_workers at 0x01462A70>)
>>> ths[-1]._state = multiprocessing.pool.CLOSE  # or TERMINATE
>>> threading.enumerate()
[<_MainThread(MainThread, started 5632)>]
>>>

That terminates the other service threads and also terminates the child processes.
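For comparison, the supported way to get rid of a pool's processes and service threads is terminate() followed by join(), or (Python 3.3+) the pool's context-manager form; a small sketch with a placeholder task:

import multiprocessing

def work(x):
    return x * x  # placeholder task

if __name__ == "__main__":
    pool = multiprocessing.Pool()
    print(pool.map(work, range(5)))
    pool.terminate()   # or pool.close() to let queued tasks finish first
    pool.join()        # reaps the worker processes and service threads

    # Python 3.3+: __exit__ calls terminate() for you
    with multiprocessing.Pool() as pool:
        print(pool.map(work, range(5)))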


I think one problem is that there is a resource-leak bug in the Python library, which could be fixed by proper use of weakrefs.

The other point is that Pool creation and termination is expensive (including 3 service threads per pool just for management!), and there is usually no reason to have many more worker processes than CPU cores (for CPU-bound loads), or more than some limited number determined by another limiting resource (e.g. network bandwidth). So it's reasonable to treat a pool more like a single, app-global resource (optionally managed by a timeout) rather than a throwaway object held by a closure (or used with a terminate()-workaround because of the bug).

For example:

import atexit
import multiprocessing
from multiprocessing import Pool

CPUCORES = multiprocessing.cpu_count()

try:
    _unused = pool   # reload-safe global var
except NameError:
    pool = None

def get_pool():
    global pool
    if pool is None:
        atexit.register(stop_pool)
        pool = Pool(CPUCORES)
    return pool

def stop_pool():
    global pool
    if pool:
        pool.terminate()
        pool = None
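A hypothetical caller then simply goes through get_pool(); the pool is created lazily on first use and torn down exactly once via atexit (work is a stand-in for the real job):

def work(x):
    return x * x  # stand-in for the real per-item job

def do_stuff(items):
    # every caller shares the one lazily created, atexit-managed pool
    return get_pool().map(work, items)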