Pandas and Multiprocessing Memory Management: Splitting a DataFrame into Multiple Chunks Pandas and Multiprocessing Memory Management: Splitting a DataFrame into Multiple Chunks pandas pandas

Pandas and Multiprocessing Memory Management: Splitting a DataFrame into Multiple Chunks


Ok, so I figured it out after the hint by Sebastian Opałczyński in the comments.

The problem is that the child processes are forked from the parent, so all of them contain a reference to the original DataFrame. However, the frame is manipulated in the original process, so the copy-on-write behavior kills the whole thing slowly and eventually when the limit of the physical memory is reached.

There is a simple solution: Instead of pool = mp.Pool(n_jobs), I use the new context feature of multiprocessing:

ctx = mp.get_context('spawn')pool = ctx.Pool(n_jobs)

This guarantees that the Pool processes are just spawned and not forked from the parent process. Accordingly, none of them has access to the original DataFrame and all of them only need a tiny fraction of the parent's memory.

Note that the mp.get_context('spawn') is only available in Python 3.4 and newer.


A better implementation is just to use the pandas implementation of chunked dataframe as a generator and feed it into the "pool.imap" functionpd.read_csv('<filepath>.csv', chucksize=<chunksize>)https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html

Benefit: It doesn't read in the whole df in your main process (save memory). Each child process will be pointed the chunk it needs only. --> solve the child memory issue.

Overhead: It requires you to save your df as csv first and read it in again using pd.read_csv --> I/O time.

Note: chunksize is not available to pd.read_pickle or other loading methods that are compressed on storage.

def main():    # Job parameters    n_jobs = 4  # Poolsize    size = (10000, 1000)  # Size of DataFrame    chunksize = 100  # Maximum size of Frame Chunk    # Preparation    df = pd.DataFrame(np.random.rand(*size))    pool = mp.Pool(n_jobs)    print('Starting MP')    # Execute the wait and print function in parallel    df_chunked = pd.read_csv('<filepath>.csv',chunksize = chunksize) # modified    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, df_chunked) # modified    pool.close()    pool.join()    print('DONE')