Python Multiprocessing: Handling Child Errors in Parent


Why not let the Process take care of its own exceptions, like this:

```python
from __future__ import print_function

import multiprocessing as mp
import traceback


class Process(mp.Process):
    def __init__(self, *args, **kwargs):
        mp.Process.__init__(self, *args, **kwargs)
        self._pconn, self._cconn = mp.Pipe()
        self._exception = None

    def run(self):
        try:
            mp.Process.run(self)
            self._cconn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._cconn.send((e, tb))
            # raise e  # You can still raise this exception if you need to

    @property
    def exception(self):
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        return self._exception
```

Now you have both the error and the traceback at hand:

```python
def target():
    raise ValueError('Something went wrong...')

p = Process(target=target)
p.start()
p.join()

if p.exception:
    error, traceback = p.exception
    print(traceback)
```

Regards,
Marek
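For comparison (this is not part of the answer above): if the work can be structured as function calls, the standard library's concurrent.futures gives similar behaviour out of the box — a ProcessPoolExecutor captures an exception raised in the child and re-raises it in the parent when you call result(). A minimal sketch:

```python
from concurrent.futures import ProcessPoolExecutor
import traceback


def target():
    raise ValueError('Something went wrong...')


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=1) as pool:
        future = pool.submit(target)
        try:
            # result() re-raises the child's ValueError here, in the parent
            future.result()
        except ValueError:
            traceback.print_exc()
```

The trade-off is that you lose the raw traceback string the Pipe-based subclass gives you, but you keep the standard exception-handling flow.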


I don't know standard practice but what I've found is that to have reliable multiprocessing I design the methods/class/etc. specifically to work with multiprocessing. Otherwise you never really know what's going on on the other side (unless I've missed some mechanism for this).

Specifically what I do is:

  • Subclass multiprocessing.Process or make functions that specifically support multiprocessing (wrapping functions that you don't have control over if necessary)
  • always provide a shared error multiprocessing.Queue from the main process to each worker process
  • enclose the entire run code in a try: ... except Exception as e block. Then, when something unexpected happens, send an error package with:
    • the process id that died
    • the exception with its original context (check here). The original context is really important if you want to log useful information in the main process.
  • of course handle expected issues as normal within the normal operation of the worker
  • (similar to what you said already) assuming a long-running process, wrap the running code (inside the try/catch-all) with a loop
    • define a stop token in the class or for functions.
    • When the main process wants the worker(s) to stop, just send the stop token. To stop everyone, send enough tokens for all the processes.
    • the wrapping loop checks the input queue for the token or whatever other input you want
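The steps above can be sketched roughly as follows. This is only an illustration of the pattern, not the author's actual code; the names (worker, STOP_TOKEN, the 'boom' trigger) are made up for the example:

```python
import multiprocessing
import traceback

# Sentinel the workers check for; None is safe because it is a singleton
STOP_TOKEN = None


def worker(in_queue, error_queue):
    """Long-running worker: loops on the input queue until the stop token arrives."""
    try:
        while True:
            item = in_queue.get()
            if item is STOP_TOKEN:  # main process asked us to stop
                break
            # ... normal per-item work goes here; expected errors handled locally ...
            if item == 'boom':  # stand-in for something unexpected
                raise ValueError('unexpected failure')
    except Exception:
        # Send the "error package": which process died, plus the full traceback
        error_queue.put((multiprocessing.current_process().pid,
                         traceback.format_exc()))


if __name__ == '__main__':
    in_q = multiprocessing.Queue()
    err_q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(in_q, err_q))
    p.start()
    in_q.put('boom')  # trigger the catch-all
    p.join()
    if not err_q.empty():
        pid, tb = err_q.get()
        print('worker %s died:' % pid)
        print(tb)
```

The main process can poll err_q while the workers run, restart a dead worker, or shut everything down by sending one STOP_TOKEN per worker.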

The end result is worker processes that can survive for a long time and that can let you know what's happening when something goes wrong. They will die quietly since you can handle whatever you need to do after the catch-all exception and you will also know when you need to restart a worker.

Again, I've just come to this pattern through trial and error so I don't know how standard it is. Does that help with what you are asking for?


@mrkwjc's solution is simple, easy to understand, and easy to implement, but it has one disadvantage. When we have a few processes and want to stop all of them if any single one errors, we have to wait until all processes have finished before checking p.exception. The code below fixes this problem (i.e., when one child errors, we also terminate the other child):

```python
import multiprocessing
import traceback

from time import sleep


class Process(multiprocessing.Process):
    """
    Class which returns child Exceptions to Parent.
    https://stackoverflow.com/a/33599967/4992248
    """

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        self._parent_conn, self._child_conn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        try:
            multiprocessing.Process.run(self)
            self._child_conn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._child_conn.send((e, tb))
            # raise e  # You can still raise this exception if you need to

    @property
    def exception(self):
        if self._parent_conn.poll():
            self._exception = self._parent_conn.recv()
        return self._exception


class Task_1:
    def do_something(self, queue):
        queue.put(dict(users=2))


class Task_2:
    def do_something(self, queue):
        queue.put(dict(users=5))


def main():
    try:
        task_1 = Task_1()
        task_2 = Task_2()

        # Example of multiprocessing which is used:
        # https://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/
        task_1_queue = multiprocessing.Queue()
        task_2_queue = multiprocessing.Queue()

        task_1_process = Process(
            target=task_1.do_something,
            kwargs=dict(queue=task_1_queue))
        task_2_process = Process(
            target=task_2.do_something,
            kwargs=dict(queue=task_2_queue))

        task_1_process.start()
        task_2_process.start()

        while task_1_process.is_alive() or task_2_process.is_alive():
            sleep(10)

            if task_1_process.exception:
                error, task_1_traceback = task_1_process.exception
                # Do not wait until task_2 is finished
                task_2_process.terminate()
                raise ChildProcessError(task_1_traceback)

            if task_2_process.exception:
                error, task_2_traceback = task_2_process.exception
                # Do not wait until task_1 is finished
                task_1_process.terminate()
                raise ChildProcessError(task_2_traceback)

        task_1_process.join()
        task_2_process.join()

        task_1_results = task_1_queue.get()
        task_2_results = task_2_queue.get()

        task_1_users = task_1_results['users']
        task_2_users = task_2_results['users']

    except Exception:
        # Here usually I send email notification with error.
        print('traceback:', traceback.format_exc())


if __name__ == "__main__":
    main()
```
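As a hypothetical generalization (not in the original answer): the two hard-coded if blocks in the watchdog loop extend naturally to any number of workers. The watch helper below is made up for illustration; it assumes the same Pipe-based Process subclass, reproduced here so the sketch is self-contained:

```python
import multiprocessing
import traceback
from time import sleep


class Process(multiprocessing.Process):
    """Same idea as above: forward the child's exception to the parent over a Pipe."""

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        self._parent_conn, self._child_conn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        try:
            multiprocessing.Process.run(self)
            self._child_conn.send(None)
        except Exception as e:
            self._child_conn.send((e, traceback.format_exc()))

    @property
    def exception(self):
        if self._parent_conn.poll():
            self._exception = self._parent_conn.recv()
        return self._exception


def watch(processes, poll_interval=0.1):
    """Poll the workers; if any child fails, terminate the rest and
    re-raise the failure in the parent."""
    while any(p.is_alive() for p in processes):
        sleep(poll_interval)
        for p in processes:
            if p.exception:
                error, tb = p.exception
                for other in processes:
                    if other is not p:
                        other.terminate()
                raise ChildProcessError(tb)
    for p in processes:
        p.join()
```

With this, main() shrinks to starting the processes and calling watch([task_1_process, task_2_process]) inside the try block.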