Robert W. Hunter

Reputation: 3003

Python MongoDB (PyMongo) Multiprocessing cursor

I am trying to make a multiprocessing MongoDB utility. It works, but I think I have a performance issue: even with 20 workers it isn't processing more than 2,800 docs per second, and I think I can get 5x faster than that. This is my code; it isn't doing anything exceptional, it just prints the remaining time until the end of the cursor.

Maybe there is a better way to perform multiprocessing on a MongoDB cursor, because I need to run some work on every document of a 17.4M-record collection, so performance and total run time are a must.

import sys
import time
import multiprocessing

# MONGO (a MongoClient instance) and CONFIG_POOL_SIZE come from my config module
START = time.time()

def remaining_time(a, b):
    # a = total number of docs, b = docs processed so far
    if START:
        y = (time.time() - START)
        z = ((a * y) / b) - y                         # estimated seconds remaining
        d = time.strftime('%H:%M:%S', time.gmtime(z))
        e = round(b / y)                              # docs per second so far
        progress("{0}/{1} | Remaining time {2} ({3}p/s)".format(b, a, d, e), b, a)


def progress(p, c, t):
    pc = (c * 100) / t
    sys.stdout.write("%s [%-20s] %d%%\r" % (p, '█' * (pc / 5), pc))
    sys.stdout.flush()

def dowork(queue):
    for p, i, pcount in iter(queue.get, 'STOP'):
        remaining_time(pcount, i)


def populate_jobs(queue):
    mongo_query = {}
    products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
    if products:
        pcount = products.count()
        i = 1
        print "Procesando %s productos..." % pcount
        for p in products:
            try:
                queue.put((p, i, pcount))
                i += 1
            except Exception as e:
                utils.log(e)
                continue
    # One STOP sentinel per worker, so every dowork process can exit
    for _ in range(CONFIG_POOL_SIZE):
        queue.put('STOP')


def main():
    queue = multiprocessing.Queue()

    procs = [multiprocessing.Process(target=dowork, args=(queue,)) for _ in range(CONFIG_POOL_SIZE)]

    for p in procs:
        p.start()

    populate_jobs(queue)

    for p in procs:
        p.join()

Also, I've noticed that roughly every 2,500 documents the script pauses for about 0.5-1 seconds, which is obviously a problem. It is MongoDB-related, because if I run exactly the same loop over range(0, 1000000) the script doesn't pause at all and runs at 57,000 iterations per second, finishing in about 20 seconds total... a huge difference from 2,800 MongoDB documents per second...

This is the code that runs a 1,000,000-iteration loop instead of iterating the docs.

def populate_jobs(queue):
    mongo_query = {}
    products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
    if products:
        pcount = 1000000
        i = 1
        print "Procesando %s productos..." % pcount
        for p in range(0, 1000000):
            queue.put((p, i, pcount))
            i += 1
    # One STOP sentinel per worker, so every dowork process can exit
    for _ in range(CONFIG_POOL_SIZE):
        queue.put('STOP')

UPDATE: As I see it now, the problem is not the multiprocessing itself; it is the cursor filling the Queue. That part does not run in multiprocessing mode: it is a single process (the populate_jobs method) filling the Queue. If I could read the cursor with multiple threads/processes and fill the Queue in parallel, it would fill up faster and the multiprocessing dowork workers would finish sooner, because I think the bottleneck is that I only put about 2,800 items per second into the Queue while the dowork processes could consume many more. But I don't know how to parallelize a MongoDB cursor.
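
One possible way to do this, only as a sketch: partition the collection by _id ranges so each worker opens its own cursor and no single process has to feed a shared Queue. The boundary-finding step, the connection details and the per-document work below are illustrative placeholders, not tested code:

import multiprocessing
from pymongo import MongoClient

CONFIG_POOL_SIZE = 20

def find_boundaries(n_parts):
    # Pick n_parts - 1 _id values that split the collection into roughly equal slices
    coll = MongoClient().mydb.items
    total = coll.count()
    step = total // n_parts
    bounds = [None]                              # None = no lower bound
    for k in range(1, n_parts):
        doc = next(coll.find({}, {'_id': 1}).sort('_id', 1).skip(k * step).limit(1))
        bounds.append(doc['_id'])
    bounds.append(None)                          # None = no upper bound
    return bounds

def work_range(args):
    # Each worker builds its own client and its own cursor over one _id slice
    lo, hi = args
    query = {}
    if lo is not None:
        query.setdefault('_id', {})['$gte'] = lo
    if hi is not None:
        query.setdefault('_id', {})['$lt'] = hi
    coll = MongoClient().mydb.items              # one client per process, created after fork
    n = 0
    for doc in coll.find(query, no_cursor_timeout=True):
        n += 1                                   # the real per-document work would go here
    return n

if __name__ == '__main__':
    bounds = find_boundaries(CONFIG_POOL_SIZE)
    ranges = list(zip(bounds[:-1], bounds[1:]))
    pool = multiprocessing.Pool(CONFIG_POOL_SIZE)
    total = sum(pool.map(work_range, ranges))
    pool.close()
    pool.join()
    print("%d documents processed" % total)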

Maybe the problem is the latency between my computer and the server's MongoDB. That round trip, between me asking for the next cursor batch and MongoDB returning it, would explain cutting my throughput by more than 20x (from 61,000 items/s to 2,800 docs/s). NOPE, I've tried against a localhost MongoDB and performance is exactly the same... This is driving me nuts.

Upvotes: 4

Views: 6711

Answers (2)

dano

Reputation: 94901

Here's how you can use a Pool to feed the children:

START = time.time()
def remaining_time(a, b):
    if START:
        y = (time.time() - START)
        z = ((a * y) / b) - y
        d = time.strftime('%H:%M:%S', time.gmtime(z))
        e = round(b / y)
        progress("{0}/{1} | Tiempo restante {2} ({3}p/s)".format(b, a, d, e), b, a)


def progress(p, c, t):
    pc = (c * 100) / t
    sys.stdout.write("%s [%-20s] %d%%\r" % (p, '█' * (pc / 5), pc))
    sys.stdout.flush()

def dowork(args):
    p, i, pcount  = args
    remaining_time(pcount, i)

def main():
    pool = multiprocessing.Pool(CONFIG_POOL_SIZE)
    mongo_query = {}
    products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
    pcount = products.count()
    # start idx at 1 so remaining_time never divides by zero
    pool.map(dowork, ((p, idx, pcount) for idx, p in enumerate(products, 1)))
    pool.close()
    pool.join()

Note that using pool.map requires loading everything from the cursor into memory at once, though, which might be a problem because of how large it is. You can use imap to avoid consuming the whole thing at once, but you'll need to specify a chunksize to minimize IPC overhead:

# Calculate chunksize using same algorithm used internally by pool.map
chunksize, extra = divmod(pcount, CONFIG_POOL_SIZE * 4)
if extra:
    chunksize += 1

pool.imap(dowork, ((p, idx, pcount) for idx, p in enumerate(products, 1)), chunksize=chunksize)
pool.close()
pool.join()

For 1,000,000 items, that gives a chunksize of 12,500. You can try sizes larger and smaller than that, and see how it affects performance.
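
If you want to compare a few values empirically, a rough timing harness might look like this (hypothetical sketch; dowork, MONGO and CONFIG_POOL_SIZE are the ones defined above):

import time
import multiprocessing

def time_chunksize(chunksize):
    # Re-open the cursor for each run so every chunksize reads the same data
    products = MONGO.mydb.items.find({}, no_cursor_timeout=True)
    pcount = products.count()
    start = time.time()
    pool = multiprocessing.Pool(CONFIG_POOL_SIZE)
    results = pool.imap(dowork, ((p, idx, pcount) for idx, p in enumerate(products, 1)),
                        chunksize=chunksize)
    for _ in results:          # drain the iterator so all work is done before we stop the clock
        pass
    pool.close()
    pool.join()
    print("chunksize=%d: %.1fs" % (chunksize, time.time() - start))

for size in (1000, 5000, 12500, 25000):
    time_chunksize(size)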

I'm not sure this will help much though, if the bottleneck is actually just pulling the data out of MongoDB.

Upvotes: 5

paulmelnikow

Reputation: 17208

Why are you using multiprocessing? You don't seem to be doing actual work in other threads using the queue. Python has a global interpreter lock which makes multithreaded code less performant than you'd expect. It's probably making this program slower, not faster.

A couple performance tips:

  1. Try setting batch_size in your find() call to some big number (e.g. 20000). This is the maximum number of documents returned in each batch before the client goes back to the server for more; by default the first batch is only 101 documents. (A small sketch of both tips follows this list.)

  2. Try setting cursor_type to pymongo.cursor.CursorType.EXHAUST, which might reduce the latency you're seeing.
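
A minimal sketch combining both suggestions (the connection and collection names are placeholders matching the question, and the numbers are only examples):

import pymongo
from pymongo.cursor import CursorType

items = pymongo.MongoClient().mydb.items

# Bigger batches = fewer round trips to the server
cursor = items.find({}, no_cursor_timeout=True, batch_size=20000)

# Or stream everything with an exhaust cursor; it must be read to completion
# and is not allowed through a mongos router
cursor = items.find({}, cursor_type=CursorType.EXHAUST)

for doc in cursor:
    pass  # per-document work goes here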

Upvotes: 1
