Python: concurrent.futures How to make it cancelable? Python: concurrent.futures How to make it cancelable? python python

Python: concurrent.futures How to make it cancelable?


I don't know why concurrent.futures.Future does not have a .kill() method, but you can accomplish what you want by shutting down the process pool with pool.shutdown(wait=False), and killing the remaining child processes by hand.

Create a function for killing child processes:

import signal, psutildef kill_child_processes(parent_pid, sig=signal.SIGTERM):    try:        parent = psutil.Process(parent_pid)    except psutil.NoSuchProcess:        return    children = parent.children(recursive=True)    for process in children:        process.send_signal(sig)

Run your code until you get the first result, then kill all remaining child processes:

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait# function that profits from partitioned search spacedef m_run(partition):    for elem in partition:        if elem == 135135515:            return elem    return Falsefutures = []# used to create the partitionssteps = 100000000pool = ProcessPoolExecutor(max_workers=4)for i in range(4):    # run 4 tasks with a partition, but only *one* solution is needed    partition = range(i*steps,(i+1)*steps)    futures.append(pool.submit(m_run, partition))done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED)# Shut down poolpool.shutdown(wait=False)# Kill remaining child processeskill_child_processes(os.getpid())


Unfortunately, running Futures cannot be cancelled. I believe the core reason is to ensure the same API over different implementations (it's not possible to interrupt running threads or coroutines).

The Pebble library was designed to overcome this and other limitations.

from pebble import ProcessPooldef function(foo, bar=0):    return foo + barwith ProcessPool() as pool:    future = pool.schedule(function, args=[1])    # if running, the container process will be terminated     # a new process will be started consuming the next task    future.cancel()  


I found your question interesting so here's my finding.

I found the behaviour of .cancel() method is as stated in python documentation. As for your running concurrent functions, unfortunately they could not be cancelled even after they were told to do so. If my finding is correct, then I reason that Python does require a more effective .cancel() method.

Run the code below to check my finding.

from concurrent.futures import ProcessPoolExecutor, as_completedfrom time import time # function that profits from partitioned search spacedef m_run(partition):    for elem in partition:        if elem == 3351355150:            return elem            break #Added to terminate loop once found    return Falsestart = time()futures = []# used to create the partitionssteps = 1000000000with ProcessPoolExecutor(max_workers=4) as pool:    for i in range(4):        # run 4 tasks with a partition, but only *one* solution is needed        partition = range(i*steps,(i+1)*steps)        futures.append(pool.submit(m_run, partition))    ### New Code: Start ###     for f in as_completed(futures):        print(f.result())        if f.result():            print('break')            break    for f in futures:        print(f, 'running?',f.running())        if f.running():            f.cancel()            print('Cancelled? ',f.cancelled())    print('New Instruction Ended at = ', time()-start )print('Total Compute Time = ', time()-start )

Update:It is possible to forcefully terminate the concurrent processes via bash, but the consequence is that the main python program will terminate too. If this isn't an issue with you, then try the below code.

You have to add the below codes between the last 2 print statements to see this for yourself. Note: This code works only if you aren't running any other python3 program.

import subprocess, os, signal result = subprocess.run(['ps', '-C', 'python3', '-o', 'pid='],                        stdout=subprocess.PIPE).stdout.decode('utf-8').split()print ('result =', result)for i in result:    print('PID = ', i)    if i != result[0]:        os.kill(int(i), signal.SIGKILL)        try:            os.kill(int(i), 0)           raise Exception("""wasn't able to kill the process                               HINT:use signal.SIGKILL or signal.SIGABORT""")        except OSError as ex:           continue