Parallelizing loading data from MongoDB into python pandas

I was looking into pymongo parallelization and this is what worked for me. It took my humble gaming laptop nearly 100 minutes to process my MongoDB collection of 40 million documents. The CPU was 100% utilised; I had to turn on the AC :)

I used the skip and limit cursor methods to split the collection into batches, then assigned each batch to a process. The code is written for Python 3:

import multiprocessing
from pymongo import MongoClient

def your_function(something):
    <...>
    return result

def process_cursor(skip_n, limit_n):
    print('Starting process', skip_n//limit_n, '...')
    collection = MongoClient().<db_name>.<collection_name>
    cursor = collection.find({}).skip(skip_n).limit(limit_n)
    for doc in cursor:
        <do your magic>
        # for example:
        result = your_function(doc['your_field'])  # do some processing on each document
        # update that document by adding the result into a new field
        collection.update_one({'_id': doc['_id']}, {'$set': {'<new_field_eg>': result}})
    print('Completed process', skip_n//limit_n, '...')

if __name__ == '__main__':
    n_cores = 7                # number of splits (logical cores of the CPU - 1)
    collection_size = 40126904 # your collection size
    batch_size = round(collection_size/n_cores + 0.5)
    skips = range(0, n_cores*batch_size, batch_size)

    processes = [multiprocessing.Process(target=process_cursor, args=(skip_n, batch_size)) for skip_n in skips]

    for process in processes:
        process.start()

    for process in processes:
        process.join()
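
If the goal is to end up with a pandas DataFrame rather than to update documents in place, the same skip/limit split can be fed into a process pool that returns one DataFrame per batch. This is only a minimal sketch, assuming the combined result fits in memory and that the cost of pickling DataFrames back to the parent is acceptable; db_name, collection_name and the projection are placeholders:

import multiprocessing
import pandas as pd
from pymongo import MongoClient

def load_batch(skip_n, limit_n):
    # Create the client inside the worker so each process has its own connection.
    collection = MongoClient().db_name.collection_name  # placeholders
    cursor = collection.find({}, {'_id': 0}).skip(skip_n).limit(limit_n)
    return pd.DataFrame(list(cursor))

if __name__ == '__main__':
    n_cores = 7
    collection_size = 40126904
    batch_size = round(collection_size / n_cores + 0.5)
    skips = range(0, n_cores * batch_size, batch_size)

    # A Pool lets each worker return its DataFrame to the parent,
    # which multiprocessing.Process alone does not.
    with multiprocessing.Pool(n_cores) as pool:
        frames = pool.starmap(load_batch, [(s, batch_size) for s in skips])

    df = pd.concat(frames, ignore_index=True)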

The limit of the last split may be larger than the number of remaining documents, but that won't raise an error; the cursor simply returns fewer documents.
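
A quick check with the same numbers as above shows how the splits cover the collection:

n_cores = 7
collection_size = 40126904
batch_size = round(collection_size / n_cores + 0.5)   # 5732415

for skip_n in range(0, n_cores * batch_size, batch_size):
    docs_left = max(collection_size - skip_n, 0)
    print(f'split {skip_n // batch_size}: skip={skip_n}, limit={batch_size}, '
          f'documents actually returned={min(batch_size, docs_left)}')

# The last split starts at skip=34394490 with 5732414 documents remaining,
# one fewer than its limit of 5732415, so it just returns what is left.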


I think dask-mongo will do the job here. You can install it with pip or conda, and the repo includes some example notebooks.

dask-mongo reads the data you have in MongoDB as a Dask bag, and you can then go from a Dask bag to a Dask DataFrame with df = b.to_dataframe(), where b is the bag you read from Mongo using dask_mongo.read_mongo, as in the sketch below.
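
A minimal sketch of that, assuming read_mongo accepts connection_kwargs, database, collection and chunksize keyword arguments as in the repo's example notebooks; the host, database and collection names below are placeholders:

import dask_mongo

# Read the collection lazily as a Dask bag of documents.
b = dask_mongo.read_mongo(
    connection_kwargs={"host": "localhost", "port": 27017},
    database="db_name",          # placeholder
    collection="collection_name",  # placeholder
    chunksize=100_000,           # documents per partition; tune to your memory
)

df = b.to_dataframe()   # Dask DataFrame, still lazy
pdf = df.compute()      # materialize as a pandas DataFrame if it fits in RAM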


"Read the mans, thery're rulez" :)

pymongo.Collection has a parallel_scan method that returns a list of cursors.

UPDATE. This function can do the job if the collection does not change too often and the queries are always the same (my case). One could just store query results in different collections and run parallel scans.
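
For reference, a minimal sketch of how that can be used; note that parallel_scan is only available in older PyMongo/MongoDB versions (it has since been removed), and the db/collection names below are placeholders:

from multiprocessing.pool import ThreadPool
from pymongo import MongoClient

def drain(cursor):
    # Each cursor returned by parallel_scan covers a disjoint subset of the
    # collection, so the cursors can be consumed independently.
    return list(cursor)

collection = MongoClient().db_name.collection_name  # placeholders
cursors = collection.parallel_scan(4)  # ask the server for up to 4 cursors

# Cursors are not picklable, so use threads rather than processes here.
with ThreadPool(len(cursors)) as pool:
    chunks = pool.map(drain, cursors)

documents = [doc for chunk in chunks for doc in chunk]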