Python: concurrent.futures — how can I make a task cancelable?
I don't know why concurrent.futures.Future does not have a .kill() method, but you can accomplish what you want by shutting down the process pool with pool.shutdown(wait=False) and killing the remaining child processes by hand.
Create a function for killing child processes:
import signal

import psutil

def kill_child_processes(parent_pid, sig=signal.SIGTERM):
    try:
        parent = psutil.Process(parent_pid)
    except psutil.NoSuchProcess:
        return
    children = parent.children(recursive=True)
    for process in children:
        process.send_signal(sig)
Run your code until you get the first result, then kill all remaining child processes:
import os
from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000
pool = ProcessPoolExecutor(max_workers=4)
for i in range(4):
    # run 4 tasks with a partition, but only *one* solution is needed
    partition = range(i*steps, (i+1)*steps)
    futures.append(pool.submit(m_run, partition))

done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED)

# Shut down pool
pool.shutdown(wait=False)

# Kill remaining child processes
kill_child_processes(os.getpid())
Unfortunately, running Futures cannot be cancelled. I believe the core reason is to ensure the same API over different implementations (it's not possible to interrupt running threads or coroutines).
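You can observe this limitation directly: Future.cancel() returns True only for calls that are still pending, and False once the task has started. A minimal, self-contained demonstration using threads (slow_task is a made-up placeholder, not from the question):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_task():
    time.sleep(2)
    return 'done'

# one worker: the first task starts running immediately, the second stays queued
with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(slow_task)
    pending = pool.submit(slow_task)
    time.sleep(0.2)            # give the first task time to start

    print(running.cancel())   # False: already running, cannot be cancelled
    print(pending.cancel())   # True: still pending, cancellation succeeds
```

The running task keeps executing to completion regardless of the failed cancel() call; only the queued task is actually removed.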
The Pebble library was designed to overcome this and other limitations.
from pebble import ProcessPool

def function(foo, bar=0):
    return foo + bar

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1])

    # if running, the container process will be terminated
    # a new process will be started consuming the next task
    future.cancel()
I found your question interesting, so here's my finding.

The behaviour of the .cancel() method is as stated in the Python documentation. As for your running concurrent functions, unfortunately they could not be cancelled even after they were told to do so. If my finding is correct, then I'd reason that Python needs a more effective .cancel() method.
Run the code below to check my finding.
from concurrent.futures import ProcessPoolExecutor, as_completed
from time import time

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 3351355150:
            return elem  # returning also terminates the loop once found
    return False

start = time()
futures = []
# used to create the partitions
steps = 1000000000
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps, (i+1)*steps)
        futures.append(pool.submit(m_run, partition))

    ### New Code: Start ###
    for f in as_completed(futures):
        print(f.result())
        if f.result():
            print('break')
            break

    for f in futures:
        print(f, 'running?', f.running())
        if f.running():
            f.cancel()
            print('Cancelled? ', f.cancelled())
    print('New Instruction Ended at = ', time()-start)
print('Total Compute Time = ', time()-start)
Update: It is possible to forcefully terminate the concurrent processes via bash, but the consequence is that the main Python program will terminate too. If this isn't an issue for you, then try the code below.
Add the code below between the last two print statements to see this for yourself. Note: this code works only if you aren't running any other python3 program.
import subprocess, os, signal

result = subprocess.run(['ps', '-C', 'python3', '-o', 'pid='],
                        stdout=subprocess.PIPE).stdout.decode('utf-8').split()
print('result =', result)

for i in result:
    print('PID = ', i)
    if i != result[0]:
        os.kill(int(i), signal.SIGKILL)
        try:
            os.kill(int(i), 0)  # signal 0 only checks that the process still exists
            raise Exception("wasn't able to kill the process "
                            "HINT: use signal.SIGKILL or signal.SIGABRT")
        except OSError:
            continue