Efficiently applying a function to a grouped pandas DataFrame in parallel
From the comments above, it seems that this is planned for pandas
some time (there's also an interesting-looking rosetta
project which I just noticed).
However, until every parallel functionality is incorporated into pandas
, I noticed that it's very easy to write efficient & non-memory-copying parallel augmentations to pandas
directly using cython
+ OpenMP and C++.
Here's a short example of writing a parallel groupby-sum, whose use is something like this:
import pandas as pdimport para_group_demodf = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})print para_group_demo.sum(df.a, df.b)
and output is:
sumkey 0 61 112 4
Note Doubtlessly, this simple example's functionality will eventually be part of pandas
. Some things, however, will be more natural to parallelize in C++ for some time, and it's important to be aware of how easy it is to combine this into pandas
.
To do this, I wrote a simple single-source-file extension whose code follows.
It starts with some imports and type definitions
from libc.stdint cimport int64_t, uint64_tfrom libcpp.vector cimport vectorfrom libcpp.unordered_map cimport unordered_mapcimport cythonfrom cython.operator cimport dereference as deref, preincrement as incfrom cython.parallel import prangeimport pandas as pdctypedef unordered_map[int64_t, uint64_t] counts_tctypedef unordered_map[int64_t, uint64_t].iterator counts_it_tctypedef vector[counts_t] counts_vec_t
The C++ unordered_map
type is for summing by a single thread, and the vector
is for summing by all threads.
Now to the function sum
. It starts off with typed memory views for fast access:
def sum(crit, vals): cdef int64_t[:] crit_view = crit.values cdef int64_t[:] vals_view = vals.values
The function continues by dividing the semi-equally to the threads (here hardcoded to 4), and having each thread sum the entries in its range:
cdef uint64_t num_threads = 4 cdef uint64_t l = len(crit) cdef uint64_t s = l / num_threads + 1 cdef uint64_t i, j, e cdef counts_vec_t counts counts = counts_vec_t(num_threads) counts.resize(num_threads) with cython.boundscheck(False): for i in prange(num_threads, nogil=True): j = i * s e = j + s if e > l: e = l while j < e: counts[i][crit_view[j]] += vals_view[j] inc(j)
When the threads have completed, the function merges all the results (from the different ranges) into a single unordered_map
:
cdef counts_t total cdef counts_it_t it, e_it for i in range(num_threads): it = counts[i].begin() e_it = counts[i].end() while it != e_it: total[deref(it).first] += deref(it).second inc(it)
All that's left is to create a DataFrame
and return the results:
key, sum_ = [], [] it = total.begin() e_it = total.end() while it != e_it: key.append(deref(it).first) sum_.append(deref(it).second) inc(it) df = pd.DataFrame({'key': key, 'sum': sum_}) df.set_index('key', inplace=True) return df