
Sharing contiguous numpy arrays between processes in python


Wrap numpy's ndarray around multiprocessing's RawArray()

There are multiple ways to share numpy arrays in memory across processes. Let's have a look at how you can do it using the multiprocessing module.

The first important observation is that numpy provides the np.frombuffer() function to wrap an ndarray interface around a preexisting object that supports the buffer protocol (such as bytes(), bytearray(), array.array() and so on). This creates read-only arrays from read-only objects and writable arrays from writable objects.
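Both behaviors are easy to see in a single process. This is a minimal sketch that uses only standard numpy calls:

```python
import numpy as np

# An immutable bytes object yields a read-only ndarray view.
ro = np.frombuffer(b'\x01\x02\x03\x04', dtype=np.uint8)
print(ro.flags.writeable)   # False

# A mutable bytearray yields a writable view that shares its memory.
buf = bytearray(4)
rw = np.frombuffer(buf, dtype=np.uint8)
rw[0] = 42                  # writing through the ndarray ...
print(buf[0])               # ... changes the underlying bytearray: 42
```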

We can combine that with the shared memory RawArray() that multiprocessing provides. Note that Array() doesn't work for this purpose, as it is a wrapper object with a lock and doesn't directly expose the buffer interface. Of course, that means we have to take care of proper synchronization of our numpy-wrapped RawArrays ourselves.
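A minimal single-process sketch of that combination follows. The typecode 'd' (C double) is chosen here only because it matches np.float64. Writes through the ndarray land directly in the RawArray's shared buffer, and nothing guards them with a lock:

```python
import numpy as np
from multiprocessing.sharedctypes import RawArray

# RawArray('d', n) allocates n C doubles in shared memory, without a lock.
raw = RawArray('d', 5)

# np.frombuffer() wraps the same memory; no data is copied.
arr = np.frombuffer(raw, dtype=np.float64)
arr[:] = np.arange(5)       # writes go straight into the shared buffer

print(raw[:])               # [0.0, 1.0, 2.0, 3.0, 4.0]
print(arr.flags.writeable)  # True: a RawArray is a writable buffer
```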

There is one complicating issue regarding ndarray-wrapped RawArrays: when multiprocessing sends such an array between processes (and it will need to send our arrays, once created, to both workers), it pickles and then unpickles it. Unfortunately, this creates copies of the ndarrays instead of sharing them in memory.
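The copy can be reproduced without any worker processes. This sketch uses a plain pickle round trip as a stand-in for what multiprocessing does when it transfers an object. It assumes C int is 32 bits wide, as on all mainstream platforms, so that typecode 'i' matches np.int32:

```python
import pickle

import numpy as np
from multiprocessing.sharedctypes import RawArray

raw = RawArray('i', 4)                   # assumes C int == np.int32
arr = np.frombuffer(raw, dtype=np.int32)

# Round-trip through pickle, roughly what happens when the array is sent.
clone = pickle.loads(pickle.dumps(arr))

arr[0] = 7                               # modify the shared original ...
print(clone[0])                          # ... the unpickled copy still holds 0
print(np.shares_memory(arr, clone))      # False: the data was copied
```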

The solution, while a bit ugly, is to keep the RawArrays as is until they are transferred to the workers and only wrap them in ndarrays once each worker process has started.

Furthermore, it would have been preferable to communicate arrays, be it a plain RawArray or an ndarray-wrapped one, directly via a multiprocessing.Queue, but that doesn't work, either. A RawArray cannot be put inside such a Queue and an ndarray-wrapped one would have been pickled and unpickled, so in effect copied.

The workaround is to send a list of all pre-allocated arrays to the worker processes and communicate indices into that list over the Queues. It's very much like passing around tokens (the indices) and whoever holds the token is allowed to operate on the associated array.
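The token discipline can be illustrated in a single process. This is only a sketch: it swaps in threads, plain ndarrays and queue.Queue for the worker processes, wrapped RawArrays and multiprocessing.Queue, and the names producer() and consumer() are hypothetical. The point is that only small integers travel over the queues while the arrays stay where they are:

```python
import queue
import threading

import numpy as np

# Pre-allocated pool; threads share memory, so plain ndarrays suffice here.
arrays = [np.zeros(4, dtype=np.int64) for _ in range(2)]
q_free, q_used = queue.Queue(), queue.Queue()
for arr_id in range(len(arrays)):
    q_free.put(arr_id)                  # every index starts as a "free" token

results = []

def producer(n_batches):
    for batch in range(n_batches):
        arr_id = q_free.get()           # acquire a token: we own arrays[arr_id]
        arrays[arr_id][:] = batch       # fill the array in place
        q_used.put(arr_id)              # hand the token to the consumer
    q_used.put(None)                    # sentinel: no more data

def consumer():
    while (arr_id := q_used.get()) is not None:
        results.append(int(arrays[arr_id].sum()))  # read the array in place
        q_free.put(arr_id)              # return the token to the pool

threads = [threading.Thread(target=producer, args=(5,)),
           threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)  # [0, 4, 8, 12, 16]
```

Whoever currently holds an index is the only party touching that array, so no lock around the array data is needed.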

The structure of the main program could look like this:

```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import numpy as np
import queue

from multiprocessing import freeze_support, set_start_method
from multiprocessing import Event, Process, Queue
from multiprocessing.sharedctypes import RawArray


def create_shared_arrays(size, dtype=np.int32, num=2):
    dtype = np.dtype(dtype)
    if dtype.isbuiltin and dtype.char in 'bBhHiIlLfd':
        typecode = dtype.char
    else:
        typecode, size = 'B', size * dtype.itemsize

    return [RawArray(typecode, size) for _ in range(num)]


def main():
    my_dtype = np.float32

    # 125000000 (size) * 4 (dtype) * 2 (num) ~= 1 GB memory usage
    arrays = create_shared_arrays(125000000, dtype=my_dtype)
    q_free = Queue()
    q_used = Queue()
    bail = Event()

    for arr_id in range(len(arrays)):
        q_free.put(arr_id)  # pre-fill free queue with allocated array indices

    pr1 = MyDataLoader(arrays, q_free, q_used, bail,
                       dtype=my_dtype, step=1024)
    pr2 = MyDataProcessor(arrays, q_free, q_used, bail,
                          dtype=my_dtype, step=1024)

    pr1.start()
    pr2.start()

    pr2.join()
    print("\n{} joined.".format(pr2.name))

    pr1.join()
    print("{} joined.".format(pr1.name))


if __name__ == '__main__':
    freeze_support()

    # On Windows, only "spawn" is available.
    # Also, this tests proper sharing of the arrays without "cheating".
    set_start_method('spawn')
    main()
```
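To see what the two branches of create_shared_arrays() do, here is a standalone copy of that function with a quick check. np.complex64 is just an example of a dtype that has no matching RawArray typecode, so the fallback allocates raw bytes that np.frombuffer() later reinterprets:

```python
import numpy as np
from multiprocessing.sharedctypes import RawArray

def create_shared_arrays(size, dtype=np.int32, num=2):
    # copy of the function from the main program
    dtype = np.dtype(dtype)
    if dtype.isbuiltin and dtype.char in 'bBhHiIlLfd':
        typecode = dtype.char                          # matching ctypes type
    else:
        typecode, size = 'B', size * dtype.itemsize    # fall back to raw bytes
    return [RawArray(typecode, size) for _ in range(num)]

# float32 maps directly to the RawArray typecode 'f' ...
f32 = create_shared_arrays(10, dtype=np.float32, num=1)[0]
# ... whereas complex64 has no ctypes equivalent: 10 * 8 bytes are allocated.
c64 = create_shared_arrays(10, dtype=np.complex64, num=1)[0]

print(len(f32), len(c64))                           # 10 80
print(len(np.frombuffer(c64, dtype=np.complex64)))  # 10
```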

This prepares a list of two arrays, two Queues and a multiprocessing.Event. The "free" queue is where MyDataProcessor puts the indices of arrays it is done with and where MyDataLoader fetches them from. The "used" queue is where MyDataLoader puts the indices of readily filled arrays and where MyDataProcessor fetches them from. The Event is used to start a concerted bail-out of all workers. We could do away with the Event for now, as we have only one producer and one consumer of arrays, but it doesn't hurt to be prepared for more workers.

Then we pre-fill the "free" Queue with the indices of all our RawArrays in the list and instantiate one worker of each type, passing them the necessary communication objects. We start both of them and just wait for them to join().

Here's what MyDataProcessor could look like. It consumes array indices from the "used" Queue and sends the data off to some external black box (debugio.output in the example):

```python
class MyDataProcessor(Process):
    def __init__(self, arrays, q_free, q_used, bail, dtype=np.int32, step=1):
        super().__init__()
        self.arrays = arrays
        self.q_free = q_free
        self.q_used = q_used
        self.bail = bail
        self.dtype = dtype
        self.step = step

    def run(self):
        # wrap RawArrays inside ndarrays
        arrays = [np.frombuffer(arr, dtype=self.dtype) for arr in self.arrays]

        from debugio import output as writer

        while True:
            arr_id = self.q_used.get()
            if arr_id is None:
                break

            arr = arrays[arr_id]

            print('(', end='', flush=True)          # just visualizing activity
            for j in range(0, len(arr), self.step):
                writer.write(str(arr[j]) + '\n')
            print(')', end='', flush=True)          # just visualizing activity

            self.q_free.put(arr_id)

            writer.flush()

        self.bail.set()                     # tell loaders to bail out ASAP
        self.q_free.put(None, timeout=1)    # wake up loader blocking on get()

        try:
            while True:
                self.q_used.get_nowait()    # wake up loader blocking on put()
        except queue.Empty:
            pass
```

The first thing it does is wrap the received RawArrays in ndarrays using np.frombuffer() and keep the new list, so the arrays are usable as numpy arrays for the whole lifetime of the process and it doesn't have to wrap them over and over again.

Note also that MyDataProcessor only ever writes to the self.bail Event; it never checks it. Instead, if it needs to be told to quit, it will find a None mark on the queue instead of an array index. This way, when a MyDataLoader runs out of data and starts the teardown procedure, MyDataProcessor can still process all valid arrays that are already in the queue without exiting prematurely.
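The None mark works because a queue fed by a single producer hands items out in the order they were put, so the mark can only arrive after every index queued before it. A minimal sketch, with queue.Queue standing in for multiprocessing.Queue:

```python
import queue

q_used = queue.Queue()      # stand-in for the multiprocessing "used" queue
for arr_id in (0, 1):
    q_used.put(arr_id)      # two filled arrays are still waiting ...
q_used.put(None)            # ... when the loader signals end of data

processed = []
while True:
    arr_id = q_used.get()
    if arr_id is None:      # the mark arrives only after all earlier indices
        break
    processed.append(arr_id)

print(processed)  # [0, 1]
```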

This is what MyDataLoader could look like:

```python
class MyDataLoader(Process):
    def __init__(self, arrays, q_free, q_used, bail, dtype=np.int32, step=1):
        super().__init__()
        self.arrays = arrays
        self.q_free = q_free
        self.q_used = q_used
        self.bail = bail
        self.dtype = dtype
        self.step = step

    def run(self):
        # wrap RawArrays inside ndarrays
        arrays = [np.frombuffer(arr, dtype=self.dtype) for arr in self.arrays]

        from debugio import input as reader

        for _ in range(10):  # for testing we end after a set amount of passes
            if self.bail.is_set():
                # we were asked to bail out while waiting on put()
                return

            arr_id = self.q_free.get()
            if arr_id is None:
                # we were asked to bail out while waiting on get()
                self.q_free.put(None, timeout=1)  # put it back for next loader
                return

            if self.bail.is_set():
                # we were asked to bail out while we got a normal array
                return

            arr = arrays[arr_id]

            eof = False
            print('<', end='', flush=True)          # just visualizing activity
            for j in range(0, len(arr), self.step):
                line = reader.readline()
                if not line:
                    eof = True
                    break

                # parse one value; [0] takes the scalar out of the 1-element
                # array, as assigning a size-1 array to arr[j] is deprecated
                arr[j] = np.fromstring(line, dtype=self.dtype, sep='\n')[0]

            if eof:
                print('EOF>', end='', flush=True)   # just visualizing activity
                break

            print('>', end='', flush=True)          # just visualizing activity

            if self.bail.is_set():
                # we were asked to bail out while we filled the array
                return

            self.q_used.put(arr_id)     # tell processor an array is filled

        if not self.bail.is_set():
            self.bail.set()             # tell other loaders to bail out ASAP
            # mark end of data for processor as we are the first to bail out
            self.q_used.put(None)
```

It is very similar in structure to the other worker. It is a bit longer because it checks the self.bail Event at many points, so as to reduce the likelihood of getting stuck. (This is not completely foolproof, as there is a tiny chance that the Event gets set between checking it and accessing the Queue. If that's a problem, one needs some synchronization primitive that arbitrates access to the Event and the Queue combined.)
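One way to narrow that gap without an extra lock, a different technique from the one the loader above uses, is to never block indefinitely: wait on the Queue with a timeout and re-check the Event after every timeout. Here is a sketch with a hypothetical helper get_unless_bailing(). It is demonstrated with threading.Event and queue.Queue, which offer the same is_set() and get(timeout=...) interface as their multiprocessing counterparts (multiprocessing.Queue also raises queue.Empty on timeout). Because it returns None when bailing, callers can treat that exactly like the None mark:

```python
import queue
import threading

def get_unless_bailing(q, bail, poll=0.1):
    """Hypothetical helper: like q.get(), but gives up once bail is set.

    Returns the next item, or None as soon as bail is set.
    """
    while not bail.is_set():
        try:
            return q.get(timeout=poll)
        except queue.Empty:
            continue            # nothing yet; loop to re-check the Event
    return None

q, bail = queue.Queue(), threading.Event()
q.put(3)
print(get_unless_bailing(q, bail))   # 3: an item was available

# No item will ever arrive, but setting the Event releases the waiter.
threading.Timer(0.2, bail.set).start()
print(get_unless_bailing(q, bail))   # None
```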

It also wraps the received RawArrays in ndarrays at the very beginning and reads data from an external black box (debugio.input in the example).

Note that by playing around with the step= arguments to both workers in the main() function, we can change how much of each array is actually read and written. This is strictly for testing purposes; in a production environment step= would be 1, so that all numpy array members are read and written.
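As a quick worked check of what step= changes, here is the number of elements each worker touches per array, using the array size from main() (plain arithmetic, no workers involved):

```python
size = 125_000_000          # elements per array, as in main()

# range(0, size, step) visits indices 0, step, 2*step, ...
touched = {step: len(range(0, size, step)) for step in (1, 1024)}
print(touched)  # {1: 125000000, 1024: 122071}
```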

Increasing both values makes the workers access only a few of the values in the numpy arrays, which speeds everything up significantly. This shows that performance is not limited by the communication between the worker processes. Had we put numpy arrays directly onto the Queues, copying them back and forth between the processes in full, increasing the step size would not have improved performance significantly; it would have remained slow.
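The difference in what would travel over the Queue can be estimated with pickle. This sketch uses a 1,000,000-element float32 array rather than the 125,000,000 elements from main() to keep it fast:

```python
import pickle

import numpy as np

# What crosses the Queue per array in the index-token design ...
token_bytes = len(pickle.dumps(0))
# ... versus what would cross it if the ndarray itself were sent.
array_bytes = len(pickle.dumps(np.zeros(1_000_000, dtype=np.float32)))

print(token_bytes)               # a handful of bytes
print(array_bytes > 4_000_000)   # True: every element is serialized
```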

For reference, here is the debugio module I used for testing:

```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from ast import literal_eval
from io import RawIOBase, BufferedReader, BufferedWriter, TextIOWrapper


class DebugInput(RawIOBase):
    def __init__(self, end=None):
        if end is not None and end < 0:
            raise ValueError("end must be non-negative")

        super().__init__()
        self.pos = 0
        self.end = end

    def readable(self):
        return True

    def read(self, size=-1):
        if self.end is None:
            if size < 0:
                raise NotImplementedError("size must be non-negative")
            end = self.pos + size
        elif size < 0:
            end = self.end
        else:
            end = min(self.pos + size, self.end)

        lines = []
        while self.pos < end:
            offset = self.pos % 400
            pos = self.pos - offset
            if offset < 18:
                i = (offset + 2) // 2
                pos += i * 2 - 2
            elif offset < 288:
                i = (offset + 12) // 3
                pos += i * 3 - 12
            else:
                i = (offset + 112) // 4
                pos += i * 4 - 112

            line = str(i).encode('ascii') + b'\n'
            line = line[self.pos - pos:end - pos]
            self.pos += len(line)
            size -= len(line)
            lines.append(line)

        return b''.join(lines)

    def readinto(self, b):
        data = self.read(len(b))
        b[:len(data)] = data
        return len(data)

    def seekable(self):
        return True

    def seek(self, offset, whence=0):
        if whence == 0:
            pos = offset
        elif whence == 1:
            pos = self.pos + offset
        elif whence == 2:
            if self.end is None:
                raise ValueError("cannot seek to end of infinite stream")
            pos = self.end + offset
        else:
            raise NotImplementedError("unknown whence value")
        self.pos = max((pos if self.end is None else min(pos, self.end)), 0)
        return self.pos


class DebugOutput(RawIOBase):
    def __init__(self):
        super().__init__()
        self.buf = b''
        self.num = 1

    def writable(self):
        return True

    def write(self, b):
        *lines, self.buf = (self.buf + b).split(b'\n')

        for line in lines:
            value = literal_eval(line.decode('ascii'))
            if value != int(value) or int(value) & 255 != self.num:
                raise ValueError("expected {}, got {}".format(self.num, value))

            self.num = self.num % 127 + 1

        return len(b)


input = TextIOWrapper(BufferedReader(DebugInput()), encoding='ascii')
output = TextIOWrapper(BufferedWriter(DebugOutput()), encoding='ascii')
```
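The number 400 in DebugInput.read() is the length of one cycle of the lines "1\n" through "127\n", which DebugOutput then expects to see again in the same order. A small sketch (the helper block() is hypothetical) that checks this and the start-offset formulas used in read():

```python
def block(first, last):
    """Hypothetical helper: bytes of lines first..last, one number per line."""
    return b''.join(str(i).encode('ascii') + b'\n'
                    for i in range(first, last + 1))

print(len(block(1, 9)))     # 18:  offsets 0..17 hold the 1-digit numbers
print(len(block(1, 99)))    # 288: offsets 18..287 hold the 2-digit numbers
print(len(block(1, 127)))   # 400: one full cycle, hence self.pos % 400

# Start offset of line i within a cycle, as computed in DebugInput.read()
for i in range(1, 128):
    if i < 10:
        expected = i * 2 - 2
    elif i < 100:
        expected = i * 3 - 12
    else:
        expected = i * 4 - 112
    assert len(block(1, i - 1)) == expected
```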