Python multiprocessing safely writing to a file

@GP89 mentioned a good solution: use a queue to send the writing tasks to a dedicated process that has sole write access to the file, while all the other workers have read-only access. This eliminates collisions. Here is an example that uses apply_async, but it will work with map too:

import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'

def worker(arg, q):
    '''stupidly simulates long running process'''
    start = time.perf_counter()  # time.clock() was removed in Python 3.8
    s = 'this is a test'
    txt = s
    for i in range(200000):
        txt += s
    done = time.perf_counter() - start
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file'''
    with open(fn, 'w') as f:
        while True:
            m = q.get()
            if m == 'kill':
                f.write('killed')
                break
            f.write(str(m) + '\n')
            f.flush()

def main():
    # must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count() + 2)

    # put listener to work first
    watcher = pool.apply_async(listener, (q,))

    # fire off workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    # now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()
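For the map variant mentioned above, here is a minimal sketch (the worker and listener bodies are simplified stand-ins, and fn is assumed to be the same file path). functools.partial binds the queue so map only has to supply the varying argument; a Manager queue proxy is picklable, so it is safe to ship to pool workers this way:

import multiprocessing as mp
from functools import partial

fn = 'c:/temp/temp.txt'

def worker(q, arg):
    # do the work, then hand the result to the single writer process
    res = 'Process ' + str(arg)
    q.put(res)
    return res

def listener(q):
    # sole writer: drain the queue until the 'kill' sentinel arrives
    with open(fn, 'w') as f:
        while True:
            m = q.get()
            if m == 'kill':
                break
            f.write(str(m) + '\n')
            f.flush()

def main():
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count() + 2)
    watcher = pool.apply_async(listener, (q,))
    # partial binds q; map then only iterates over the worker arguments
    results = pool.map(partial(worker, q), range(80))
    q.put('kill')
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()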


It looks to me like you need to use a Manager to temporarily save your results to a list, and then write the results from the list to a file. Also, use starmap to pass the worker both the object you want to process and the managed list. The first step is to build the list of parameters to be passed to starmap, each entry of which includes the managed list.

from multiprocessing import Manager
from multiprocessing import Pool
import pandas as pd

def worker(row, param):
    # do something here and then append it to row
    x = param**2
    row.append(x)

if __name__ == '__main__':
    pool_parameter = []  # list of objects to process
    with Manager() as mgr:
        row = mgr.list([])
        # build list of parameters to send to starmap
        params = []
        for param in pool_parameter:
            params.append([row, param])
        with Pool() as p:
            p.starmap(worker, params)
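For concreteness, here is a hypothetical run of the same pattern, reusing the worker defined above (the input values are illustrative only). starmap unpacks each [row, param] pair into the positional arguments of worker, and the proxy's contents are copied out before the Manager shuts down:

if __name__ == '__main__':
    pool_parameter = [1, 2, 3, 4]  # hypothetical inputs
    with Manager() as mgr:
        row = mgr.list([])
        params = [[row, param] for param in pool_parameter]
        with Pool() as p:
            p.starmap(worker, params)  # calls worker(row, 1), worker(row, 2), ...
        results = list(row)            # copy out while the proxy is still valid
    print(results)                     # [1, 4, 9, 16] (order may vary)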

From this point you need to decide how you are going to handle the list. If you have tons of RAM and a huge data set, feel free to concatenate it using pandas; then you can save the file very easily as a CSV or a pickle. Do this while the Manager context is still open, since the row proxy stops working once the manager shuts down.

        df = pd.concat(row, ignore_index=True)
        df.to_pickle('data.pickle')
        df.to_csv('data.csv')
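Note that pd.concat only works if the list elements are pandas objects, so for this step the worker should append a DataFrame or Series rather than a scalar. A minimal end-to-end sketch under that assumption (names and values are illustrative):

from multiprocessing import Manager, Pool
import pandas as pd

def worker(row, param):
    # hypothetical work: wrap each result in a one-row DataFrame
    row.append(pd.DataFrame({'param': [param], 'result': [param**2]}))

if __name__ == '__main__':
    pool_parameter = [1, 2, 3]  # hypothetical inputs
    with Manager() as mgr:
        row = mgr.list([])
        params = [[row, param] for param in pool_parameter]
        with Pool() as p:
            p.starmap(worker, params)
        # concatenate and save while the Manager (and the row proxy) is alive
        df = pd.concat(list(row), ignore_index=True)
        df.to_pickle('data.pickle')
        df.to_csv('data.csv')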