Python multiprocessing safely writing to a file
@GP89 mentioned a good solution. Use a queue to send the writing tasks to a dedicated process that has sole write access to the file. All the other workers have read-only access. This eliminates collisions. Here is an example that uses apply_async, but it will work with map too:
import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'

def worker(arg, q):
    '''stupidly simulates long running process'''
    start = time.perf_counter()  # time.clock() was removed in Python 3.8
    s = 'this is a test'
    txt = s
    for i in range(200000):
        txt += s
    done = time.perf_counter() - start
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file'''
    with open(fn, 'w') as f:
        while 1:
            m = q.get()
            if m == 'kill':
                f.write('killed')
                break
            f.write(str(m) + '\n')
            f.flush()

def main():
    # must use a Manager queue here, or it will not work
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count() + 2)

    # put the listener to work first
    watcher = pool.apply_async(listener, (q,))

    # fire off the workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    # now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()
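For the map variant, the listener still has to be started with apply_async, because map blocks until every worker returns. Here is a minimal sketch of that variation; the use of functools.partial to bind the queue, the simplified worker body, and the file name out.txt are my own additions rather than part of the original answer:

import multiprocessing as mp
from functools import partial

def worker(q, arg):
    res = arg * arg  # stand-in for the real work
    q.put(res)       # hand the result to the single writer
    return res

def listener(q):
    '''sole process with write access to the file'''
    with open('out.txt', 'w') as f:
        while True:
            m = q.get()
            if m == 'kill':
                break
            f.write(str(m) + '\n')
            f.flush()

if __name__ == '__main__':
    manager = mp.Manager()
    q = manager.Queue()
    with mp.Pool(mp.cpu_count() + 2) as pool:
        watcher = pool.apply_async(listener, (q,))
        pool.map(partial(worker, q), range(80))  # blocks until all workers finish
        q.put('kill')   # tell the listener to finish up
        watcher.get()   # surface any exception raised in the listener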
It looks to me that you need to use a Manager to temporarily save your results to a list and then write the results from the list to a file. Also, use starmap to pass the objects you want to process along with the managed list. The first step is to build the parameter list to be passed to starmap, which includes the managed list.
from multiprocessing import Manager
from multiprocessing import Pool
import pandas as pd

def worker(row, param):
    # do something here and then append it to row
    x = param**2
    row.append(x)

if __name__ == '__main__':
    pool_parameter = []  # list of objects to process
    with Manager() as mgr:
        row = mgr.list([])

        # build the list of parameters to send to starmap
        params = []
        for param in pool_parameter:
            params.append([row, param])

        with Pool() as p:
            p.starmap(worker, params)
From this point you need to decide how you are going to handle the list. If you have tons of RAM and a huge data set, feel free to concatenate it using pandas; note that pd.concat only works if each item appended to the list is a DataFrame or Series, not a plain value. Then you can save the file very easily as a CSV or a pickle.
df = pd.concat(row, ignore_index=True)
df.to_pickle('data.pickle')
df.to_csv('data.csv')
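To make that concrete, here is a minimal end-to-end sketch with a worker that appends one-row DataFrames, so the final pd.concat succeeds; the column names param and result and the example inputs are invented for illustration:

from multiprocessing import Manager, Pool
import pandas as pd

def worker(row, param):
    # wrap each result in a one-row DataFrame so the pieces
    # can be concatenated at the end with pd.concat
    row.append(pd.DataFrame({'param': [param], 'result': [param ** 2]}))

if __name__ == '__main__':
    pool_parameter = [1, 2, 3]  # example inputs
    with Manager() as mgr:
        row = mgr.list([])
        params = [(row, param) for param in pool_parameter]
        with Pool() as p:
            p.starmap(worker, params)
        # copy out of the proxy before the manager shuts down
        results = list(row)
    df = pd.concat(results, ignore_index=True)
    df.to_csv('data.csv')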