Multiprocessing causes Python to crash and gives an error may have been in progress in another thread when fork() was called
This error occurs because macOS High Sierra and later versions of macOS added a security check that restricts multithreaded processes from calling fork(). I know this answer is a bit late, but I solved the problem using the following method:
Set an environment variable in .bash_profile (or .zshrc for recent macOS) to allow multithreading applications or scripts under the new macOS High Sierra security rules.
Open a terminal:
$ nano .bash_profile
Add the following line to the end of the file:
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
Save, exit, close terminal and re-open the terminal. Check to see that the environment variable is now set:
$ env
You will see output similar to:
TERM_PROGRAM=Apple_Terminal
SHELL=/bin/bash
TERM=xterm-256color
TMPDIR=/var/folders/pn/vasdlj3ojO#OOas4dasdffJq/T/
Apple_PubSub_Socket_Render=/private/tmp/com.apple.launchd.E7qLFJDSo/Render
TERM_PROGRAM_VERSION=404
TERM_SESSION_ID=NONE
OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
You should now be able to run your Python script with multiprocessing.
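The same check can also be done from inside Python, which is handy when the script is launched from an IDE or a scheduler rather than from the terminal. This is only a minimal sketch; the helper name fork_safety_disabled is hypothetical, and it only reports whether the variable is visible to the process.

```python
import os


def fork_safety_disabled(environ=os.environ):
    """Return True if OBJC_DISABLE_INITIALIZE_FORK_SAFETY is set to YES.

    `environ` defaults to the real environment; a plain dict can be
    passed instead (hypothetical convenience for testing).
    """
    return environ.get("OBJC_DISABLE_INITIALIZE_FORK_SAFETY") == "YES"


if __name__ == "__main__":
    if not fork_safety_disabled():
        print("OBJC_DISABLE_INITIALIZE_FORK_SAFETY is not set to YES")
```

If this prints the warning, the shell profile was probably not reloaded by whatever launched the script.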
The solution that works for me without the OBJC_DISABLE_INITIALIZE_FORK_SAFETY flag in the environment involves initializing the multiprocessing.Pool class right after the main() program starts.
This is most likely not the fastest solution possible, and I am not sure it works in all situations. However, pre-heating the worker processes early enough, before the rest of my program starts, does not result in any "... may have been in progress in another thread when fork() was called" errors, and I do get a significant performance boost compared to what I get with non-parallelized code.
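To illustrate the idea with a plain multiprocessing.Pool, here is a minimal sketch in which the pool is the first thing main() creates. The square function, the pool size of 2, and the pool_factory parameter are assumptions made for illustration; pool_factory is there only so that the same code can be run with the thread-backed ThreadPool when no child processes are wanted.

```python
import multiprocessing
from multiprocessing.pool import ThreadPool  # thread-backed pool with the same API


def square(x):
    """Hypothetical work function; defined at module level so workers can import it."""
    return x * x


def main(pool_factory=multiprocessing.Pool):
    # Create the worker pool first, so the workers are started
    # before the rest of the program initializes anything else.
    with pool_factory(2) as pool:
        # ... the remaining program setup would go here ...
        return pool.map(square, range(5))


if __name__ == "__main__":
    print(main())
```

Calling main(ThreadPool) runs the same work in threads of the current process, which avoids fork() entirely at the cost of real parallelism for CPU-bound work.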
I have created a convenience class, Parallelizer, which I start very early and then use throughout the lifecycle of my program.
# entry point to my program
def main():
    parallelizer = Parallelizer()
    ...
Then whenever you want to have parallelization:
# this function is parallelized. it is run by each child process.
def processing_function(input):
    ...
    return output

...
inputs = [...]
results = parallelizer.map(inputs, processing_function)
And the parallelizer class:
import multiprocessing


class Parallelizer:
    def __init__(self):
        self.input_queue = multiprocessing.Queue()
        self.output_queue = multiprocessing.Queue()
        # each worker runs _run as its initializer and keeps looping there
        self.pool = multiprocessing.Pool(multiprocessing.cpu_count(),
                                         Parallelizer._run,
                                         (self.input_queue, self.output_queue,))

    def map(self, contents, processing_func):
        size = 0
        for content in contents:
            self.input_queue.put((content, processing_func))
            size += 1
        results = []
        # collect one result per submitted item, in the order they arrive
        while size > 0:
            result = self.output_queue.get(block=True)
            results.append(result)
            size -= 1
        return results

    @staticmethod
    def _run(input_queue, output_queue):
        while True:
            content, processing_func = input_queue.get(block=True)
            result = processing_func(content)
            output_queue.put(result)
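Note that map in this class collects results as they arrive on the output queue, so they can come back in a different order than the inputs. If order matters, one possible approach is to tag each item with its index and sort afterwards. The helpers below (call_indexed, restore_order, ordered_map) are hypothetical names, not part of the class above; this is a sketch that works with any callable taking (contents, processing_func).

```python
import functools


def call_indexed(func, pair):
    """Apply func to the payload of an (index, content) pair, keeping the index."""
    index, content = pair
    return index, func(content)


def restore_order(indexed_results):
    """Sort (index, result) pairs by index and drop the indices."""
    return [result for _, result in sorted(indexed_results, key=lambda p: p[0])]


def ordered_map(map_func, contents, func):
    """Call map_func(contents, processing_func) and return results in input order."""
    tagged = list(enumerate(contents))
    # partial of a module-level function stays picklable if func is picklable
    worker = functools.partial(call_indexed, func)
    return restore_order(map_func(tagged, worker))
```

With the class above this would be called as ordered_map(parallelizer.map, inputs, processing_function).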
One caveat: the parallelized code might be difficult to debug so I have also prepared a non-parallelizing version of my class which I enable when something goes wrong in the child processes:
class NullParallelizer:
    @staticmethod
    def map(contents, processing_func):
        results = []
        for content in contents:
            results.append(processing_func(content))
        return results
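Because both classes expose the same map(contents, processing_func) method, switching between them can be done in one place. The sketch below assumes a hypothetical DEBUG_SERIAL environment variable and a hypothetical make_parallelizer helper; the parallel class is passed in as a factory so the example stays self-contained.

```python
import os


class NullParallelizer:
    """Serial stand-in with the same map(contents, processing_func) interface."""

    @staticmethod
    def map(contents, processing_func):
        return [processing_func(content) for content in contents]


def make_parallelizer(parallel_factory, environ=os.environ):
    """Return NullParallelizer() if DEBUG_SERIAL=1 (hypothetical flag),
    otherwise whatever parallel_factory() builds."""
    if environ.get("DEBUG_SERIAL") == "1":
        return NullParallelizer()
    return parallel_factory()
```

In main() this would look like parallelizer = make_parallelizer(Parallelizer), so running the program with DEBUG_SERIAL=1 executes everything in the parent process where a debugger can step into it.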