
Using multiprocessing.Process with a maximum number of simultaneous processes


It might be most sensible to use multiprocessing.Pool, which creates a pool of worker processes (by default, one per CPU core on your system) and then feeds tasks in as cores become available.

The example from the standard docs (http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers) shows that you can also set the number of worker processes manually:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

It's also handy to know that multiprocessing.cpu_count() returns the number of cores on a given system, if you need that in your code.
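For example, a minimal sketch that sizes a pool from the core count (leaving one core free is just an illustrative choice, and n_cores is a made-up name):

import multiprocessing

if __name__ == '__main__':
    n_cores = multiprocessing.cpu_count()
    # leave one core free for the rest of the system (illustrative choice only)
    pool = multiprocessing.Pool(processes=max(1, n_cores - 1))
    print(pool.map(abs, range(-5, 5)))
    pool.close()
    pool.join()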

Edit: Here's some draft code that seems to work for your specific case:

import multiprocessing

def f(name):
    print 'hello', name

if __name__ == '__main__':
    # use all available cores, otherwise specify the number you want as an argument
    pool = multiprocessing.Pool()
    for i in xrange(0, 512):
        pool.apply_async(f, args=(i,))
    pool.close()
    pool.join()
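One caveat with apply_async: if f raises an exception, it is silently swallowed unless you keep the returned AsyncResult objects and call get() on them. A minimal sketch of the same idea with that check added (the results name is just illustrative):

import multiprocessing

def f(name):
    print 'hello', name

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    # keep the AsyncResult handles so worker exceptions aren't lost
    results = [pool.apply_async(f, args=(i,)) for i in xrange(512)]
    pool.close()
    pool.join()
    for r in results:
        r.get()  # re-raises here if the corresponding call failed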


I think Semaphore is what you are looking for; it will block the main process once its count has been reduced to 0. Sample code:

from multiprocessing import Process
from multiprocessing import Semaphore
import time

def f(name, sema):
    print('process {} starting doing business'.format(name))
    # simulate a time-consuming task by sleeping
    time.sleep(5)
    # `release` will add 1 to `sema`, allowing other
    # processes blocked on it to continue
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        # once 20 processes are running, the following `acquire` call
        # will block the main process since `sema` has been reduced
        # to 0. This loop will continue only after one or more
        # previously created processes complete.
        sema.acquire()
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        p.start()
    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()

The following code is more structured, since it acquires and releases sema in the same function. However, it will consume too many resources if total_task_num is very large:

from multiprocessing import Process
from multiprocessing import Semaphore
import time

def f(name, sema):
    print('process {} starting doing business'.format(name))
    # `sema` is acquired and released in the same
    # block of code here, making the code more readable,
    # but this may lead to problems.
    sema.acquire()
    time.sleep(5)
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        # the following line won't block after 20 processes
        # have been created and are running; instead it will carry
        # on until all 1000 processes are created.
        p.start()
    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()

The above code creates total_task_num processes, but only concurrency of them will be running at any time; the other processes are blocked, still consuming precious system resources.


More generally, this could also look like this:

import multiprocessing

def chunks(l, n):
    for i in range(0, len(l), n):
        yield l[i:i + n]

numberOfThreads = 4

if __name__ == '__main__':
    # `f` and `params` are assumed to be your worker function
    # and its list of parameters
    jobs = []
    for i, param in enumerate(params):
        p = multiprocessing.Process(target=f, args=(i, param))
        jobs.append(p)
    for i in chunks(jobs, numberOfThreads):
        for j in i:
            j.start()
        for j in i:
            j.join()

Of course, that approach is quite crude (since it waits for every process in a chunk to finish before it continues with the next chunk). Still, it works well when the function calls have approximately equal run times.