
Python MongoDB (PyMongo) Multiprocessing cursor


Here's how you can use a Pool to feed the child processes:

import multiprocessing
import sys
import time

# CONFIG_POOL_SIZE and MONGO (a pymongo.MongoClient) come from your existing setup.

START = time.time()

def remaining_time(a, b):
    # Estimate time remaining from the throughput so far (a = total, b = items done).
    if START and b:  # b is 0 for the first item; avoid dividing by zero
        y = time.time() - START
        z = ((a * y) / b) - y
        d = time.strftime('%H:%M:%S', time.gmtime(z))
        e = round(b / y)
        progress("{0}/{1} | Time remaining {2} ({3}p/s)".format(b, a, d, e), b, a)

def progress(p, c, t):
    # Draw a one-line progress bar.
    pc = (c * 100) // t
    sys.stdout.write("%s [%-20s] %d%%\r" % (p, '█' * (pc // 5), pc))
    sys.stdout.flush()

def dowork(args):
    p, i, pcount = args
    # ... do the real per-document work on p here ...
    remaining_time(pcount, i)

def main():
    # The Pool replaces the manually created Queue and Process objects.
    pool = multiprocessing.Pool(CONFIG_POOL_SIZE)
    mongo_query = {}
    products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
    pcount = products.count()  # on PyMongo 4+, use MONGO.mydb.items.count_documents(mongo_query)
    pool.map(dowork, ((p, idx, pcount) for idx, p in enumerate(products)))
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

Note that using pool.map requires loading everything from the cursor into memory at once, which might be a problem given how large your collection is. You can use imap to avoid consuming the whole thing at once, but you'll need to specify a chunksize to minimize IPC overhead:

# Calculate chunksize using same algorithm used internally by pool.map
chunksize, extra = divmod(pcount, CONFIG_POOL_SIZE * 4)
if extra:
    chunksize += 1

pool.imap(dowork, ((p, idx, pcount) for idx, p in enumerate(products)), chunksize=chunksize)
pool.close()
pool.join()

For 1,000,000 items and a pool of 20 workers, that gives a chunksize of 12,500. You can try sizes larger and smaller than that and see how it affects performance.
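
To see where that number comes from, here is the calculation in isolation (the pool size of 20 is only an assumption; substitute your own CONFIG_POOL_SIZE):

pcount = 1000000
CONFIG_POOL_SIZE = 20  # assumed pool size, purely for illustration

chunksize, extra = divmod(pcount, CONFIG_POOL_SIZE * 4)  # divmod(1000000, 80) -> (12500, 0)
if extra:
    chunksize += 1

print(chunksize)  # 12500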

I'm not sure this will help much, though, if the bottleneck is actually just pulling the data out of MongoDB.


Why are you using multiprocessing? You don't seem to be doing any actual work in the worker processes via the queue, so all that extra machinery is pure overhead. (Python's global interpreter lock is also why a multithreaded version would be less performant than you'd expect.) It's probably making this program slower, not faster.

A couple of performance tips:

  1. Try setting batch_size in your find() call to some big number (e.g. 20000). batch_size caps how many documents the server returns per round trip before the client has to fetch more, and the default first batch is only 101 documents.

  2. Try setting cursor_type to pymongo.cursor.CursorType.EXHAUST, which might reduce the latency you're seeing. See the sketch after this list for how each tip looks in a find() call.
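
For concreteness, here is a minimal sketch of both tips, reusing the MONGO.mydb.items collection and empty query from the code above (the localhost connection and the batch size of 20000 are just placeholders):

import pymongo

# Mirrors the names used in the code above; adjust the connection string as needed.
MONGO = pymongo.MongoClient()
collection = MONGO.mydb.items
mongo_query = {}

# Tip 1: a larger batch_size means fewer round trips to the server.
products = collection.find(mongo_query, batch_size=20000)

# Tip 2: an exhaust cursor streams batches without the client having to request each one.
# products = collection.find(mongo_query, cursor_type=pymongo.cursor.CursorType.EXHAUST)

for p in products:
    pass  # process each document here

Keep in mind that exhaust cursors can't be used through mongos (sharded clusters) or combined with limit.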