
Parallelize apply after pandas groupby


This seems to work, although it really should be built into pandas:

import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

if __name__ == '__main__':
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
    print('parallel version: ')
    print(applyParallel(df.groupby(df.index), tmpFunc))
    print('regular version: ')
    print(df.groupby(df.index).apply(tmpFunc))
    print('ideal version (does not work): ')
    print(df.groupby(df.index).applyParallel(tmpFunc))
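If you also want the group key inside the worker (plain groupby.apply() exposes it as df.name), a small variation of the helper can forward it explicitly. This is just a sketch building on the code above; tmpFuncWithName and the 'group' column are my own hypothetical names, not part of the original answer:

import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

def tmpFuncWithName(name, df):
    # hypothetical worker: same as tmpFunc, but records the group label too
    df['c'] = df.a + df.b
    df['group'] = name
    return df

def applyParallelWithName(dfGrouped, func):
    # pass both the group label and the chunk to each worker
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(
        delayed(func)(name, group) for name, group in dfGrouped)
    return pd.concat(retLst)

if __name__ == '__main__':
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
    print(applyParallelWithName(df.groupby(df.index), tmpFuncWithName))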


Ivan's answer is great, but it looks like it can be slightly simplified, also removing the need to depend on joblib:

import pandas
from multiprocessing import Pool, cpu_count

def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pandas.concat(ret_list)
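As a usage sketch, reusing tmpFunc from the first answer (keep the call under an if __name__ == '__main__': guard so it also works where worker processes are spawned rather than forked, e.g. on Windows or macOS):

if __name__ == '__main__':
    df = pandas.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
    print(applyParallel(df.groupby(df.index), tmpFunc))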

By the way: this cannot replace every use of groupby.apply(), but it covers the typical cases: it should handle cases 2 and 3 in the documentation, while the behaviour of case 1 can be obtained by passing axis=1 to the final pandas.concat() call (a short sketch follows the case list below).

EDIT: the docs have changed; the old version can be found here. In any case, I'm copying the three cases below.

case 1: group DataFrame, apply aggregation function (f(chunk) -> Series), yield DataFrame, with group axis having group labels
case 2: group DataFrame, apply transform function (f(chunk) -> DataFrame with same indexes), yield DataFrame with resulting chunks glued together
case 3: group Series, apply function with f(chunk) -> DataFrame, yield DataFrame with result of chunks glued together
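Here is a rough sketch of the case-1 adjustment mentioned above, under my own assumptions: summarise is a made-up aggregator returning one Series per group, and the transpose/relabel at the end is only there to make the shape resemble what groupby.apply() would return; it is not guaranteed to match it exactly:

import pandas
from multiprocessing import Pool, cpu_count

def summarise(group):
    # hypothetical aggregator: one Series per group
    return group.sum(numeric_only=True)

def applyParallelAgg(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    # axis=1 glues the per-group Series side by side; transpose so each
    # group becomes a row, then restore the group labels as the index
    result = pandas.concat(ret_list, axis=1).T
    result.index = [name for name, group in dfGrouped]
    return result

if __name__ == '__main__':
    df = pandas.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
    print(applyParallelAgg(df.groupby(df.index), summarise))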


I have a hack I use for getting parallelization in pandas. I break my dataframe into chunks, put each chunk into the element of a list, and then use IPython's parallel bits to do a parallel apply on the list of dataframes. Then I put the list back together using pandas' concat function.

This is not generally applicable, however. It works for me because the function I want to apply to each chunk of the dataframe takes about a minute, and the pulling apart and putting back together of my data does not take all that long. So this is clearly a kludge. With that said, here's an example. I'm using the IPython Notebook, so you'll see %%time magic in my code:

## make some example data
import pandas as pd
import numpy as np

np.random.seed(1)
n = 10000
df = pd.DataFrame({'mygroup': np.random.randint(1000, size=n),
                   'data': np.random.rand(n)})
grouped = df.groupby('mygroup')

For this example I'm going to make 'chunks' based on the above groupby, but this does not have to be how the data is chunked. Although it's a pretty common pattern.

dflist = []
for name, group in grouped:
    dflist.append(group)
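If the chunks do not need to follow group boundaries, one alternative (my own suggestion, not from the answer) is to split the frame into roughly equal row blocks with numpy; each element of the resulting list is again a DataFrame:

import numpy as np

# 6 is just an example chunk count; match it to the number of engines
dflist = np.array_split(df, 6)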

set up the parallel bits

from IPython.parallel import Client

rc = Client()
lview = rc.load_balanced_view()
lview.block = True
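Assuming the engines were started beforehand (e.g. with something like ipcluster start -n 6), a quick sanity check that they are all registered before mapping work onto them:

# rc.ids lists the registered engine ids
print(len(rc.ids))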

write a silly function to apply to our data

def myFunc(inDf):
    inDf['newCol'] = inDf.data ** 10
    return inDf

now let's run the code in serial then in parallel. serial first:

%%time
serial_list = map(myFunc, dflist)

CPU times: user 14 s, sys: 19.9 ms, total: 14 s
Wall time: 14 s

now parallel

%%time
parallel_list = lview.map(myFunc, dflist)

CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
Wall time: 1.56 s

then it only takes a few ms to merge them back into one dataframe

%%time
combinedDf = pd.concat(parallel_list)

CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
Wall time: 300 ms

I'm running 6 IPython engines on my MacBook, and you can see it drops the execution time from 14 s down to about 2 s.

For really long-running stochastic simulations I can use an AWS backend by firing up a cluster with StarCluster. Much of the time, however, I parallelize just across the 8 CPUs on my MBP.