
python dask DataFrame, support for (trivially parallelizable) row apply?


map_partitions

You can apply your function to all of the partitions of your dataframe with the map_partitions function.

df.map_partitions(func, columns=...)

Note that func will be given only part of the dataset at a time, not the entire dataset as with pandas apply (which presumably you wouldn't want if your goal is parallelism).
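As a minimal sketch of this pattern (assuming a newer dask where map_partitions takes a meta= hint describing the output; double_scores and the score column are hypothetical names for illustration):

import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({'score': range(8)})  # any pandas DataFrame works here
ddf = dd.from_pandas(pdf, npartitions=4)

def double_scores(part):
    # part is an ordinary pandas DataFrame: one partition of ddf
    part = part.copy()
    part['score'] = part['score'] * 2
    return part

# meta describes the output structure so dask can build the graph
# without actually running func on real data
result = ddf.map_partitions(double_scores, meta=pdf.head(0))
print(result.compute())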

map / apply

You can map a function element-wise across a series with map:

df.mycolumn.map(func)

You can map a function row-wise across a dataframe with apply:

df.apply(func, axis=1)
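A short sketch of both forms (the column names and meta values here are illustrative; newer dask versions ask for a meta= hint so they don't have to infer the output type):

import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({'x': [1, 2, 3, 4], 'y': [10, 20, 30, 40]})
ddf = dd.from_pandas(pdf, npartitions=2)

# Series.map: element-wise over a single column
doubled = ddf.x.map(lambda v: v * 2, meta=('x', 'i8'))

# DataFrame.apply with axis=1: row-wise over the whole frame
row_sum = ddf.apply(lambda row: row.x + row.y, axis=1, meta=('sum', 'i8'))

print(doubled.compute())
print(row_sum.compute())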

Threads vs Processes

As of version 0.6.0, dask.dataframe parallelizes with threads. Custom Python functions hold the GIL, so they will not receive much benefit from thread-based parallelism. You could try processes instead:

df = dd.read_csv(...)
df.map_partitions(func, columns=...).compute(scheduler='processes')
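A runnable sketch of the same idea (cpu_bound is a hypothetical GIL-holding function; the processes scheduler trades serialization cost for true parallelism):

import dask.dataframe as dd
import pandas as pd

def cpu_bound(part):
    # pure-Python loop per row: holds the GIL, so threads barely help
    part = part.copy()
    part['a'] = [sum(range(v % 1000)) for v in part['a']]
    return part

if __name__ == '__main__':  # needed for the processes scheduler on some platforms
    pdf = pd.DataFrame({'a': range(10_000)})
    ddf = dd.from_pandas(pdf, npartitions=4)
    # processes sidestep the GIL at the cost of pickling partitions
    out = ddf.map_partitions(cpu_bound, meta=pdf.head(0)).compute(scheduler='processes')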

But avoid apply

However, you should really avoid apply with custom Python functions, both in Pandas and in Dask; it is a common source of poor performance. If you can express your operation in a vectorized manner, your Pandas code may well be 100x faster and you may not need dask.dataframe at all.
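To illustrate the gap (a hypothetical row-wise computation and its vectorized equivalent; exact speedups vary with data and hardware):

import numpy as np
import pandas as pd

df = pd.DataFrame({'x': np.random.rand(100_000),
                   'y': np.random.rand(100_000)})

# row-wise apply: a Python-level loop over every row (slow)
slow = df.apply(lambda row: row.x + row.y if row.x > 0.5 else 0.0, axis=1)

# vectorized equivalent: one pass in optimized C code (fast)
fast = np.where(df.x > 0.5, df.x + df.y, 0.0)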

Consider numba

For your particular problem you might consider numba. This can significantly improve your performance:

In [1]: import numpy as np

In [2]: import pandas as pd

In [3]: s = pd.Series([10000]*120)

In [4]: %paste
def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s
## -- End pasted text --

In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms

In [6]: import numba

In [7]: fast_func = numba.jit(slow_func)

In [8]: %time _ = s.apply(fast_func)  # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms

In [9]: %time _ = s.apply(fast_func)  # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms

Disclaimer: I work for the company that makes both numba and dask and employs many of the pandas developers.


In more recent versions, dask.dataframe.apply delegates responsibility to map_partitions:

@insert_meta_param_description(pad=12)
def apply(self, func, convert_dtype=True, meta=no_default, args=(), **kwds):
    """ Parallel version of pandas.Series.apply
    ...
    """
    if meta is no_default:
        msg = ("`meta` is not specified, inferred from partial data. "
               "Please provide `meta` if the result is unexpected.\n"
               "  Before: .apply(func)\n"
               "  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result\n"
               "  or:     .apply(func, meta=('x', 'f8'))            for series result")
        warnings.warn(msg)
        meta = _emulate(M.apply, self._meta_nonempty, func,
                        convert_dtype=convert_dtype,
                        args=args, **kwds)
    return map_partitions(M.apply, self, func,
                          convert_dtype, args, meta=meta, **kwds)
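In practice that means you can silence the meta warning above by passing meta yourself (a small sketch; the column names and dtype are illustrative):

import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
ddf = dd.from_pandas(pdf, npartitions=2)

# supplying meta up front skips the inference step and the warning
result = ddf.apply(lambda row: row.x * row.y, axis=1, meta=('xy', 'i8'))
print(result.compute())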