
Python: Writing to a single file with queue while using multiprocessing Pool


Multiprocessing pools already implement a queue for you: just use a pool method that returns worker return values to the caller. imap works well:

import multiprocessing
import re

def mp_worker(filename):
    with open(filename) as f:
        text = f.read()
    m = re.findall("x+", text)
    count = len(max(m, key=len))
    return filename, count

def mp_handler():
    p = multiprocessing.Pool(32)
    with open('infilenamess.txt') as f:
        filenames = [line for line in (l.strip() for l in f) if line]
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, filenames):
            # (filename, count) tuples from worker
            f.write('%s: %d\n' % result)

if __name__ == '__main__':
    mp_handler()
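If you don't care about the order of lines in the output file, imap_unordered hands results back as each worker finishes, which keeps the writer busy when task times vary, and a chunksize batches tasks to cut scheduling overhead. A minimal sketch of that variation (the handler name and chunksize=16 are my choices, not from the answer above):

import multiprocessing
import re

def mp_worker(filename):
    # Same worker as above: length of the longest run of 'x' in the file.
    with open(filename) as f:
        text = f.read()
    return filename, len(max(re.findall("x+", text), key=len))

def mp_handler_unordered():
    with multiprocessing.Pool(32) as p:
        with open('infilenamess.txt') as f:
            filenames = [line for line in (l.strip() for l in f) if line]
        with open('results.txt', 'w') as f:
            # imap_unordered yields (filename, count) tuples as soon as each
            # worker finishes; chunksize batches tasks to reduce IPC overhead.
            for result in p.imap_unordered(mp_worker, filenames, chunksize=16):
                f.write('%s: %d\n' % result)

if __name__ == '__main__':
    mp_handler_unordered()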


I took the accepted answer and simplified it for my own understanding of how this works. I am posting it here in case it helps someone else.

import multiprocessing

def mp_worker(number):
    number += 1
    return number

def mp_handler():
    p = multiprocessing.Pool(32)
    numbers = list(range(1000))
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, numbers):
            f.write('%d\n' % result)

if __name__ == '__main__':
    mp_handler()
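The same simplified pattern also maps onto the standard library's concurrent.futures API if you prefer it; like Pool.imap, Executor.map yields results in input order, so the parent can stream them to a single file. A hedged sketch, not part of the answer above:

from concurrent.futures import ProcessPoolExecutor

def mp_worker(number):
    return number + 1

def mp_handler():
    numbers = list(range(1000))
    # executor.map, like Pool.imap, returns results in input order,
    # so only the parent process ever touches results.txt.
    with ProcessPoolExecutor(max_workers=32) as executor:
        with open('results.txt', 'w') as f:
            for result in executor.map(mp_worker, numbers):
                f.write('%d\n' % result)

if __name__ == '__main__':
    mp_handler()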


Here's my approach using a multiprocessing Manager object. The nice thing about this approach is that when processing drops out of the with Manager() block in run_multi(), the manager shuts the file-writer queue down automatically, which keeps the code easy to read; a sentinel value on the queue tells the writer when to stop listening, so there is no hassle tearing down the listener by hand.

from functools import partial
from multiprocessing import Manager, Pool, Queue
from random import randint
import time

def run_multi():
    inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    with Manager() as manager:
        pool = Pool()  # By default the pool sizes itself to the cores available
        message_queue = manager.Queue()  # Queue for sending messages to the file-writer listener
        pool.apply_async(file_writer, (message_queue,))  # Start the file listener ahead of doing the work
        pool.map(partial(worker, message_queue=message_queue), inputs)  # partial lets map divide the workload
        message_queue.put(None)  # Sentinel: work is done, tell the writer to stop
        pool.close()
        pool.join()  # Wait for the writer to drain the queue and flush the file

def worker(input: int, message_queue: Queue):
    message_queue.put(input * 10)
    time.sleep(randint(1, 5))  # Simulate hard work

def file_writer(message_queue: Queue):
    with open("demo.txt", "a") as report:
        while True:
            message = message_queue.get()
            if message is None:  # Sentinel received: stop listening
                break
            report.write(f"Value is: {message}\n")

if __name__ == "__main__":
    run_multi()
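One caveat worth noting: apply_async runs file_writer inside one of the pool's own workers, so on a machine where the pool sizes itself to a single process, pool.map would have no free worker and stall. A sketch of an alternative that gives the writer its own dedicated Process (writer_proc and the use of range(1, 11) are my own, assumed details):

from functools import partial
from multiprocessing import Manager, Pool, Process, Queue
from random import randint
import time

def worker(input: int, message_queue: Queue):
    message_queue.put(input * 10)
    time.sleep(randint(1, 5))  # Simulate hard work

def file_writer(message_queue: Queue):
    with open("demo.txt", "a") as report:
        while True:
            message = message_queue.get()
            if message is None:  # Sentinel: stop listening
                break
            report.write(f"Value is: {message}\n")

def run_multi():
    with Manager() as manager:
        message_queue = manager.Queue()
        # The writer gets its own process, so it never competes with the
        # pool's workers for a slot.
        writer_proc = Process(target=file_writer, args=(message_queue,))
        writer_proc.start()
        with Pool() as pool:
            pool.map(partial(worker, message_queue=message_queue), range(1, 11))
        message_queue.put(None)  # Tell the writer to finish up
        writer_proc.join()

if __name__ == "__main__":
    run_multi()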