python multiprocessing/threading cleanup python multiprocessing/threading cleanup multithreading multithreading

python multiprocessing/threading cleanup


I think you're corrupting your multiprocessing.Queue by calling p.terminate() on on the child process. The docs have a warning about this:

Warning: If this method is used when the associated process is using a pipe or queue then the pipe or queue is liable to become corrupted and may become unusable by other process. Similarly, if the process has acquired a lock or semaphore etc. then terminating it is liable to cause other processes to deadlock.

In some cases, it looks like p is terminating before your MediatorSender._loop method can consume the sentinel you loaded into it to let it know that it should exit.

Also, you're installing a signal handler that expects to work in the main process only, but the SIGINT is actually received by both the parent and the child processes, which means signal_handler gets called in both processes, could result in ms.stop getting called twice, due to a race condition in the way you handle setting ms.running to False

I would recommend just exploiting that both processes receive the SIGINT, and have both the parent and child handle KeyboardInterrupt directly. That way, each then have each shut themselves down cleanly, rather than have the parent terminate the child. The following code demonstrates that, and in my testing never hung. I've simplified your code in a few places, but functionally it's exactly the same:

#!/usr/bin/env python# -*- coding: utf-8 -*-import loggingfrom multiprocessing import Process, Queuefrom threading import Threadfrom time import sleeplogger = logging.getLogger("mepy-client")class SocketClientProtocol(object):    def __init__(self, q_in, q_out, q_binary):        self.q_in = q_in        self.q_out = q_out        self.q_binary = q_binary        t = Thread(target=self._loop)        t.start()        t = Thread(target=self._loop_binary)        t.start()    def _loop(self):        print("start of loop 2")        for res in iter(self.q_in.get, None):            self._handle_msg(res)        print("end of loop 2")    def _loop_binary(self):        print("start of loop 3")        for res in iter(self.q_binary.get, None):            self._handle_binary(res)        print("end of loop 3")    def _handle_msg(self, msg):        msg_type = msg[0]        if msg_type == "stop2":            self.q_in.put(None)            self.q_binary.put(None)    def _put_msg(self, msg):        self.q_out.put(msg)    def stop(self):        print("STOP RECEIVED")        self.q_in.put(None)        self.q_binary.put(None)    def _handle_binary(self, data):        pass    def handle_element(self):        self._put_msg(["something"])def run_twisted(q_in, q_out, q_binary):    s = SocketClientProtocol(q_in, q_out, q_binary)    try:        while True:            sleep(2)            s.handle_element()    except KeyboardInterrupt:        s.stop()class MediatorSender(object):    def __init__(self):        self.q_in = None        self.q_out = None        self.q_binary = None        self.p = None        self.running = False    def start(self):        if self.running:            return        self.running = True        self.q_in = Queue()        self.q_out = Queue()        self.q_binary = Queue()        print("!!!!START")        self.p = Process(target=run_twisted,                          args=(self.q_in, self.q_out, self.q_binary))        self.p.start()        self.loop = Thread(target=self._loop)        self.loop.start()    def stop(self):        print("!!!!STOP")        if not self.running:            return        print("STOP2")        self.running = False        self.q_out.put(None)    def _loop(self):        print("start of loop 1")        for res in iter(self.q_out.get, None):            self._handle_msg(res)        print("end of loop 1")    def _handle_msg(self, msg):        self._put_msg(msg)    def _put_msg(self, msg):        self.q_in.put(msg)    def _put_binary(self, msg):        self.q_binary.put(msg)    def send_chunk(self, chunk):        self._put_binary(chunk)if __name__ == "__main__":    ms = MediatorSender()    try:        ms.start()        for i in range(100):            ms.send_chunk("some chunk of data")        # You actually have to join w/ a timeout in a loop on         # Python 2.7. If you just call join(), SIGINT won't be         # received by the main process, and the program will         # hang. This is a bug, and is fixed in Python 3.x.        while True:            ms.loop.join()      except KeyboardInterrupt:        ms.stop()

Edit:

If you prefer to use a signal handler rather than catching KeyboardInterrupt, you just need to make sure the child process uses its own signal handler, rather than inheriting the parent's:

#!/usr/bin/env python# -*- coding: utf-8 -*-import signalimport loggingfrom functools import partialfrom multiprocessing import Process, Queuefrom threading import Threadfrom time import sleeplogger = logging.getLogger("mepy-client")class SocketClientProtocol(object):    def __init__(self, q_in, q_out, q_binary):        self.q_in = q_in        self.q_out = q_out        self.q_binary = q_binary        self.running = True        t = Thread(target=self._loop)        t.start()        t = Thread(target=self._loop_binary)        t.start()    def _loop(self):        print("start of loop 2")        for res in iter(self.q_in.get, None):            self._handle_msg(res)        print("end of loop 2")    def _loop_binary(self):        print("start of loop 3")        for res in iter(self.q_binary.get, None):            self._handle_binary(res)        print("end of loop 3")    def _handle_msg(self, msg):        msg_type = msg[0]        if msg_type == "stop2":            self.q_in.put(None)            self.q_binary.put(None)    def _put_msg(self, msg):        self.q_out.put(msg)    def stop(self):        print("STOP RECEIVED")        self.running = False        self.q_in.put(None)        self.q_binary.put(None)    def _handle_binary(self, data):        pass    def handle_element(self):        self._put_msg(["something"])def run_twisted(q_in, q_out, q_binary):    s = SocketClientProtocol(q_in, q_out, q_binary)    signal.signal(signal.SIGINT, partial(signal_handler_child, s))    while s.running:        sleep(2)        s.handle_element()class MediatorSender(object):    def __init__(self):        self.q_in = None        self.q_out = None        self.q_binary = None        self.p = None        self.running = False    def start(self):        if self.running:            return        self.running = True        self.q_in = Queue()        self.q_out = Queue()        self.q_binary = Queue()        print("!!!!START")        self.p = Process(target=run_twisted,                          args=(self.q_in, self.q_out, self.q_binary))        self.p.start()        self.loop = Thread(target=self._loop)        self.loop.start()    def stop(self):        print("!!!!STOP")        if not self.running:            return        print("STOP2")        self.running = False        self.q_out.put(None)    def _loop(self):        print("start of loop 1")        for res in iter(self.q_out.get, None):            self._handle_msg(res)        print("end of loop 1")    def _handle_msg(self, msg):        self._put_msg(msg)    def _put_msg(self, msg):        self.q_in.put(msg)    def _put_binary(self, msg):        self.q_binary.put(msg)    def send_chunk(self, chunk):        self._put_binary(chunk)def signal_handler_main(ms, *args):    ms.stop()def signal_handler_child(s, *args):    s.stop()if __name__ == "__main__":    ms = MediatorSender()    signal.signal(signal.SIGINT, partial(signal_handler_main, ms))    ms.start()    for i in range(100):        ms.send_chunk("some chunk of data")    while ms.loop.is_alive():        ms.loop.join(9999999)      print('done main')


Maybe you should try to capture SIGINT signal, which is generated by Ctrl + C using signal.signal like this:

#!/usr/bin/env pythonimport signalimport sysdef signal_handler(signal, frame):        print('You pressed Ctrl+C!')        sys.exit(0)signal.signal(signal.SIGINT, signal_handler)print('Press Ctrl+C')signal.pause()

Code stolen from here


This usually works for me if I am using the threading module. It will not work if you use the multiprocessing one though. If you are running the script from the terminal try running it in the background, like this.

python scriptFoo.py &

After you run the process it will output the PID like this

[1] 23107

Whenever you need to quit the script you just type kill and the script PID like this.

kill 23107

Hit enter again and it should kill all the subprocesses and output this.

[1]+  Terminated              python scriptFoo.py

As far as I know you cannot kill all the subprocesses with 'Ctrl+C'