What's a Pythonic way to make a non-blocking version of an object?


I've done something similar by using a metaclass to create non-blocking versions of blocking functions on the object. It allows you to create a non-blocking version of a class just by doing this:

```python
class NB_Hardware(object):
    __metaclass__ = NonBlockBuilder
    delegate = Hardware
    nb_funcs = ['blocking_command']
```

I've taken my original implementation, which targeted Python 3 and used a concurrent.futures.ThreadPoolExecutor (I was wrapping blocking I/O calls to make them non-blocking in an asyncio context*), and adapted it to Python 2 and a concurrent.futures.ProcessPoolExecutor. Here's the implementation of the metaclass along with its helper classes:

```python
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor


def runner(self, cb, *args, **kwargs):
    return getattr(self, cb)(*args, **kwargs)


class _ExecutorMixin():
    """ A Mixin that provides asynchronous functionality.

    This mixin provides methods that allow a class to run
    blocking methods in a ProcessPoolExecutor.
    It also provides methods that attempt to keep the object
    picklable despite having a non-picklable ProcessPoolExecutor
    as part of its state.

    """
    pool_workers = cpu_count()

    def run_in_executor(self, callback, *args, **kwargs):
        """ Runs a function in an Executor.

        Returns a concurrent.futures.Future.

        """
        if not hasattr(self, '_executor'):
            self._executor = self._get_executor()
        return self._executor.submit(runner, self, callback, *args, **kwargs)

    def _get_executor(self):
        return ProcessPoolExecutor(max_workers=self.pool_workers)

    def __getattr__(self, attr):
        if (self._obj and hasattr(self._obj, attr) and
                not attr.startswith("__")):
            return getattr(self._obj, attr)
        raise AttributeError(attr)

    def __getstate__(self):
        self_dict = self.__dict__
        self_dict['_executor'] = None
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._executor = self._get_executor()


class NonBlockBuilder(type):
    """ Metaclass for adding non-blocking versions of methods to a class.

    Expects to find the following class attributes:
    nb_funcs - A list containing methods that need non-blocking wrappers
    delegate - The class to wrap (add non-blocking methods to)
    pool_workers - (optional) how many workers to put in the internal pool.

    The metaclass inserts a mixin (_ExecutorMixin) into the inheritance
    hierarchy of cls. This mixin provides methods that allow
    the non-blocking wrappers to do their work.

    """
    def __new__(cls, clsname, bases, dct, **kwargs):
        nbfunc_list = dct.get('nb_funcs', [])
        existing_nbfuncs = set()

        def find_existing_nbfuncs(d):
            for attr in d:
                if attr.startswith("nb_"):
                    existing_nbfuncs.add(attr)

        # Determine if any bases include the nb_funcs attribute, or
        # if either this class or a base class provides an actual
        # implementation for a non-blocking method.
        find_existing_nbfuncs(dct)
        for b in bases:
            b_dct = b.__dict__
            nbfunc_list.extend(b_dct.get('nb_funcs', []))
            find_existing_nbfuncs(b_dct)

        # Add _ExecutorMixin to bases.
        if _ExecutorMixin not in bases:
            bases += (_ExecutorMixin,)

        # Add non-blocking funcs to dct, but only if a definition
        # is not already provided by dct or one of our bases.
        for func in nbfunc_list:
            nb_name = 'nb_{}'.format(func)
            if nb_name not in existing_nbfuncs:
                dct[nb_name] = cls.nbfunc_maker(func)

        return super(NonBlockBuilder, cls).__new__(cls, clsname, bases, dct)

    def __init__(cls, name, bases, dct):
        """ Properly initialize a non-blocking wrapper.

        Sets pool_workers and delegate on the class, and also
        adds an __init__ method to it that instantiates the
        delegate with the proper context.

        """
        super(NonBlockBuilder, cls).__init__(name, bases, dct)
        pool_workers = dct.get('pool_workers')
        delegate = dct.get('delegate')
        old_init = dct.get('__init__')
        # Search bases for values we care about, if we didn't
        # find them on the child class.
        for b in bases:
            if b is object:  # Skip object
                continue
            b_dct = b.__dict__
            if not pool_workers:
                pool_workers = b_dct.get('pool_workers')
            if not delegate:
                delegate = b_dct.get('delegate')
            if not old_init:
                old_init = b_dct.get('__init__')

        cls.delegate = delegate

        # If we found a value for pool_workers, set it. If not,
        # _ExecutorMixin sets a default that will be used.
        if pool_workers:
            cls.pool_workers = pool_workers

        # Here's the __init__ we want every wrapper class to use.
        # It just instantiates the delegate object.
        def init_func(self, *args, **kwargs):
            # Be sure to call the original __init__, if there
            # was one.
            if old_init:
                old_init(self, *args, **kwargs)
            if self.delegate:
                self._obj = self.delegate(*args, **kwargs)
        cls.__init__ = init_func

    @staticmethod
    def nbfunc_maker(func):
        def nb_func(self, *args, **kwargs):
            return self.run_in_executor(func, *args, **kwargs)
        return nb_func
```

Usage:

```python
from nb_helper import NonBlockBuilder
import time


class Hardware:
    def __init__(self, other_init_args):
        self.other = other_init_args

    def blocking_command(self, arg_1, arg_2, arg_3):
        print("start blocking")
        time.sleep(5)
        return "blocking"

    def normal_command(self):
        return "normal"


class NBHardware(object):
    __metaclass__ = NonBlockBuilder
    delegate = Hardware
    nb_funcs = ['blocking_command']


if __name__ == "__main__":
    h = NBHardware("abc")
    print "doing blocking call"
    print h.blocking_command(1, 2, 3)
    print "done"
    print "doing non-block call"
    x = h.nb_blocking_command(1, 2, 3)  # Non-blocking; returns a concurrent.futures.Future
    print h.normal_command()  # You can still use the normal methods, too.
    print x.result()  # Waits for the result from the Future
```

Output:

```
doing blocking call
start blocking
< 5 second delay >
blocking
done
doing non-block call
start blocking
normal
< 5 second delay >
blocking
```

The one tricky piece for you is making sure Hardware is picklable. You can probably do that by making __getstate__ delete the dll object, and recreate it in __setstate__, similar to what _ExecutorMixin does.

You'll also need the Python 2.x backport of concurrent.futures (the futures package on PyPI: pip install futures).

Note that there's a fair amount of complexity in the metaclass so that wrapper classes work properly with inheritance, and so that they support things like providing custom implementations of __init__ and the nb_* methods. For example, something like this is supported:

```python
from multiprocessing import Lock, RLock
from multiprocessing.util import register_after_fork


class AioBaseLock(object):
    __metaclass__ = NonBlockBuilder
    pool_workers = 1
    nb_funcs = ['acquire', 'release']

    def __init__(self, *args, **kwargs):
        # Custom __init__; the metaclass calls it before
        # instantiating the delegate.
        self._threaded_acquire = False
        def _after_fork(obj):
            obj._threaded_acquire = False
        register_after_fork(self, _after_fork)

    def nb_acquire(self, *args, **kwargs):
        # Custom non-blocking implementation; because it already
        # exists, the metaclass won't generate one over it.
        def lock_acquired(fut):
            if fut.result():
                self._threaded_acquire = True
        out = self.run_in_executor('acquire', *args, **kwargs)
        out.add_done_callback(lock_acquired)
        return out


class AioLock(AioBaseLock):
    delegate = Lock


class AioRLock(AioBaseLock):
    delegate = RLock
```

If you don't need that kind of flexibility, you can simplify the implementation quite a bit:

```python
class NonBlockBuilder(type):
    """ Metaclass for adding non-blocking versions of methods to a class.

    Expects to find the following class attributes:
    nb_funcs - A list containing methods that need non-blocking wrappers
    delegate - The class to wrap (add non-blocking methods to)
    pool_workers - (optional) how many workers to put in the internal pool.

    The metaclass inserts a mixin (_ExecutorMixin) into the inheritance
    hierarchy of cls. This mixin provides methods that allow
    the non-blocking wrappers to do their work.

    """
    def __new__(cls, clsname, bases, dct, **kwargs):
        nbfunc_list = dct.get('nb_funcs', [])

        # Add _ExecutorMixin to bases.
        if _ExecutorMixin not in bases:
            bases += (_ExecutorMixin,)

        # Add a non-blocking wrapper for each listed method.
        for func in nbfunc_list:
            nb_name = 'nb_{}'.format(func)
            dct[nb_name] = cls.nbfunc_maker(func)

        return super(NonBlockBuilder, cls).__new__(cls, clsname, bases, dct)

    def __init__(cls, name, bases, dct):
        """ Properly initialize a non-blocking wrapper.

        Sets pool_workers and delegate on the class, and also
        adds an __init__ method to it that instantiates the
        delegate with the proper context.

        """
        super(NonBlockBuilder, cls).__init__(name, bases, dct)
        pool_workers = dct.get('pool_workers')
        cls.delegate = dct['delegate']

        # If we found a value for pool_workers, set it. If not,
        # _ExecutorMixin sets a default that will be used.
        if pool_workers:
            cls.pool_workers = pool_workers

        # Here's the __init__ we want every wrapper class to use.
        # It just instantiates the delegate object.
        def init_func(self, *args, **kwargs):
            self._obj = self.delegate(*args, **kwargs)
        cls.__init__ = init_func

    @staticmethod
    def nbfunc_maker(func):
        def nb_func(self, *args, **kwargs):
            return self.run_in_executor(func, *args, **kwargs)
        return nb_func
```

* The original code is here, for reference.
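That original Python 3 approach (handing a blocking call to an executor so the asyncio event loop isn't stalled) can be sketched roughly like this; a minimal illustration with a made-up blocking_io function, not the original code:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io():
    # Stand-in for a slow hardware or I/O call.
    time.sleep(0.1)
    return "blocking"


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:
        # The await yields to the event loop while the thread
        # runs the blocking call.
        result = await loop.run_in_executor(pool, blocking_io)
    return result
```

For I/O-bound work a thread pool avoids the pickling constraints that come with a ProcessPoolExecutor.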


One method I've used to launch class methods asynchronously is to create a pool and call a few function aliases with apply_async instead of directly calling the class methods.

Say you have an even simpler version of your classes:

```python
class Hardware:
    def __init__(self, stuff):
        self.stuff = stuff

    def blocking_command(self, arg1):
        self.stuff.call_function(arg1)
```

At the top level of your module, define a new function that looks like this:

```python
def _blocking_command(Hardware_obj, arg1):
    # blocking_command is already bound to Hardware_obj, so self
    # is passed implicitly; don't pass the object a second time.
    return Hardware_obj.blocking_command(arg1)
```

Since the class and this "alias" function are both defined at the top level of the module, they are picklable, and you can kick off the call using the multiprocessing library:

```python
import multiprocessing

hw_obj = Hardware(stuff)
pool = multiprocessing.Pool()
results_obj = pool.apply_async(_blocking_command, (hw_obj, arg1))
```

The result of the call will be available through the returned AsyncResult object (results_obj.get()). I like this approach because it uses a relatively small amount of code to make parallelization a lot easier. Specifically, it only adds a few two-line functions rather than any classes, and it requires no extra imports besides multiprocessing.
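Put end to end, the pattern looks roughly like this; a self-contained sketch with a toy Hardware class (the factor attribute and the 21 argument are made up for illustration):

```python
import multiprocessing


class Hardware(object):
    def __init__(self, factor):
        # Hypothetical read-only state set before any async calls.
        self.factor = factor

    def blocking_command(self, arg1):
        # Stand-in for a slow hardware call.
        return self.factor * arg1


def _blocking_command(hw_obj, arg1):
    # Module-level alias: picklable, so apply_async can ship the
    # call to a worker process.
    return hw_obj.blocking_command(arg1)


def run_demo():
    hw = Hardware(2)
    pool = multiprocessing.Pool(processes=2)
    result = pool.apply_async(_blocking_command, (hw, 21))
    value = result.get(timeout=30)  # Blocks until the worker finishes.
    pool.close()
    pool.join()
    return value
```

Note that the worker operates on a pickled copy of the object, which is why the read-only caveat below matters.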

Notes:

  1. Don't use this for methods that need to modify the object's attributes. It works fine, though, once all of the object's attributes have been set, effectively treating them as "read-only": each worker operates on its own pickled copy, so mutations never propagate back.

  2. You can also use this approach inside a class method to launch other class methods; you just have to pass "self" explicitly. This would let you move your floating "hardware_child_process" function into the class: it would still act as a dispatcher for a group of asynchronous processes, but that functionality would be centralized in your Hardware class.
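Note 2 can be sketched like this; the dispatch_async method name and the toy arithmetic are hypothetical, but the shape (a method that passes self to the module-level alias) is the point:

```python
import multiprocessing


def _blocking_command(hw_obj, arg1):
    # Top-level alias, as before, so it stays picklable.
    return hw_obj.blocking_command(arg1)


class Hardware(object):
    def __init__(self, stuff):
        self.stuff = stuff

    def blocking_command(self, arg1):
        # Stand-in for a slow hardware call.
        return self.stuff + arg1

    def dispatch_async(self, pool, arg1):
        # Explicitly pass self so the worker can re-invoke the
        # method on a pickled copy of this object.
        return pool.apply_async(_blocking_command, (self, arg1))
```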