Using multiprocessing.Process with a maximum number of simultaneous processes
It is probably most sensible to use multiprocessing.Pool, which creates a pool of worker processes (by default, one per core available on your system) and then feeds tasks to them as workers become free.
The example from the standard docs (http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers) shows that you can also set the number of worker processes manually:

    from multiprocessing import Pool

    def f(x):
        return x*x

    if __name__ == '__main__':
        pool = Pool(processes=4)              # start 4 worker processes
        result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
        print(result.get(timeout=1))          # prints "100" unless your computer is *very* slow
        print(pool.map(f, range(10)))         # prints "[0, 1, 4,..., 81]"

It is also handy to know that the function multiprocessing.cpu_count() returns the number of cores on a given system, should you need that in your code.
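As a rough illustration of combining the two, the sketch below sizes a pool from cpu_count(). The helper pick_worker_count and the task square are hypothetical names introduced only for this example, and capping the pool at the core count is an assumption about what you want, not a requirement of Pool.

```python
import multiprocessing


def square(x):
    # Hypothetical stand-in for the real task.
    return x * x


def pick_worker_count(requested=None):
    """Return a pool size between 1 and the number of cores reported."""
    available = multiprocessing.cpu_count()
    if requested is None:
        return available
    # Never go below one worker or above the number of cores.
    return max(1, min(requested, available))


if __name__ == '__main__':
    workers = pick_worker_count(4)
    pool = multiprocessing.Pool(processes=workers)
    try:
        print(pool.map(square, range(10)))
    finally:
        pool.close()
        pool.join()
```

If you would rather allow more processes than cores (for I/O-bound work, say), pass the number straight to Pool instead of clamping it.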
Edit: Here's some draft code that seems to work for your specific case:

    import multiprocessing

    def f(name):
        print('hello {}'.format(name))

    if __name__ == '__main__':
        # use all available cores, otherwise specify the number you want as an argument
        pool = multiprocessing.Pool()
        for i in range(0, 512):
            pool.apply_async(f, args=(i,))
        pool.close()
        pool.join()

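One thing to watch with the draft above: apply_async returns an AsyncResult, and an exception raised inside a worker only surfaces when you call get() on that object. Since the draft discards the return values, failures would go unnoticed. A minimal sketch of keeping the handles follows; the helper collect, the deliberate failure for task 3, and the 30-second timeout are all assumptions made for illustration.

```python
import multiprocessing


def f(name):
    # Task 3 fails on purpose so the error path is visible.
    if name == 3:
        raise ValueError('task {} failed'.format(name))
    return 'hello {}'.format(name)


def collect(async_results, timeout=30):
    """Call get() on every handle and return (values, errors)."""
    values, errors = [], []
    for res in async_results:
        try:
            values.append(res.get(timeout=timeout))
        except Exception as exc:
            # get() re-raises whatever the worker raised.
            errors.append(exc)
    return values, errors


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    handles = [pool.apply_async(f, args=(i,)) for i in range(8)]
    pool.close()
    values, errors = collect(handles)
    pool.join()
    print('{} succeeded, {} failed'.format(len(values), len(errors)))
```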
I think Semaphore is what you are looking for: once its counter has been counted down to 0, the next acquire call blocks the main process. Sample code:

    from multiprocessing import Process
    from multiprocessing import Semaphore
    import time

    def f(name, sema):
        print('process {} starting doing business'.format(name))
        # simulate a time-consuming task by sleeping
        time.sleep(5)
        # `release` will add 1 to `sema`, allowing other
        # processes blocked on it to continue
        sema.release()

    if __name__ == '__main__':
        concurrency = 20
        total_task_num = 1000
        sema = Semaphore(concurrency)
        all_processes = []
        for i in range(total_task_num):
            # once 20 processes are running, the following `acquire` call
            # will block the main process since `sema` has been reduced
            # to 0. This loop will continue only after one or more
            # previously created processes complete.
            sema.acquire()
            p = Process(target=f, args=(i, sema))
            all_processes.append(p)
            p.start()

        # inside main process, wait for all processes to finish
        for p in all_processes:
            p.join()

The following code is more structured, since it acquires and releases sema in the same function. However, it will consume too many resources if total_task_num is very large:

    from multiprocessing import Process
    from multiprocessing import Semaphore
    import time

    def f(name, sema):
        print('process {} starting doing business'.format(name))
        # `sema` is acquired and released in the same
        # block of code here, making code more readable,
        # but may lead to problem.
        sema.acquire()
        time.sleep(5)
        sema.release()

    if __name__ == '__main__':
        concurrency = 20
        total_task_num = 1000
        sema = Semaphore(concurrency)
        all_processes = []
        for i in range(total_task_num):
            p = Process(target=f, args=(i, sema))
            all_processes.append(p)
            # the following line won't block after 20 processes
            # have been created and running, instead it will carry
            # on until all 1000 processes are created.
            p.start()

        # inside main process, wait for all processes to finish
        for p in all_processes:
            p.join()

The above code creates total_task_num processes, but only concurrency of them run at any one time. The rest sit blocked on the semaphore while still consuming precious system resources.
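A caveat for the first variant (acquire in the main process, release in the worker): if the task raises before it reaches sema.release(), that slot is never handed back, and after enough failures the main loop blocks forever. One possible way to guard against this is a try/finally in the worker, sketched below. The helper do_work, its deliberate failures, and the smaller numbers are hypothetical and only there to make the example quick to run.

```python
import time
from multiprocessing import Process, Semaphore


def do_work(name):
    # Hypothetical stand-in for the real task; every fifth task fails.
    time.sleep(0.1)
    if name % 5 == 0:
        raise RuntimeError('task {} failed'.format(name))


def f(name, sema):
    try:
        do_work(name)
    finally:
        # Runs on success and on failure, so the slot is always returned.
        sema.release()


if __name__ == '__main__':
    concurrency = 4
    total_task_num = 20
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        # Blocks once `concurrency` workers are running.
        sema.acquire()
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        p.start()

    for p in all_processes:
        p.join()
```

This does not help if a worker is killed outright (for example by SIGKILL), because the finally block never runs in that case.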
More generally, you can start the processes in fixed-size chunks and wait for each chunk to finish before starting the next:
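
    import multiprocessing

    def chunks(l, n):
        # yield successive n-sized slices of l
        for i in range(0, len(l), n):
            yield l[i:i + n]

    def f(i, param):
        # placeholder task: the original snippet does not define f
        print('job {} got {}'.format(i, param))

    numberOfThreads = 4   # number of processes started per chunk
    params = range(10)    # placeholder input: not defined in the original snippet

    if __name__ == '__main__':
        jobs = []
        for i, param in enumerate(params):
            p = multiprocessing.Process(target=f, args=(i, param))
            jobs.append(p)
        for i in chunks(jobs, numberOfThreads):
            # start every process in the chunk, then wait for all of them
            for j in i:
                j.start()
            for j in i:
                j.join()
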
Of course, that approach is quite crude, since it waits for every process in a chunk to finish before it continues with the next chunk. Still, it works well when the function calls have approximately equal run times.
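To put a number on that trade-off, here is a small back-of-the-envelope model (no processes are started). It compares the total run time of the chunked scheme with an idealized scheme that starts the next task as soon as any slot frees up, which is roughly what a pool or the semaphore approach gives you. The function names and the durations are made up for illustration, and process start-up cost is ignored.

```python
import heapq


def chunked_makespan(durations, n):
    """Each chunk of n tasks takes as long as its slowest task."""
    return sum(max(durations[i:i + n]) for i in range(0, len(durations), n))


def rolling_makespan(durations, n):
    """Start the next task as soon as any of the n slots becomes free."""
    slots = [0.0] * n          # time at which each slot becomes free
    heapq.heapify(slots)
    for d in durations:
        start = heapq.heappop(slots)     # earliest free slot
        heapq.heappush(slots, start + d)
    return max(slots)


if __name__ == '__main__':
    equal = [1] * 8
    uneven = [10, 1, 1, 1, 10, 1, 1, 1]
    print(chunked_makespan(equal, 4), rolling_makespan(equal, 4))
    print(chunked_makespan(uneven, 4), rolling_makespan(uneven, 4))
```

With equal run times both schemes finish at the same time. With the uneven list, the chunked scheme pays for the slowest task in each chunk (10 + 10 = 20), while the rolling scheme finishes at 11 in this model.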