Is there a simple process-based parallel map for Python?



It seems like what you need is the map method of multiprocessing.Pool():

map(func, iterable[, chunksize])

A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready. This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

For example, if you wanted to map this function:

    def f(x):
        return x**2

to range(10), you could do it using the built-in map() function:

map(f, range(10))

or using a multiprocessing.Pool() object's method map():

    import multiprocessing

    pool = multiprocessing.Pool()
    print(pool.map(f, range(10)))
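On platforms that start worker processes by spawning a fresh interpreter (Windows, and macOS by default), the pool should be created under an `if __name__ == "__main__":` guard, and the mapped function has to be defined at module level so the workers can import it. Here is a minimal sketch of that layout, assuming Python 3; `parallel_squares` is a hypothetical helper name and `chunksize=3` is an arbitrary value chosen only to show the parameter.

```python
import multiprocessing


def f(x):
    # Defined at module level so worker processes can import it.
    return x ** 2


def parallel_squares(n, chunksize=None):
    # Hypothetical helper: maps f over range(n) in a process pool.
    # Leaving the with-block terminates the pool; map() has already
    # returned the complete result list by then.
    with multiprocessing.Pool() as pool:
        return pool.map(f, range(n), chunksize)


if __name__ == "__main__":
    # Results come back in input order, whatever the chunk size.
    print(parallel_squares(10))
    print(parallel_squares(10, chunksize=3))
```

Both calls print the same list of squares. A larger chunksize mainly reduces inter-process overhead when the iterable is long and each call is cheap.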


This can be done elegantly with Ray, a system that allows you to easily parallelize and distribute your Python code.

To parallelize your example, you'd need to define your map function with the @ray.remote decorator, and then invoke it with .remote. This ensures that every instance of the remote function is executed in a different process.

    import time
    import ray

    ray.init()

    # Define the function you want to apply map on, as remote function.
    @ray.remote
    def f(x):
        # Do some work...
        time.sleep(1)
        return x*x

    # Define a helper parmap(f, list) function.
    # This function executes a copy of f() on each element in "list".
    # Each copy of f() runs in a different process.
    # Note f.remote(x) returns a future of its result (i.e.,
    # an identifier of the result) rather than the result itself.
    def parmap(f, list):
        return [f.remote(x) for x in list]

    # Call parmap() on a list consisting of first 5 integers.
    result_ids = parmap(f, range(1, 6))

    # Get the results
    results = ray.get(result_ids)
    print(results)

This will print:

[1, 4, 9, 16, 25]

and it will finish in approximately len(list)/p seconds (rounded up to the nearest integer), where p is the number of cores on your machine, because each call sleeps for one second. Assuming a machine with 2 cores, our example will execute in 5/2 rounded up, i.e., in approximately 3 seconds.
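The estimate above is a ceiling division: with n equal-length tasks and p worker processes, the tasks run in ceil(n/p) waves. A rough sketch of that arithmetic follows, assuming every task takes the same time and ignoring scheduling overhead; `estimated_seconds` is a hypothetical helper and not part of Ray.

```python
import math


def estimated_seconds(num_tasks, num_cores, seconds_per_task=1.0):
    # At most num_cores tasks run at once, so the work proceeds in waves.
    waves = math.ceil(num_tasks / num_cores)
    return waves * seconds_per_task


print(estimated_seconds(5, 2))  # 3 waves of 1 s each -> 3.0
print(estimated_seconds(5, 4))  # 2 waves of 1 s each -> 2.0
```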

There are a number of advantages of using Ray over the multiprocessing module. In particular, the same code will run on a single machine as well as on a cluster of machines. For more advantages of Ray see this related post.


For those who are looking for a Python equivalent of R's mclapply(), here is my implementation. It is an improvement on the following two examples:

It can be applied to map functions with single or multiple arguments.

    import numpy as np, pandas as pd
    from scipy import sparse
    import functools, multiprocessing
    from multiprocessing import Pool

    num_cores = multiprocessing.cpu_count()

    def parallelize_dataframe(df, func, U=None, V=None):
        #blockSize = 5000
        num_partitions = 5 # int( np.ceil(df.shape[0]*(1.0/blockSize)) )
        blocks = np.array_split(df, num_partitions)

        pool = Pool(num_cores)
        if V is not None and U is not None:
            # apply func with multiple arguments to dataframe (i.e. involves multiple columns)
            df = pd.concat(pool.map(functools.partial(func, U=U, V=V), blocks))
        else:
            # apply func with one argument to dataframe (i.e. involves single column)
            df = pd.concat(pool.map(func, blocks))

        pool.close()
        pool.join()

        return df

    def square(x):
        return x**2

    def test_func(data):
        print("Process working on: ", data.shape)
        data["squareV"] = data["testV"].apply(square)
        return data

    def vecProd(row, U, V):
        return np.sum( np.multiply(U[int(row["obsI"]),:], V[int(row["obsJ"]),:]) )

    def mProd_func(data, U, V):
        data["predV"] = data.apply( lambda row: vecProd(row, U, V), axis=1 )
        return data

    def generate_simulated_data():
        N, D, nnz, K = [302, 184, 5000, 5]
        I = np.random.choice(N, size=nnz, replace=True)
        J = np.random.choice(D, size=nnz, replace=True)
        vals = np.random.sample(nnz)

        sparseY = sparse.csc_matrix((vals, (I, J)), shape=[N, D])

        # Generate parameters U and V which could be used to reconstruct the matrix Y
        U = np.random.sample(N*K).reshape([N,K])
        V = np.random.sample(D*K).reshape([D,K])

        return sparseY, U, V

    def main():
        Y, U, V = generate_simulated_data()

        # find row, column indices and observed values for sparse matrix Y
        (testI, testJ, testV) = sparse.find(Y)

        colNames = ["obsI", "obsJ", "testV", "predV", "squareV"]
        dtypes = {"obsI":int, "obsJ":int, "testV":float, "predV":float, "squareV": float}

        obsValDF = pd.DataFrame(np.zeros((len(testV), len(colNames))), columns=colNames)
        obsValDF["obsI"] = testI
        obsValDF["obsJ"] = testJ
        obsValDF["testV"] = testV
        obsValDF = obsValDF.astype(dtype=dtypes)

        print("Y.shape: {!s}, #obsVals: {}, obsValDF.shape: {!s}".format(Y.shape, len(testV), obsValDF.shape))

        # calculate the square of testVals
        obsValDF = parallelize_dataframe(obsValDF, test_func)

        # reconstruct prediction of testVals using parameters U and V
        obsValDF = parallelize_dataframe(obsValDF, mProd_func, U, V)

        print("obsValDF.shape after reconstruction: {!s}".format(obsValDF.shape))
        print("First 5 elements of obsValDF:\n", obsValDF.iloc[:5,:])

    if __name__ == '__main__':
        main()
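The multiple-argument case above relies on functools.partial to bind the extra arguments before the function is handed to pool.map. Here is a stripped-down sketch of just that pattern, without pandas or NumPy; `scale_and_shift` and `parallel_apply` are hypothetical names used only for illustration.

```python
import functools
import multiprocessing


def scale_and_shift(x, scale, shift):
    # Hypothetical worker: one mapped argument plus two fixed ones.
    return x * scale + shift


def parallel_apply(func, items, **fixed_kwargs):
    # Bind the fixed keyword arguments, leaving a one-argument callable
    # that pool.map can apply to each item.
    bound = functools.partial(func, **fixed_kwargs)
    with multiprocessing.Pool() as pool:
        return pool.map(bound, items)


if __name__ == "__main__":
    print(parallel_apply(scale_and_shift, range(5), scale=2, shift=1))
    # [1, 3, 5, 7, 9]
```

If each item already carries all of its arguments as a tuple, Pool.starmap is an alternative that unpacks the tuple into positional arguments.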