Assembling Numpy Array in Parallel


I don't know if I am qualified to give proper advice on the topic of shared memory arrays, but I had a similar need to share arrays across processes in Python recently and came across a small custom numpy.ndarray implementation for a shared-memory array, built on the shared ctypes objects in multiprocessing. Here is a link to the code: shmarray.py. It acts just like a normal array, except the underlying data is stored in shared memory, meaning separate processes can both read and write to the same array.
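If you would rather not depend on that file, the core idea is small enough to sketch directly. This is my own rough equivalent, not the shmarray code: allocate a ctypes buffer from multiprocessing.sharedctypes and wrap it in a numpy view.

import multiprocessing.sharedctypes
import numpy as np

def shared_zeros(n):
    #RawArray allocates n doubles in shared memory (no lock is taken here).
    buf = multiprocessing.sharedctypes.RawArray('d', n)
    #np.frombuffer gives a numpy view onto that same memory (no copy), so
    #writes made by any process holding the buffer are visible to all of them.
    #With a fork-based start method the children simply inherit buf; with
    #spawn you would have to pass the RawArray to them explicitly.
    arr = np.frombuffer(buf, dtype=np.float64)
    arr[:] = 0.0
    return arr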

Using Shared Memory Array

In threading, all information available to a thread (the global and local namespaces) can be treated as shared between all other threads that have access to it, but in multiprocessing that data is not so easily accessible. On Linux, a child process can read the parent's data, but cannot write to it directly: when a write happens, the page is copied first and the write goes to the copy, so no other process sees the change (copy-on-write). However, if the memory being written to is shared memory, it is not copied. This means that with shmarray we can do things much as we would with threading, but with the true parallelism of multiprocessing. One way to give every process access to the shared array is with a Process subclass. I know you are currently using Pool.map(), but I felt limited by the way map works, especially when dealing with n-dimensional arrays; Pool.map() is not really designed for numpy-style interfaces, at least not easily. Here is a simple idea where you would spawn a process for each j in range(N):

import numpy as np
import shmarray
import multiprocessing

class Worker(multiprocessing.Process):
    def __init__(self, j, C, x):
        multiprocessing.Process.__init__(self)
        self.shared_x = x
        self.C = C
        self.j = j

    def run(self):
        #Your Stuff
        #indx_j will have n_j << N entries
        indx_j = build_indices(self.C, self.j)
        #x_j will be entries to be added to vector x at indices indx_j
        x_j = build_x_j(indx_j, self.C)
        #Add x_j into entries of x
        self.shared_x[indx_j] = self.shared_x[indx_j] + x_j

#And then actually do the work
N = ...  # whatever N should be
x = shmarray.zeros(shape=(N, 1))
C = ...  # whatever C is, doesn't need to be shared mem, since no writing is happening

procs = []
for j in range(N):
    proc = Worker(j, C, x)
    procs.append(proc)
    proc.start()

#And then join() the processes with the main process
for proc in procs:
    proc.join()
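As an aside, the copy-on-write behaviour described above is easy to see with a tiny experiment. This is my own sketch, assuming a fork-based start method (the default on Linux) and that shmarray.zeros accepts a plain shape tuple:

import multiprocessing
import numpy as np
import shmarray

normal = np.zeros(shape=(3,))
shared = shmarray.zeros(shape=(3,))

def child():
    normal[0] = 1.0  #written to the child's private (copied) page
    shared[0] = 1.0  #written to memory the parent also maps

p = multiprocessing.Process(target=child)
p.start()
p.join()
print(normal[0])  #still 0.0 -- the parent never sees the child's copy
print(shared[0])  #1.0 -- the write went to shared memory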

Custom Process Pool and Queues

So this might work, but spawning several thousand processes is not really going to be of any use if you only have a few cores. The way I handled this was to implement a queue system between my processes. That is to say, we have a Queue that the main process fills with j's, and then a couple of worker processes pull numbers from the Queue and do work with them; note that by implementing this, you are essentially doing exactly what Pool does. Also note that we are actually going to use multiprocessing.JoinableQueue for this, since it lets us use join() to wait until the queue has been emptied.

It's not hard to implement this at all; we simply need to modify our subclass a bit, along with how we use it.

import numpy as np
import shmarray
import multiprocessing

class Worker(multiprocessing.Process):
    def __init__(self, C, x, job_queue):
        multiprocessing.Process.__init__(self)
        self.shared_x = x
        self.C = C
        self.job_queue = job_queue

    def run(self):
        #New Queue Stuff
        j = None
        while j != 'kill':  #this is how I kill processes with queues, there might be a cleaner way.
            j = self.job_queue.get()  #gets a job from the queue if there is one, otherwise blocks.
            if j != 'kill':
                #Your Stuff
                indx_j = build_indices(self.C, j)
                x_j = build_x_j(indx_j, self.C)
                self.shared_x[indx_j] = self.shared_x[indx_j] + x_j
            #This tells the queue that the job that was pulled from it
            #has been completed (we need this for queue.join())
            self.job_queue.task_done()

#The way we interact has changed, now we need to define a job queue
job_queue = multiprocessing.JoinableQueue()

N = ...  # whatever N should be
x = shmarray.zeros(shape=(N, 1))
C = ...  # whatever C is, doesn't need to be shared mem, since no writing is happening

procs = []
proc_count = multiprocessing.cpu_count()  # create as many procs as cores
for _ in range(proc_count):
    proc = Worker(C, x, job_queue)  #now we pass the job queue instead
    procs.append(proc)
    proc.start()

#all the workers are just waiting for jobs now.
for j in range(N):
    job_queue.put(j)

job_queue.join()  #this blocks the main process until the queue has been emptied

#Now if you want to kill all the processes, just send a 'kill'
#job for each process.
for proc in procs:
    job_queue.put('kill')
job_queue.join()
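On the "there might be a cleaner way" remark: one common alternative (my own sketch, not part of the code above) is to use None as a poison-pill sentinel and break out of the loop explicitly; everything else stays the same as in the Worker above.

def run(self):
    while True:
        j = self.job_queue.get()
        if j is None:  #poison pill: this worker is done
            self.job_queue.task_done()
            break
        indx_j = build_indices(self.C, j)
        x_j = build_x_j(indx_j, self.C)
        self.shared_x[indx_j] = self.shared_x[indx_j] + x_j
        self.job_queue.task_done()

#and in the main process, one None per worker instead of 'kill':
for _ in procs:
    job_queue.put(None)
job_queue.join()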

Finally, I really cannot say anything about how this will handle writing to overlapping indices at the same time. Worst case, you could have a serious problem if two processes attempt to write at the same time and things get corrupted or crash (I am no expert here, so I really have no idea whether that would happen). Best case, since you are just doing addition and the order of operations doesn't matter, everything runs smoothly. If it doesn't run smoothly, my suggestion is to create a second custom Process subclass that specifically does the array assignment. To implement this you would need to pass both a job queue and an 'output' queue to the Worker subclass. Within the while loop, you would have an `output_queue.put((indx_j, x_j))` in place of the assignment. NOTE: if you are putting these into a Queue, they are being pickled, which can be slow. I recommend making them shared memory arrays, if they can be, before using put; it may be faster to just pickle them in some cases, but I have not tested this. To assign these as they are generated, you then need to have your Assigner process read these values from a queue as jobs and apply them, such that its work loop would essentially be:

def run(self):
    job = None
    while job != 'kill':
        job = self.job_queue.get()
        if job != 'kill':
            indx_j, x_j = job
            #Note this is the process which really needs access to the x array.
            self.x[indx_j] += x_j
        self.job_queue.task_done()

This last solution will likely be slower than doing the assignment within the worker processes, but done this way you have no worries about race conditions, and memory use stays lighter since you can consume the indx_j and x_j values as they are generated, instead of waiting until all of them are done.
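Putting the pieces of that two-queue design together, a rough outline might look like the following. The names Assigner and output_queue are my own, and build_indices/build_x_j are still your placeholder functions, so treat this as a sketch rather than drop-in code.

import multiprocessing
import shmarray

class Assigner(multiprocessing.Process):
    #The single process that owns all writes into the shared x array.
    def __init__(self, x, output_queue):
        multiprocessing.Process.__init__(self)
        self.x = x
        self.job_queue = output_queue  #so run() matches the loop shown above

    def run(self):
        job = None
        while job != 'kill':
            job = self.job_queue.get()
            if job != 'kill':
                indx_j, x_j = job
                self.x[indx_j] += x_j  #only this process ever touches x
            self.job_queue.task_done()

class Worker(multiprocessing.Process):
    def __init__(self, C, job_queue, output_queue):
        multiprocessing.Process.__init__(self)
        self.C = C
        self.job_queue = job_queue
        self.output_queue = output_queue

    def run(self):
        j = None
        while j != 'kill':
            j = self.job_queue.get()
            if j != 'kill':
                indx_j = build_indices(self.C, j)  #your placeholder
                x_j = build_x_j(indx_j, self.C)    #your placeholder
                #hand the result to the Assigner instead of writing x here
                self.output_queue.put((indx_j, x_j))
            self.job_queue.task_done()

job_queue = multiprocessing.JoinableQueue()
output_queue = multiprocessing.JoinableQueue()

N = ...  # whatever N should be
C = ...  # whatever C is
x = shmarray.zeros(shape=(N, 1))

assigner = Assigner(x, output_queue)
assigner.start()
workers = [Worker(C, job_queue, output_queue)
           for _ in range(multiprocessing.cpu_count())]
for w in workers:
    w.start()

for j in range(N):
    job_queue.put(j)
job_queue.join()     #every (indx_j, x_j) pair has been produced...
output_queue.join()  #...and applied to x by the Assigner

for _ in workers:
    job_queue.put('kill')
output_queue.put('kill')
job_queue.join()
output_queue.join()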

Note for Windows

I didn't do any of this work on Windows, so I am not 100% certain, but I believe the code above will be very memory intensive there, since Windows does not use a copy-on-write fork when spawning independent processes. Essentially, Windows will copy ALL the information a new process needs when spawning it from the main process. To fix this, I think replacing your x_j and C (anything you will be handing to other processes) with shared memory arrays instead of normal arrays should keep Windows from copying the data, but I am not certain. You did not specify which platform you are on, so I figured better safe than sorry, since multiprocessing is a different beast on Windows than on Linux.
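For example, a hedged sketch of that replacement (untested on Windows, and assuming shmarray.zeros takes the same shape and dtype arguments as numpy.zeros); C_local here is a placeholder name for whatever ordinary array you currently build:

import shmarray

#C_local stands for your existing, ordinary numpy array.
C = shmarray.zeros(shape=C_local.shape, dtype=C_local.dtype)
C[:] = C_local  #copy the data into shared memory once, up front
#now hand C (the shared-memory version) to the Worker processes as before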