
python concurrent.futures.ProcessPoolExecutor: Performance of .submit() vs .map()


Overview:

There are 2 parts to my answer:

  • Part 1 shows how to gain more speed-up from @niemmi's ProcessPoolExecutor.map() solution.
  • Part 2 shows when the ProcessPoolExecutor methods .submit() and .map() yield non-equivalent compute times.

=======================================================================

Part 1: More Speed-up for ProcessPoolExecutor.map()

Background: This section builds on @niemmi's .map() solution, which by itself is excellent. While researching his discretization scheme to better understand how it interacts with .map()'s chunksize argument, I found this interesting solution.

I regard @niemmi's definition chunk = nmax // workers as a chunk size, i.e. the length of the smaller sub-range of the full number range (the given task) that each worker in the pool handles. (This is not the same thing as the chunksize keyword of .map(), which only controls how many items of the input iterables are bundled into each batch sent to a worker.) This definition rests on the assumption that if a computer has x workers, dividing the task equally among them makes optimum use of each worker, so the total task completes fastest. Under this assumption, the number of chunks a task is broken into should always equal the number of pool workers. However, is this assumption correct?
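The discretization described above can be sketched as a small helper that splits [0, nmax) into num_of_chunks contiguous half-open ranges, with the last range absorbing the remainder left over by nmax // num_of_chunks. The name split_range is hypothetical; it is only meant to mirror the cstart/cstop generators used in the code of this answer.

```python
def split_range(nmax, num_of_chunks):
    """Split [0, nmax) into num_of_chunks contiguous (start, stop) pairs.

    Hypothetical helper: every chunk holds nmax // num_of_chunks numbers
    except the last, which also takes the remainder so nothing is dropped.
    """
    if num_of_chunks < 1:
        raise ValueError("num_of_chunks must be >= 1")
    chunksize = nmax // num_of_chunks
    bounds = []
    for i in range(num_of_chunks):
        start = chunksize * i
        # The last chunk runs all the way to nmax.
        stop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
        bounds.append((start, stop))
    return bounds


# One chunk per worker (4 workers) vs. 14 chunks per worker (56 chunks).
print(split_range(10, 4))               # [(0, 2), (2, 4), (4, 6), (6, 10)]
print(len(split_range(int(1E8), 56)))   # 56
```

Raising the chunk count only shortens each range; the ranges still tile [0, nmax) exactly, so the result set is unchanged.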

Proposition: Here, I propose that the above assumption does not always lead to the fastest compute time when used with ProcessPoolExecutor.map(). Rather, discretising a task into more chunks than there are pool workers can lead to speed-up, i.e. faster completion of a given task.

Experiment: I modified @niemmi's code to allow the number of discretized tasks to exceed the number of pool workers. This code is given below and is used to find how many numbers in the range 0 to 1E8 contain the digit 5. I executed it using 1, 2, 4, and 6 pool workers and for various ratios of the number of discretized tasks to the number of pool workers. For each scenario, 3 runs were made and the compute times were tabulated. "Speed-up" is defined here as the average compute time when the number of chunks equals the number of pool workers, divided by the average compute time when the number of discretized tasks is greater than the number of pool workers.
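The speed-up definition above is a ratio of two means. A minimal sketch, assuming each scenario's run times are collected into a list (the function name speed_up and the sample numbers are illustrative only, not the measurements from this experiment):

```python
from statistics import mean


def speed_up(baseline_times, candidate_times):
    """Mean time with nchunks == nworkers divided by mean time with
    nchunks > nworkers. A value above 1 means more chunks was faster."""
    return mean(baseline_times) / mean(candidate_times)


# Illustrative numbers only (not measurements): 3 runs per scenario.
print(speed_up([12.0, 12.3, 11.7], [10.0, 10.1, 9.9]))  # 1.2
```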

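Findings:

[Figure: nchunk over nworkers. Left: compute time for each scenario; right: speed-up; both against number of chunks / number of workers.]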

  1. The figure on the left shows the compute time for all the scenarios described in the experiment section. It shows that the compute time when number of chunks / number of workers = 1 is always greater than the compute time when number of chunks > number of workers. That is, the former case is always less efficient than the latter.

  2. The figure on the right shows that a speed-up of 1.2 times or more was gained once number of chunks / number of workers reached a threshold value of 14 or more. Interestingly, the speed-up trend also occurred when ProcessPoolExecutor.map() was executed with 1 worker.

Conclusion: When customizing the number of discrete tasks that ProcessPoolExecutor.map() should use to solve a given task, it is prudent to make this number greater than the number of pool workers, as this practice shortened the compute time in these experiments.

concurrent.futures.ProcessPoolExecutor.map() code (revised _concurrent_map() and main; _findmatch() is @niemmi's):

```python
import concurrent.futures as cf
import itertools
from time import time


def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    match = []
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    return match


def _concurrent_map(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    found = []
    # 2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop,
                               itertools.repeat(number))
        # 2.2. Consolidate the per-chunk lists (yielded in chunk order) into one list.
        for matches in futures:
            found.extend(matches)
        foundsize = len(found)
        end = time() - start
        print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found


if __name__ == '__main__':
    nmax = int(1E8)  # Number range maximum.
    number = str(5)  # Number to be found in number range.
    workers = 4      # Pool of workers
    chunks_vs_workers = 14  # A factor of >= 14 can provide optimum performance
    num_of_chunks = chunks_vs_workers * workers
    start = time()
    a = _concurrent_map(nmax, number, workers, num_of_chunks)
    end = time() - start
    print('\n main')
    print('nmax={}, workers={}, num_of_chunks={}'.format(
          nmax, workers, num_of_chunks))
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(a), end))
```
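In the code above, chunksize is the length of each number range handed to a task. ProcessPoolExecutor.map() also has its own chunksize keyword (Python 3.5+), which bundles that many argument tuples into each batch sent to a worker; with only a few dozen range tasks, its default of 1 is usually adequate. The sketch below keeps the two knobs apart. It is a minimal illustration, not the code used for the measurements: count_in_range, count_matches, chunks_per_worker and map_chunksize are hypothetical names, and the executor class is a parameter so the logic can also be checked with a ThreadPoolExecutor, which accepts chunksize but ignores it.

```python
import concurrent.futures as cf
from itertools import repeat


def count_in_range(nmin, nmax, number):
    """Count integers in [nmin, nmax) whose decimal digits contain `number`."""
    return sum(1 for n in range(nmin, nmax) if number in str(n))


def count_matches(nmax, number, workers, chunks_per_worker,
                  executor_cls=cf.ProcessPoolExecutor, map_chunksize=1):
    """Hypothetical sketch: split [0, nmax) into workers * chunks_per_worker
    ranges and count matches with executor.map().

    chunks_per_worker sets the range length (the Part 1 discretization);
    map_chunksize is map()'s own keyword and only batches how many
    (start, stop, number) tuples are sent to a worker at a time.
    """
    num_of_chunks = workers * chunks_per_worker
    size = nmax // num_of_chunks
    starts = [size * i for i in range(num_of_chunks)]
    # The last range runs to nmax so the remainder is not lost.
    stops = starts[1:] + [nmax]
    with executor_cls(max_workers=workers) as executor:
        counts = executor.map(count_in_range, starts, stops, repeat(number),
                              chunksize=map_chunksize)
        return sum(counts)
```

With a process pool, call it from inside an `if __name__ == '__main__':` guard, as the listing above does, e.g. count_matches(int(1E8), '5', workers=4, chunks_per_worker=14); the guard is required on platforms that start workers with spawn, such as Windows.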

=======================================================================

Part 2: Total compute time from using the ProcessPoolExecutor methods .submit() and .map() can differ when returning a sorted/ordered result list.

Background: I amended both the .submit() and .map() code to allow an "apples-to-apples" comparison of their compute times, and to show separately the compute time of the main code, of the _concurrent function that main calls to perform the concurrent operations, and of each discretized task/worker called by _concurrent. Furthermore, _concurrent was structured to return an unordered or an ordered list of results directly from the Future objects of .submit() and the iterator of .map(). The source code is provided below (I hope it helps you).
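The three levels of timing described above (main, _concurrent, and each task) can be collected with a small context manager. This is a sketch under assumptions: the name timed and the shared timings dict are hypothetical, and it uses time.perf_counter(), which is intended for measuring short intervals, instead of the time.time() used in the listings.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, timings):
    """Record the elapsed wall-clock time of the with-block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Stored even if the block raises, so partial runs are still visible.
        timings[label] = time.perf_counter() - start


timings = {}
with timed('main', timings):
    with timed('_concurrent', timings):
        data = [n for n in range(100_000) if '5' in str(n)]
    with timed('list', timings):
        result = list(data)
print({k: round(v, 4) for k, v in timings.items()})
```

Nesting the blocks mirrors the structure of the experiment: the 'main' entry always covers '_concurrent' plus whatever post-processing (here list()) follows it.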

Experiments: These two improved versions were used to perform the same experiment described in Part 1, except that only 6 pool workers were considered, and Python's built-in list() and sorted() functions were used to return an unordered and an ordered list of results, respectively, to the main section of the code.

Findings:

[Figure: .submit() vs .map(), plus list() vs sorted()]

  1. The _concurrent function's results show that its compute time when using ProcessPoolExecutor.submit() to create all the Future objects is equivalent to its compute time when using ProcessPoolExecutor.map() to create the iterator, across all ratios of discretized tasks to pool workers. Because leaving the with block calls executor.shutdown(wait=True), this time also includes running every task. This result simply means that the ProcessPoolExecutor methods .submit() and .map() are equally efficient/fast.
  2. Comparing the compute times of main and its _concurrent function, we can see that main ran longer than _concurrent. This is expected, as the difference reflects the compute time of the list() and sorted() calls (and of the calls nested inside them, such as chain.from_iterable()). Clearly, list() took less time to return a result list than sorted(). The average compute times of list() for both the .submit() and .map() code were similar, at ~0.47sec. The average compute times of sorted() for the .submit() and .map() code were 1.23sec and 1.01sec, respectively. In other words, list() was 2.62 times (1.23/0.47) and 2.15 times (1.01/0.47) faster than sorted() for the .submit() and .map() code, respectively.
  3. It is not certain why sorted() generated an ordered list from .map() faster than from .submit() once the number of discretized tasks exceeded the number of pool workers, except when the number of discretized tasks equaled the number of pool workers. One plausible reason: .map() yields the chunk results in submission order and each chunk is already ascending, so the concatenated list is already sorted and sorted() (Timsort) confirms this in a single linear pass, whereas cf.as_completed() yields chunks in completion order, so sorted() has to merge out-of-order runs. That said, these findings show that the choice between the equally fast .submit() and .map() methods can be affected by the cost of sorted(). For example, if the intent is to generate an ordered list in the shortest time possible, ProcessPoolExecutor.map() should be preferred over ProcessPoolExecutor.submit(), as .map() gave the shortest total compute time.
  4. The discretization scheme from Part 1 of my answer is shown here to speed up both the .submit() and .map() methods. The speed-up can be as much as 20% over the case where the number of discretized tasks equals the number of pool workers.
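The ordering behaviour behind finding 3 can be checked with a small sketch. Executor.map() yields results in the order of its input, while concurrent.futures.as_completed() yields futures as they finish; iterating the list of futures in submission order gives an ordered result from .submit() without calling sorted(). The task function slow_chunk and its delays are hypothetical, and a ThreadPoolExecutor is used only to keep the example light; ProcessPoolExecutor follows the same ordering rules.

```python
import concurrent.futures as cf
import time
from itertools import chain


def slow_chunk(start, stop, delay):
    """Return the numbers in [start, stop) after sleeping `delay` seconds."""
    time.sleep(delay)
    return list(range(start, stop))


# Earlier chunks sleep longer, so they tend to finish last.
bounds = [(0, 3, 0.3), (3, 6, 0.2), (6, 9, 0.1), (9, 12, 0.0)]

with cf.ThreadPoolExecutor(max_workers=4) as executor:
    # .map(): results come back in submission order.
    mapped = list(chain.from_iterable(executor.map(slow_chunk, *zip(*bounds))))

    futures = [executor.submit(slow_chunk, *b) for b in bounds]
    # as_completed(): results come back in completion order.
    completed = list(chain.from_iterable(
        f.result() for f in cf.as_completed(futures)))
    # Iterating the futures list itself restores submission order.
    in_order = list(chain.from_iterable(f.result() for f in futures))

print(mapped == sorted(mapped))  # True
print(completed[:3])             # depends on which chunk finished first
print(in_order == mapped)        # True
```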

Improved .map() code

```python
#!/usr/bin/python3.5
# -*- coding: utf-8 -*-
import concurrent.futures as cf
from time import time
from itertools import repeat, chain


def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    start = time()
    match = []
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    #print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
    #      format(nmin, nmax, number, len(match), end))
    return match


def _concurrent(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a concurrent
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    # 2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop, repeat(number))
    # Leaving the with block waits for all tasks, so this time includes the work.
    end = time() - start
    print('\n within statement of def _concurrent(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(end))
    return list(chain.from_iterable(futures))  # Return the result list without sorting
    #return sorted(chain.from_iterable(futures))  # Return a sorted result list


if __name__ == '__main__':
    nmax = int(1E8)  # Number range maximum.
    number = str(5)  # Number to be found in number range.
    workers = 6      # Pool of workers
    chunks_vs_workers = 30  # A factor of >= 14 can provide optimum performance
    num_of_chunks = chunks_vs_workers * workers
    start = time()
    found = _concurrent(nmax, number, workers, num_of_chunks)
    end = time() - start
    print('\n main')
    print('nmax={}, workers={}, num_of_chunks={}'.format(
          nmax, workers, num_of_chunks))
    #print('found = ', found)
    print("found {0} in {1:.4f}sec".format(len(found), end))
```

Improved .submit() code:
This code is the same as the .map() code, except that the _concurrent function is replaced with the following:

```python
def _concurrent(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
       find the occurrences of a given number in a number range in a concurrent
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    futures = []
    # 2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(num_of_chunks):
            cstart = chunksize * i
            cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
    end = time() - start
    print('\n within statement of def _concurrent(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(end))
    # as_completed() yields futures in completion order, so chunks may be out of order.
    return list(chain.from_iterable(f.result() for f in cf.as_completed(
        futures)))  # Return an unordered list
    #return sorted(chain.from_iterable(f.result() for f in cf.as_completed(
    #    futures)))  # Return an ordered list
```

=======================================================================


You're comparing apples to oranges here. When using map you produce all the 1E8 numbers and transfer them to worker processes. This takes a lot of time compared to actual execution. When using submit you just create 6 sets of parameters that get transferred.
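The difference described above can be sketched as follows. In the per-item style, map() receives every number in the range, so each one is pickled and sent to a worker (the chunksize keyword only batches them); in the per-range style, each task receives just a (start, stop, number) tuple and iterates locally. The function names are hypothetical, and the executor class is a parameter so the logic can be checked quickly with a ThreadPoolExecutor; the inter-process transfer cost itself only appears with ProcessPoolExecutor.

```python
import concurrent.futures as cf
from itertools import repeat


def contains(n, number):
    """Per-item task: one call per number."""
    return number in str(n)


def count_range(start, stop, number):
    """Per-range task: one call per (start, stop) chunk."""
    return sum(1 for n in range(start, stop) if number in str(n))


def per_item(nmax, number, workers, executor_cls=cf.ProcessPoolExecutor):
    # Every n in range(nmax) is sent to a worker; chunksize only groups them.
    with executor_cls(max_workers=workers) as executor:
        flags = executor.map(contains, range(nmax), repeat(number),
                             chunksize=max(1, nmax // (workers * 4)))
        return sum(flags)  # True counts as 1


def per_range(nmax, number, workers, executor_cls=cf.ProcessPoolExecutor):
    # Only `workers` small (start, stop, number) tuples are sent.
    chunk = nmax // workers
    starts = [chunk * i for i in range(workers)]
    stops = starts[1:] + [nmax]
    with executor_cls(max_workers=workers) as executor:
        return sum(executor.map(count_range, starts, stops, repeat(number)))
```

Both functions return the same count; what differs with a process pool is how much data crosses the process boundary.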

If you change map to operate with the same principle you'll get numbers that are close to each other:

```python
import concurrent.futures as cf
import itertools
from time import time


def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match = []
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match), end))
    return match


def _concurrent_map(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    found = []
    # 2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunk * i for i in range(workers))
        cstop = (chunk * i if i != workers else nmax for i in range(1, workers + 1))
        futures = executor.map(_findmatch, cstart, cstop, itertools.repeat(number))
        # 2.2. Consolidate result as a list and return this list.
        for matches in futures:
            found.extend(matches)
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent_map(nmax, number, workers):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found
```

You could improve the performance of submit by using as_completed correctly. For a given iterable of futures, it returns an iterator that yields the futures in the order they complete.

You could also skip copying the data into another list and use itertools.chain.from_iterable to combine the results from the futures into a single iterable:

```python
import concurrent.futures as cf
from itertools import chain
from time import time


def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match = []
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match), end))
    return match


def _concurrent_submit(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    chunk = nmax // workers
    futures = []
    # 2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
    # Lazily chain each chunk's matches in completion order; no copy is made.
    return chain.from_iterable(f.result() for f in cf.as_completed(futures))


if __name__ == '__main__':
    nmax = int(1E8)  # Number range maximum.
    number = str(5)  # Number to be found in number range.
    workers = 6      # Pool of workers
    start = time()
    a = _concurrent_submit(nmax, number, workers)
    end = time() - start
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(sum(1 for x in a), end))
```
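The point about skipping the copy can be illustrated on plain lists standing in for the per-chunk results (the sample data is hypothetical): the nested loop from the map() version builds a new list, while itertools.chain.from_iterable() yields the same elements lazily, so a consumer that only needs a count never materializes the combined list.

```python
from itertools import chain

# Stand-ins for the lists returned by each _findmatch() task.
chunk_results = [[5, 15, 25], [35, 45], [50, 51, 52]]

# Copying approach: build one flat list element by element.
found = []
for matches in chunk_results:
    for n in matches:
        found.append(n)

# Lazy approach: walk the chunks without building an intermediate list.
combined = chain.from_iterable(chunk_results)
count = sum(1 for _ in combined)

print(found)  # [5, 15, 25, 35, 45, 50, 51, 52]
print(count)  # 8
```

Note that a chain object can be consumed only once; after the sum() above, combined is exhausted.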