Checking up on a `concurrent.futures.ThreadPoolExecutor`
There is some visibility into the pool and its pending work-item queue. To find out what's available, print `poolx.__dict__` to see the structure. The ThreadPoolExecutor source is worth reading, it's pretty good: `concurrent.futures.thread`
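As a minimal sketch of that inspection step, the snippet below lists the executor's instance attributes with `vars()`, which returns the same mapping as `__dict__`. The variable `attribute_names` is introduced here only for illustration, and the underscore-prefixed names it reports are private, so they may differ between Python versions.

```python
import concurrent.futures

poolx = concurrent.futures.ThreadPoolExecutor(max_workers=1)

# vars(obj) returns obj.__dict__. Names starting with "_" are private
# implementation details and can change between Python versions.
attribute_names = sorted(vars(poolx))
for name in attribute_names:
    print(name, '=', repr(vars(poolx)[name]))

poolx.shutdown(wait=True)
```

The attributes used later in this answer, `_work_queue` and `_threads`, should appear in that listing.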
The following creates a pool with one thread. It then creates two jobs: one sleeps for 3 seconds, the other immediately returns. The pool's number of pending work items is then printed.
Following that, we print out items from the work queue. In this case, a thread is already executing the `time.sleep(3)` function, so that's not in the queue. The function `sleep` with args `(0,)` and kwargs `{}` is printed, because that's the next work item for the pool to run.
Kudos to @dano for the nondestructive queue insight, and @abarnert.
source
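import concurrent.futures, time

poolx = concurrent.futures.ThreadPoolExecutor(max_workers=1)
poolx.submit(time.sleep, 3)
poolx.submit(time.sleep, 0)   # very fast

# _work_queue and _threads are private attributes of the executor
print('pending:', poolx._work_queue.qsize(), 'jobs')
print('threads:', len(poolx._threads))
print()

# TODO: make thread safe; work on copy of queue?
# .queue is the deque inside a queue.Queue. Newer CPython versions use
# queue.SimpleQueue for _work_queue, which has no .queue attribute, so
# fall back to an empty listing instead of raising AttributeError.
print('Estimated Pending Work Queue:')
for num, item in enumerate(getattr(poolx._work_queue, 'queue', ())):
    print('{}\t{}\t{}\t{}'.format(
        num+1, item.fn, item.args, item.kwargs,
        ))

poolx.shutdown(wait=False)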
output
pending: 1 jobs
threads: 1

Estimated Pending Work Queue:
1	<built-in function sleep>	(0,)	{}
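The listing above depends on private attributes of the executor. As a hedged alternative, here is a sketch that counts pending jobs using only the public `Future` methods, by keeping the futures that `submit()` returns. The names `blocker`, `release`, and `started` are hypothetical helpers added for this example; they stand in for `time.sleep(3)` so the result does not depend on timing.

```python
import concurrent.futures
import threading

release = threading.Event()   # keeps the first job busy until we set it
started = threading.Event()   # signals that the worker picked up the first job

def blocker():
    started.set()
    release.wait()

poolx = concurrent.futures.ThreadPoolExecutor(max_workers=1)
futures = [poolx.submit(blocker), poolx.submit(pow, 2, 5)]
started.wait()   # the single worker thread is now inside blocker()

# A future that is neither running nor done is still waiting in the pool.
pending = [f for f in futures if not f.running() and not f.done()]
print('pending:', len(pending), 'jobs')   # pending: 1 jobs

release.set()
poolx.shutdown(wait=True)
print('results:', [f.result() for f in futures])   # results: [None, 32]
```

This does not show the queued function and arguments the way the private queue does, but you can store those alongside each future yourself if you need them.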
Not a very clean or reliable way to find pending futures, but I do it like this:
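import logging

logger = logging.getLogger(__name__)

# `future` is a concurrent.futures.Future, e.g. the return value of submit()
if 'state=pending' in str(future):   # matches the Future repr
    logger.debug('PENDING')
elif future.running():
    logger.debug('RUNNING')
elif future.cancelled():
    logger.debug('CANCELLED')
elif future.exception():
    logger.debug('EXCEPTION')
elif future.done():
    logger.debug('DONE')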
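The string match on `state=pending` can probably be avoided. Below is a sketch, not a definitive implementation, that derives the same labels from the public `Future` methods alone: a future that is neither running nor done is treated as pending. The function `future_state` and the helpers `blocker`, `release`, and `started` are hypothetical names introduced for this example.

```python
import concurrent.futures
import threading

def future_state(future):
    """Return a coarse state label using only public Future methods."""
    if future.running():
        return 'RUNNING'
    if future.done():
        # A done future is final: either cancelled or finished.
        if future.cancelled():
            return 'CANCELLED'
        # Finished, so exception() returns immediately without blocking.
        return 'EXCEPTION' if future.exception() is not None else 'DONE'
    return 'PENDING'

release = threading.Event()   # keeps the first job busy until we set it
started = threading.Event()   # signals that the worker picked up the first job

def blocker():
    started.set()
    release.wait()

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    busy = pool.submit(blocker)
    waiting = pool.submit(int, '42')
    failing = pool.submit(int, 'oops')   # raises ValueError in the worker
    dropped = pool.submit(int, '0')
    started.wait()
    dropped.cancel()                     # still queued, so cancel() succeeds
    before = [future_state(f) for f in (busy, waiting, failing, dropped)]
    release.set()

after = [future_state(f) for f in (busy, waiting, failing, dropped)]
print(before)   # ['RUNNING', 'PENDING', 'PENDING', 'CANCELLED']
print(after)    # ['DONE', 'DONE', 'EXCEPTION', 'CANCELLED']
```

Checking `running()` before `done()` matters here: a future only moves forward through its states, so if both checks come back false the future was still pending when `running()` was called.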