How to spawn parallel child processes on a multi-processor system?
What you are looking for is the process pool class in multiprocessing.
```python
import multiprocessing
import subprocess

def work(cmd):
    # Run one command in a child process and return its exit status
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':
    count = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=count)
    print(pool.map(work, ['ls'] * count))
```
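On Python 3.3+, the pool can also be used as a context manager. Here is a minimal sketch, assuming you want each command's exit status back. map() returns the statuses in the same order as the input list, so a failing command is easy to match to its input. The run_all helper and the stand-in commands (tiny Python children started via sys.executable) are hypothetical, chosen only so the example runs anywhere.

```python
import multiprocessing
import subprocess
import sys

def work(cmd):
    # Run one command in a child process and return its exit status.
    return subprocess.call(cmd, shell=False)

def run_all(cmds):
    # Pool() with no argument starts one worker per CPU. Leaving the
    # with-block terminates the pool; map() has already returned by then.
    with multiprocessing.Pool() as pool:
        return pool.map(work, cmds)

if __name__ == '__main__':
    # Hypothetical stand-in commands: one succeeds, one exits with status 3.
    ok = [sys.executable, '-c', 'pass']
    fail = [sys.executable, '-c', 'import sys; sys.exit(3)']
    print(run_all([ok, fail, ok]))
```

The list of statuses lines up index by index with the list of commands.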
And here is a calculation example to make it easier to understand. The following divides 10000 tasks among N processes, where N is the CPU count. Note that I'm passing None as the number of processes; this causes the Pool class to use cpu_count() for the number of processes (see the multiprocessing.Pool documentation).
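```python
import multiprocessing

def calculate(value):
    return value * 10

if __name__ == '__main__':
    pool = multiprocessing.Pool(None)  # None: one worker process per CPU
    tasks = range(10000)
    results = []
    # The callback is called once, with the complete list of results, so
    # extend() keeps `results` flat (append() would nest it: [[0, 10, ...]]).
    r = pool.map_async(calculate, tasks, callback=results.extend)
    r.wait()  # Wait on the results
    pool.close()
    pool.join()
    print(results)
```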
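If you don't need the callback, here is a sketch that reads the results straight from the AsyncResult. get() blocks until all tasks are done and returns the list in input order. If a worker raises, get() re-raises that exception. With only a callback, a failed task means the callback never fires, so results would silently stay empty. The calculate_all helper name is hypothetical.

```python
import multiprocessing

def calculate(value):
    return value * 10

def calculate_all(tasks):
    # Pool(None) starts one worker process per CPU.
    with multiprocessing.Pool(None) as pool:
        r = pool.map_async(calculate, tasks)
        # get() waits for every task, returns results in input order, and
        # re-raises any exception that was raised inside a worker.
        return r.get()

if __name__ == '__main__':
    results = calculate_all(range(10000))
    print(results[:5])  # → [0, 10, 20, 30, 40]
```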
Here is the solution I came up with, based on Nadia's and Jim's comments. I am not sure it is the best way, but it works. The child script being called has to be a shell script because I need to use some third-party apps, including Matlab, so I had to take it out of Python and rewrite it in bash.
```python
import os
import time
import multiprocessing
import subprocess

def work(staname):
    print('Processing station:', staname)
    print('Parent process:', os.getppid())
    print('Process id:', os.getpid())
    # The comma after "/bin/bash" matters: without it, Python joins the two
    # adjacent string literals into one bogus path, "/bin/bash/path/to/...".
    cmd = ["/bin/bash", "/path/to/executable/create_graphs.sh",
           "--name=%s" % staname]
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':
    my_list = ['XYZ', 'ABC', 'NYU']
    my_list.sort()
    print(my_list)
    # Get the number of processors available
    num_processes = multiprocessing.cpu_count()
    threads = []
    len_stas = len(my_list)
    print("+++ Number of stations to process: %s" % len_stas)
    for list_item in my_list:
        # If every processor is busy, wait until a child exits before
        # starting the next one, so that no station is skipped.
        while len(threads) >= num_processes:
            time.sleep(0.1)
            # Rebuild the list instead of removing items while iterating it
            threads = [t for t in threads if t.is_alive()]
        p = multiprocessing.Process(target=work, args=[list_item])
        p.start()
        print(p, p.is_alive())
        threads.append(p)
    # Wait for all remaining children to finish
    for thread in threads:
        thread.join()
```
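Since the first answer suggests Pool, here is a sketch of the same station loop rewritten with it. The pool then does the "at most N at a time" bookkeeping that the loop above does by hand. imap_unordered hands back each (station, exit code) pair as soon as that station finishes, not necessarily in input order. The process_stations name and the stand-in command are hypothetical. The stand-in is a Python child that just echoes its --name= argument. Swap in the real /bin/bash create_graphs.sh command list.

```python
import multiprocessing
import subprocess
import sys

def work(staname):
    # Hypothetical stand-in for ["/bin/bash", ".../create_graphs.sh", ...]:
    # a Python child that prints the --name=... argument it receives.
    cmd = [sys.executable, '-c', 'import sys; print(sys.argv[1])',
           '--name=%s' % staname]
    return staname, subprocess.call(cmd, shell=False)

def process_stations(stations, num_processes=None):
    results = []
    # processes=None means one worker per CPU; the pool keeps at most that
    # many stations running and hands out the rest as workers free up.
    with multiprocessing.Pool(processes=num_processes) as pool:
        # Pairs arrive in completion order, not input order.
        for staname, code in pool.imap_unordered(work, stations):
            print('Finished station:', staname, 'exit code:', code)
            results.append((staname, code))
    return results

if __name__ == '__main__':
    print(process_stations(sorted(['XYZ', 'ABC', 'NYU'])))
```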
Does this seem like a reasonable solution? I tried to use Jim's while loop format, but my script just printed nothing after the station count, and I am not sure why. Here is the output when I run the script with Jim's 'while' loop replacing the 'for' loop:
```
hostname{me}2% controller.py
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%
```
When I run it with the 'for' loop, I get something more meaningful:
```
hostname{me}6% controller.py
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%
```
So this works, and I am happy. However, I still don't understand why I can't use Jim's 'while' style loop instead of the 'for' loop I am using. Thanks for all the help; I am impressed with the breadth of knowledge at Stack Overflow.
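Jim's loop isn't shown here, so this can't say exactly what went wrong. One way a while loop can print nothing at all is a condition that only tests the list of running processes. That list is empty before the first child starts, so the body never runs. Here is a sketch of a while-style scheduler that instead loops while stations are left OR children are still running. The run_stations name and the trivial work() stand-in are hypothetical.

```python
import multiprocessing
import time

def work(staname):
    # Hypothetical stand-in for the real worker that runs create_graphs.sh.
    print('Processing station:', staname)

def run_stations(stations, num_processes):
    pending = list(stations)   # stations not started yet
    running = []               # children that may still be alive
    started = []               # every child, to collect exit codes at the end
    # Testing only `running` here would skip the loop entirely, because it
    # is empty before the first child is started.
    while pending or running:
        # is_alive() returns False once a child has exited.
        running = [p for p in running if p.is_alive()]
        # Fill any free slots with the next stations.
        while pending and len(running) < num_processes:
            p = multiprocessing.Process(target=work, args=(pending.pop(0),))
            p.start()
            running.append(p)
            started.append(p)
        time.sleep(0.05)   # avoid a busy loop while children run
    return [p.exitcode for p in started]

if __name__ == '__main__':
    print(run_stations(['ABC', 'NYU', 'XYZ'], multiprocessing.cpu_count()))
```

With num_processes smaller than the number of stations, the inner while only starts new children as earlier ones exit.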