Python 3: Catching warnings during multiprocessing
You can try overriding the `Process.run` method so that the target runs inside `warnings.catch_warnings`.
# Silence every warning raised inside a child process by overriding Process.run.
import warnings
from multiprocessing import Process


def yell(text):
    """Print a notice, then issue a ``UserWarning`` whose message is *text*."""
    print('about to yell %s' % text)
    warnings.warn(text)


class CustomProcess(Process):
    """A ``Process`` whose target runs with all warnings ignored."""

    def run(self, *args, **kwargs):
        """Run the target with an "ignore" filter installed.

        ``start()`` arranges for ``run()`` to be called in the child process,
        so the filter set up here governs the warnings raised by the target.
        ``catch_warnings`` restores the previous filters when the block exits.
        """
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            return Process.run(self, *args, **kwargs)


if __name__ == '__main__':
    quiet = CustomProcess(target=yell, args=('...not!',))
    quiet.start()
    quiet.join()
    noisy = Process(target=yell, args=('AAAAAAaaa!',))
    noisy.start()
    noisy.join()

# Output recorded from the original interactive session (the location prefix
# of the warning depends on how the code is run):
#   about to yell ...not!
#   about to yell AAAAAAaaa!
#   __main__:4: UserWarning: AAAAAAaaa!
Or you can use some of the internals (`__warningregistry__`):
# Silence one specific warning in a child process and let all others through.
#
# The original Python 2 version pre-seeded the module's __warningregistry__
# with a (text, category, lineno) key. Since Python 3.4 the warnings machinery
# clears a registry whose 'version' entry does not match the current filter
# version, so a pre-seeded key is discarded before it is consulted. A scoped
# warnings.filterwarnings() call gives the same selective effect through the
# public API.
import re
import warnings
from multiprocessing import Process


def yell(text):
    """Print a notice, warn with *text*, then issue a second, fixed warning."""
    print('about to yell %s' % text)
    warnings.warn(text)
    # not filtered
    warnings.warn('complimentary second warning.')


WARNING_TEXT = 'AAAAaaaaa!'
# Python 3 has no ``exceptions`` module; the warning classes are builtins.
WARNING_TYPE = UserWarning
# 0 makes the filter match on any line. The original value (4) was the line
# number inside the interactive session and is meaningless in a script.
WARNING_LINE = 0


class SelectiveProcess(Process):
    """A ``Process`` that suppresses only the ``WARNING_TEXT`` warning."""

    def run(self, *args, **kwargs):
        """Run the target with an "ignore" filter for the one selected warning.

        The message pattern is anchored so that only the exact text matches.
        ``filterwarnings`` compiles the pattern case-insensitively, so
        differently-cased variants of the text are suppressed as well.
        """
        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore",
                message=re.escape(WARNING_TEXT) + "$",
                category=WARNING_TYPE,
                lineno=WARNING_LINE,
            )
            return Process.run(self, *args, **kwargs)


if __name__ == '__main__':
    p = SelectiveProcess(target=yell, args=(WARNING_TEXT,))
    p.start()
    p.join()

# Output recorded from the original interactive session (the location prefix
# of the warning depends on how the code is run):
#   about to yell AAAAaaaaa!
#   __main__:6: UserWarning: complimentary second warning.
Unpickling would not cause `__init__` to be executed twice. I ran the following code on Windows, and it doesn't happen (each `__init__` is run precisely once).
Therefore, you need to provide us with the code from `my_load_balancer` and from the widgets' class. At this point, your question simply doesn't provide enough information.
As a random guess, you might check whether `my_load_balancer` makes copies of the widgets, causing them to be instantiated once again.
"""Call `frobnicate_parallel(list_of_widgets)` to get the widget with the most frobnals."""
import collections
import multiprocessing


def my_load_balancer(widgets, n_partitions=8):
    """Deal *widgets* round-robin into sets and yield each set in turn.

    Args:
        widgets: Iterable of hashable items to distribute.
        n_partitions: Number of sets to deal into (8 by default).

    Yields:
        Each of the ``n_partitions`` sets, including any that stayed empty
        because there were fewer widgets than partitions.
    """
    partitions = tuple(set() for _ in range(n_partitions))
    for i, widget in enumerate(widgets):
        partitions[i % n_partitions].add(widget)
    yield from partitions


def my_frobnal_counter(widget):
    """Return the value widgets are ranked by (here simply ``widget.id``)."""
    return widget.id


def frobnicate_parallel_worker(widgets, output_queue):
    """Put the highest-ranked widget of *widgets* on *output_queue*."""
    resultant_widget = max(widgets, key=my_frobnal_counter)
    output_queue.put(resultant_widget)


def frobnicate_parallel(widgets):
    """Return the highest-ranked widget, searching partitions in parallel.

    One worker process is started per non-empty partition. Each worker reports
    its local winner through a queue, and the overall winner is chosen from
    those finalists. The result arrives through the queue, so it is a copy of
    the winning widget rather than the original object.

    Raises:
        ValueError: If *widgets* is empty (no finalists to choose from).
    """
    output_queue = multiprocessing.Queue()
    # partitions: generator yielding sets of widgets
    partitions = my_load_balancer(widgets)
    processes = []
    # Line A: Possible start of where the warnings are coming from.
    for partition in partitions:
        if not partition:
            # A worker given an empty set would fail in max() before putting
            # anything on the queue, and the parent would then block forever
            # in output_queue.get() below.
            continue
        p = multiprocessing.Process(
            target=frobnicate_parallel_worker,
            args=(partition, output_queue))
        processes.append(p)
        p.start()
    # Avoid deadlocks in Unix by draining queue before joining processes
    finalists = [output_queue.get() for _ in processes]
    for p in processes:
        p.join()
    # Line B: Warnings no longer possible after here.
    return max(finalists, key=my_frobnal_counter)


class Widget:
    """Toy widget whose ``id`` comes from a running class-level counter."""

    # Next id to hand out; each instance shadows it with its own value.
    id = 0

    def __init__(self):
        # self.id still resolves to the class-level counter at this point.
        print('initializing Widget {}'.format(self.id))
        self.id = Widget.id
        Widget.id += 1

    def __str__(self):
        return str(self.id)

    def __repr__(self):
        return str(self)


def main():
    """Create 16 widgets and print the id of the highest-ranked one."""
    widgets = [Widget() for _ in range(16)]
    result = frobnicate_parallel(widgets)
    print(result.id)


if __name__ == '__main__':
    main()
Years later, I finally have a solution (found while working on an unrelated problem). I've tested this on Python 3.7, 3.8, and 3.9.
Temporarily patch `sys.warnoptions` with the empty list `[]`. You only need to do this around the call to `process.start()`. `sys.warnoptions` is documented as an implementation detail that you shouldn't modify manually; the official recommendations are to use the functions in the `warnings` module and to set `PYTHONWARNINGS` in `os.environ`. In my testing, neither of those recommendations works; the only thing that seems to work is patching `sys.warnoptions`. In a test, you can do the following:
import multiprocessing
from unittest.mock import patch

# my_function stands for whatever callable the child process should run.
p = multiprocessing.Process(target=my_function)
# Only start() has to run while sys.warnoptions is replaced by an empty list;
# patch() puts the original value back as soon as the with-block exits.
with patch('sys.warnoptions', []):
    p.start()
p.join()
If you don't want to use `unittest.mock`, just patch by hand:
import multiprocessing
import sys

# my_function stands for whatever callable the child process should run.
p = multiprocessing.Process(target=my_function)
old_warnoptions = sys.warnoptions
try:
    # Only start() has to run while sys.warnoptions is empty.
    sys.warnoptions = []
    p.start()
finally:
    # Always put the original options back, even if start() raises.
    sys.warnoptions = old_warnoptions
p.join()