
Python's `concurrent.futures`: Iterate on futures according to order of completion


executor.map(), like the built-in map(), only yields results in the order of the input iterable, so unfortunately you can't use it to determine the order of completion. concurrent.futures.as_completed() is what you're looking for - here's an example:

import time
import concurrent.futures

times = [3, 1, 2]

def sleeper(secs):
    time.sleep(secs)
    print('I slept for {} seconds'.format(secs))
    return secs

# returns in the order given
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    print(list(executor.map(sleeper, times)))

# I slept for 1 seconds
# I slept for 2 seconds
# I slept for 3 seconds
# [3, 1, 2]

# returns in the order completed
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futs = [executor.submit(sleeper, secs) for secs in times]
    print([fut.result() for fut in concurrent.futures.as_completed(futs)])

# I slept for 1 seconds
# I slept for 2 seconds
# I slept for 3 seconds
# [1, 2, 3]

Of course, if you are required to use a map interface, you could create your own map_as_completed() function which encapsulates the above (maybe add it to a subclassed Executor()) - a sketch of such a helper is below - but I think creating Future instances through executor.submit() is a simpler/cleaner way to go (it also lets you pass no arguments, or keyword arguments).
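For what it's worth, a minimal sketch of that kind of helper could look like this. The name map_as_completed() and its signature are just my own invention, not anything from the standard library:

import concurrent.futures

def map_as_completed(executor, fn, iterable):
    # Hypothetical helper: submit one task per item, then yield results
    # in order of completion rather than in input order.
    futs = [executor.submit(fn, item) for item in iterable]
    for fut in concurrent.futures.as_completed(futs):
        yield fut.result()

# usage (assumes the sleeper() function from the example above):
# with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
#     for result in map_as_completed(executor, sleeper, [3, 1, 2]):
#         print(result)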


concurrent.futures.as_completed() returns an iterator that yields futures in order of completion -- this sounds like exactly what you're looking for.

http://docs.python.org/dev/library/concurrent.futures.html#concurrent.futures.as_completed

Please let me know if you run into any confusion or difficulty with the implementation.


From the Python docs:

concurrent.futures.as_completed(fs, timeout=None)

Returns an iterator over the Future instances (possibly created by different Executor instances) given by fs that yields futures as they complete (finished or were cancelled). Any futures that completed before as_completed() is called will be yielded first. The returned iterator raises a TimeoutError if next() is called and the result isn’t available after timeout seconds from the original call to as_completed(). timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
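To illustrate the timeout behaviour described above, here is a minimal sketch; the slow_task() function and the 2-second timeout are just for illustration:

import time
import concurrent.futures

def slow_task(secs):
    time.sleep(secs)
    return secs

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futs = [executor.submit(slow_task, secs) for secs in (1, 5)]
    try:
        # Give up if the next future isn't done within 2 seconds of
        # the original call to as_completed().
        for fut in concurrent.futures.as_completed(futs, timeout=2):
            print(fut.result())
    except concurrent.futures.TimeoutError:
        print('timed out waiting for the remaining futures')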

You would need to understand the difference between executor.map() and executor.submit(). The first one maps a function over an iterable of arguments; it is pretty similar to the built-in map(), but launches the tasks asynchronously. submit(func, arg) launches one task per call, in which func is applied to arg.
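A minimal sketch of that contrast (the square() function is just an illustration):

import concurrent.futures

def square(x):
    return x * x

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # map(): one call schedules a task per item; results come back in input order
    print(list(executor.map(square, [1, 2, 3])))       # [1, 4, 9]

    # submit(): one call schedules one task and returns a Future
    futs = [executor.submit(square, x) for x in [1, 2, 3]]
    print([f.result() for f in futs])                  # [1, 4, 9]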

Here is an example of using as_completed() with submit() that I was able to run on Python 3:

from concurrent import futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

def main():
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = dict(
            (executor.submit(load_url, url, 60), url)
            for url in URLS)
        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                print('%r page is %d bytes' % (
                          url, len(future.result())))
            except Exception as e:
                print('%r generated an exception: %s' % (
                          url, e))

if __name__ == '__main__':
    main()

No map() is used here; the tasks are run with submit(), and as_completed() yields them as they finish.

as_completed() returns an iterator over the Future instances given by fs that yields futures as they complete (finished or were cancelled).