How would I go about using concurrent.futures and queues for a real-time scenario?

multithreading



The example from the Python docs, expanded to take its work from a queue. One change to note is that this code uses concurrent.futures.wait instead of concurrent.futures.as_completed, to allow new work to be started while waiting for other work to complete.
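Before the full example, here is a minimal sketch of why wait matters here: as_completed takes a fixed set of futures up front, while wait can be called repeatedly on a dict that grows as new work arrives mid-loop. The slow_task function below is a hypothetical stand-in for real work, not part of the original example.

```python
import concurrent.futures
import time

def slow_task(x):
    """Hypothetical stand-in for a time-consuming job."""
    time.sleep(0.1)
    return x * 2

results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(slow_task, i): i for i in range(2)}
    while futures:
        # wait() hands control back after the timeout or the first
        # completion, so new futures can be added between calls --
        # as_completed() cannot grow its input set once iteration starts.
        done, _ = concurrent.futures.wait(
            futures, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)
        for f in done:
            results.append(f.result())
            del futures[f]
        if len(results) == 2 and not futures:
            # simulate late-arriving work submitted mid-loop
            futures[executor.submit(slow_task, 10)] = 10

print(sorted(results))  # [0, 2, 20]
```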

import concurrent.futures
import urllib.request
import time
import queue

q = queue.Queue()

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def feed_the_workers(spacing):
    """ Simulate outside actors sending in work to do, request each url twice """
    for url in URLS + URLS:
        time.sleep(spacing)
        q.put(url)
    return "DONE FEEDING"

def load_url(url, timeout):
    """ Retrieve a single page and report the URL and contents """
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

    # start a future for a thread which sends work in through the queue
    future_to_url = {
        executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'}

    while future_to_url:
        # check for status of the futures which are currently working
        done, not_done = concurrent.futures.wait(
            future_to_url, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)

        # if there is incoming work, start a new future
        while not q.empty():
            # fetch a url from the queue
            url = q.get()

            # Start the load operation and mark the future with its URL
            future_to_url[executor.submit(load_url, url, 60)] = url

        # process any completed futures
        for future in done:
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                if url == 'FEEDER DONE':
                    print(data)
                else:
                    print('%r page is %d bytes' % (url, len(data)))

            # remove the now completed future
            del future_to_url[future]

Output from fetching each url twice:

'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
'http://www.bbc.co.uk/' page is 193780 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
DONE FEEDING
'http://www.bbc.co.uk/' page is 193605 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://europe.wsj.com/' page is 874649 bytes
'http://europe.wsj.com/' page is 874649 bytes


At work I found a situation where I wanted to do parallel work on an unbounded stream of data. I created a small library inspired by the excellent answer already provided by Stephen Rauch.

I originally approached this problem by thinking about two separate threads: one that submits work to a queue, and one that monitors the queue for completed tasks and makes room for new work to come in. This is similar to what Stephen Rauch proposed, where he consumes the stream with a feed_the_workers function that runs in a separate thread.

A colleague helped me realize that you can get away with doing everything in a single thread if you define a buffered iterator that lets you control how many elements are pulled from the input stream each time you are ready to submit more work to the thread pool.

So we introduce the BufferedIter class

class BufferedIter(object):
    def __init__(self, iterator):
        self.iter = iterator

    def nextN(self, n):
        vals = []
        for _ in range(n):
            vals.append(next(self.iter))
        return vals

which allows us to define the stream processor in the following way

import logging
import queue
import signal
import sys
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

level = logging.DEBUG

log = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
handler.setLevel(level)
log.addHandler(handler)
log.setLevel(level)

WAIT_SLEEP = 1  # second, adjust this based on the timescale of your tasks

def stream_processor(input_stream, task, num_workers):

    # Use a queue to signal shutdown.
    shutting_down = queue.Queue()

    def shutdown(signum, frame):
        log.warning('Caught signal %d, shutting down gracefully ...' % signum)
        # Put an item in the shutting down queue to signal shutdown.
        shutting_down.put(None)

    # Register the signal handler
    signal.signal(signal.SIGTERM, shutdown)
    signal.signal(signal.SIGINT, shutdown)

    def is_shutting_down():
        return not shutting_down.empty()

    futures = dict()
    buffer = BufferedIter(input_stream)
    with ThreadPoolExecutor(num_workers) as executor:
        num_success = 0
        num_failure = 0
        while True:
            # Only submit as much new work as there are idle workers
            idle_workers = num_workers - len(futures)

            if not is_shutting_down():
                items = buffer.nextN(idle_workers)
                for data in items:
                    futures[executor.submit(task, data)] = data

            done, _ = wait(futures, timeout=WAIT_SLEEP, return_when=ALL_COMPLETED)
            for f in done:
                data = futures[f]
                try:
                    f.result(timeout=0)
                except Exception as exc:
                    log.error('future encountered an exception: %r, %s' % (data, exc))
                    num_failure += 1
                else:
                    log.info('future finished successfully: %r' % data)
                    num_success += 1

                del futures[f]

            if is_shutting_down() and len(futures) == 0:
                break

        log.info("num_success=%d, num_failure=%d" % (num_success, num_failure))

Below we show an example of how to use the stream processor

import itertools

def integers():
    """Simulate an infinite stream of work."""
    for i in itertools.count():
        yield i

def task(x):
    """The task we would like to perform in parallel.
    With some delay to simulate a time consuming job.
    With a baked in exception to simulate errors.
    """
    time.sleep(3)
    if x == 4:
        raise ValueError('bad luck')
    return x * x

stream_processor(integers(), task, num_workers=3)

The output for this example is shown below

2019-01-15 22:34:40,193 future finished successfully: 1
2019-01-15 22:34:40,193 future finished successfully: 0
2019-01-15 22:34:40,193 future finished successfully: 2
2019-01-15 22:34:43,201 future finished successfully: 5
2019-01-15 22:34:43,201 future encountered an exception: 4, bad luck
2019-01-15 22:34:43,202 future finished successfully: 3
2019-01-15 22:34:46,208 future finished successfully: 6
2019-01-15 22:34:46,209 future finished successfully: 7
2019-01-15 22:34:46,209 future finished successfully: 8
2019-01-15 22:34:49,215 future finished successfully: 11
2019-01-15 22:34:49,215 future finished successfully: 10
2019-01-15 22:34:49,215 future finished successfully: 9
^C <=== THIS IS WHEN I HIT Ctrl-C
2019-01-15 22:34:50,648 Caught signal 2, shutting down gracefully ...
2019-01-15 22:34:52,221 future finished successfully: 13
2019-01-15 22:34:52,222 future finished successfully: 14
2019-01-15 22:34:52,222 future finished successfully: 12
2019-01-15 22:34:52,222 num_success=14, num_failure=1
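As a quick standalone sanity check of the buffering idea (the class is repeated here so the snippet runs on its own), nextN simply pulls up to n items at a time from any iterator, which is how stream_processor tops up exactly as many tasks as there are idle workers:

```python
class BufferedIter(object):
    def __init__(self, iterator):
        self.iter = iterator

    def nextN(self, n):
        vals = []
        for _ in range(n):
            vals.append(next(self.iter))
        return vals

# Draw from a stream three items at a time, then two,
# mirroring a pool whose idle-worker count varies per loop.
buf = BufferedIter(iter(range(100)))
print(buf.nextN(3))  # [0, 1, 2]
print(buf.nextN(2))  # [3, 4]
```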


I really liked the interesting approach by @pedro above. However, when processing thousands of files, I noticed that at the end a StopIteration would be thrown and some files would always be skipped. I had to make a small modification as follows. Very useful answer again.

class BufferedIter(object):
    def __init__(self, iterator):
        self.iter = iterator

    def nextN(self, n):
        vals = []
        try:
            for _ in range(n):
                vals.append(next(self.iter))
            return vals, False
        except StopIteration:
            return vals, True

-- Call as follows

...
if not is_shutting_down():
    items, is_finished = buffer.nextN(idle_workers)
    if is_finished:
        stop()
...

-- Where stop is a function that simply signals the processor to shut down

def stop():
    shutting_down.put(None)
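Putting the fix together, a short self-contained check (a five-item stream drawn three at a time, with the class repeated so the snippet runs on its own) shows the finished flag firing exactly when the iterator runs dry, with the partial final batch still returned instead of a StopIteration escaping:

```python
class BufferedIter(object):
    def __init__(self, iterator):
        self.iter = iterator

    def nextN(self, n):
        vals = []
        try:
            for _ in range(n):
                vals.append(next(self.iter))
            return vals, False
        except StopIteration:
            # Return whatever was collected plus a flag marking exhaustion
            return vals, True

buf = BufferedIter(iter(range(5)))
print(buf.nextN(3))  # ([0, 1, 2], False)
print(buf.nextN(3))  # ([3, 4], True) -- partial batch, stream exhausted
```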