Parallelize apply after pandas groupby
This seems to work, although it really should be built into pandas:
import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(
        delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

if __name__ == '__main__':
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
    print('parallel version: ')
    print(applyParallel(df.groupby(df.index), tmpFunc))
    print('regular version: ')
    print(df.groupby(df.index).apply(tmpFunc))
    print('ideal version (does not work): ')
    print(df.groupby(df.index).applyParallel(tmpFunc))
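If the per-group function also needs the group key, a small variant of the helper above can pass it through. This is only a sketch under that assumption; applyParallelWithKey is an illustrative name, not something from the answer itself:

import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

def applyParallelWithKey(dfGrouped, func):
    # func is assumed to take (name, group) rather than just the group
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(
        delayed(func)(name, group) for name, group in dfGrouped)
    return pd.concat(retLst)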
Ivan's answer is great, but it looks like it can be slightly simplified, also removing the need to depend on joblib:
import pandas
from multiprocessing import Pool, cpu_count

def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pandas.concat(ret_list)
By the way: this cannot replace just any groupby.apply(), but it will cover the typical cases: it should cover cases 2 and 3 in the documentation, while you should obtain the behaviour of case 1 by passing the argument axis=1 to the final pandas.concat() call (see the sketch after the case list below).
EDIT: the docs changed; the old version can be found here. In any case, I'm copy-pasting the three cases below.
case 1: group DataFrame, apply aggregation function (f(chunk) -> Series), yield DataFrame with group axis having group labels
case 2: group DataFrame, apply transform function (f(chunk) -> DataFrame with same indexes), yield DataFrame with resulting chunks glued together
case 3: group Series, apply function with f(chunk) -> DataFrame, yield DataFrame with result of chunks glued together
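To make the case-1 behaviour concrete, here is a minimal sketch that reuses the Pool-based helper with an aggregation function returning one Series per group; aggFunc and applyParallelAgg are illustrative names, not part of the answer above. Note that with axis=1 the groups end up as columns, so the result is the transpose of what groupby.apply() would give, and the group labels are not attached automatically:

import pandas as pd
from multiprocessing import Pool, cpu_count

def aggFunc(group):
    # case 1 style: aggregation, one Series per group
    return group.mean()

def applyParallelAgg(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    # axis=1 glues the per-group Series side by side as columns
    return pd.concat(ret_list, axis=1)

if __name__ == '__main__':
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
    print(applyParallelAgg(df.groupby(df.index), aggFunc))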
I have a hack I use for getting parallelization in pandas. I break my dataframe into chunks, put each chunk in a list, and then use IPython's parallel bits to do a parallel apply on the list of dataframes. Then I put the list back together using pandas' concat function.
This is not generally applicable, however. It works for me because the function I want to apply to each chunk of the dataframe takes about a minute, and pulling apart and putting together the data does not take all that long. So this is clearly a kludge. With that said, here's an example. I'm using the IPython notebook, so you'll see the %%time magic in my code:
## make some example data
import pandas as pd
import numpy as np

np.random.seed(1)
n = 10000
df = pd.DataFrame({'mygroup': np.random.randint(1000, size=n),
                   'data': np.random.rand(n)})
grouped = df.groupby('mygroup')
For this example I'm going to make 'chunks' based on the above groupby, though this does not have to be how the data is chunked; it's just a pretty common pattern (see the sketch after the next snippet for a positional alternative).
dflist = []
for name, group in grouped:
    dflist.append(group)
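As an aside, if you just want roughly equal-sized chunks rather than one chunk per group, numpy's array_split works on a dataframe too. This is only a sketch of an alternative chunking step (it assumes the df built above and an arbitrary chunk count), not what the timings below used:

import numpy as np

# split positionally into roughly equal pieces, ignoring group boundaries;
# 6 is an arbitrary choice here (e.g. one chunk per engine)
chunk_list = list(np.array_split(df, 6))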
set up the parallel bits
from IPython.parallel import Client

rc = Client()
lview = rc.load_balanced_view()
lview.block = True
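This assumes a cluster of engines is already running (started beforehand, for example, with ipcluster start -n 4 in a terminal; in newer IPython versions the Client lives in the separate ipyparallel package). A quick sanity-check sketch:

# rc is the Client created above; rc.ids lists the connected engine ids
print(len(rc.ids))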
write a silly function to apply to our data
def myFunc(inDf):
    inDf['newCol'] = inDf.data ** 10
    return inDf
now let's run the code in serial then in parallel. serial first:
%%time
serial_list = map(myFunc, dflist)

CPU times: user 14 s, sys: 19.9 ms, total: 14 s
Wall time: 14 s
now parallel
%%time
parallel_list = lview.map(myFunc, dflist)

CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
Wall time: 1.56 s
then it only takes a few ms to merge them back into one dataframe
%%time
combinedDf = pd.concat(parallel_list)

CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
Wall time: 300 ms
I'm running 6 IPython engines on my MacBook, and you can see it drops the execution time from 14 s down to about 1.5 s.
For really long-running stochastic simulations I can use an AWS backend by firing up a cluster with StarCluster. Much of the time, however, I just parallelize across the 8 CPUs on my MBP.