Fast or Bulk Upsert in pymongo

How can I do a bulk upsert in pymongo? I want to Update a bunch of entries and doing them one at a time is very slow.

The answer to an almost identical question is here: Bulk update/upsert in MongoDB?

The accepted answer doesn't actually answer the question. It simply gives a link to the mongo CLI for doing import/exports.

I would also be open to someone explaining why doing a bulk upsert is no possible / no a best practice, but please explain what the preferred solution to this sort of problem is.

Upvotes: 58

Views: 44725

Answers (6)

Kevin J. Rice
Kevin J. Rice

Reputation: 3373

MongoDB 2.6+ has support for bulk operations. This includes bulk inserts, upserts, updates, etc. The point of this is to reduce/eliminate delays from the round-trip latency of doing record-by-record operations ('document by document' to be correct).

So, how does this work? Example in Python, because that's what I'm working in.

>>> import pymongo
>>> pymongo.version
'2.7rc0'

To use this feature, we create a 'bulk' object, add documents to it, then call execute on it and it will send all the updates at once. Caveats: The BSONsize of the collected operations (sum of the bsonsizes) cannot be over the document size limit of 16 MB. Of course, the number of operations can thus vary significantly, Your Mileage May Vary.

Example in Pymongo of Bulk upsert operation:

import pymongo
conn = pymongo.MongoClient('myserver', 8839)
db = conn['mydbname']
coll = db.myCollection
bulkop = coll.initialize_ordered_bulk_op()
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':1}})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':2}})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':3}})
retval = bulkop.execute()

This is the essential method. More info available at:

http://api.mongodb.org/python/2.7rc1/examples/bulk.html

Edit :- since version 3.5 of python driver, initialize_ordered_bulk_op is deprecated. Use bulk_write() instead. [ http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.bulk_write ]

Upvotes: 34

袁華谷-Andy
袁華谷-Andy

Reputation: 121

if you have many data, and you want to use "_id" for judge if data exist,

you can try...

import pymongo
from pymongo import UpdateOne
client = pymongo.MongoClient('localhost', 27017)
db=client['sampleDB']

collectionInfo = db.sample

#sample data
datas=[
    {"_id":123456,"name":"aaa","N":1,"comment":"first sample","lat":22,"lng":33},
    {"_id":234567,"name":"aaa","N":1,"comment":"second sample","lat":22,"lng":33},
    {"_id":345678,"name":"aaa","N":1,"comment":"xxx sample","lat":22,"lng":33},
    {"_id":456789,"name":"aaa","N":1,"comment":"yyy sample","lat":22,"lng":33},
    {"_id":123456,"name":"aaaaaaaaaaaaaaaaaa","N":1,"comment":"zzz sample","lat":22,"lng":33},
    {"_id":11111111,"name":"aaa","N":1,"comment":"zzz sample","lat":22,"lng":33}
]

#you should split judge item and other data 
ids=[data.pop("_id") for data in datas]

operations=[UpdateOne({"_id":idn},{'$set':data},upsert=True) for idn ,data in zip(ids,datas)]

collectionInfo.bulk_write(operations)

My English is very poor, if you can't understand what I say, I'm sorry

Upvotes: 12

El Ruso
El Ruso

Reputation: 15871

Fastest bulk update with Python 3.5+, motor and asyncio:

import asyncio
import datetime
import logging
import random
import time

import motor.motor_asyncio
import pymongo.errors


async def execute_bulk(bulk):
    try:
        await bulk.execute()
    except pymongo.errors.BulkWriteError as err:
        logging.error(err.details)


async def main():
    cnt = 0
    bulk = db.initialize_unordered_bulk_op()
    tasks = []
    async for document in db.find({}, {}, no_cursor_timeout=True):
        cnt += 1
        bulk.find({'_id': document['_id']}).update({'$set': {"random": random.randint(0,10)}})
        if not cnt % 1000:
            task = asyncio.ensure_future(execute_bulk(bulk))
            tasks.append(task)
            bulk = db.initialize_unordered_bulk_op()
    if cnt % 1000:
        task = asyncio.ensure_future(bulk.execute(bulk))
        tasks.append(task)
    logging.info('%s processed', cnt)
    await asyncio.gather(*tasks)


logging.basicConfig(level='INFO')    
db = motor.motor_asyncio.AsyncIOMotorClient()['database']['collection']
start_time = time.time()
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    execution_time = time.time() - start_time
    logging.info('Execution time: %s', datetime.timedelta(seconds=execution_time))

Upvotes: 1

Neil Lunn
Neil Lunn

Reputation: 151122

Modern releases of pymongo ( greater than 3.x ) wrap bulk operations in a consistent interface that downgrades where the server release does not support bulk operations. This is now consistent in MongoDB officially supported drivers.

So the preferred method for coding is to use bulk_write() instead, where you use an UpdateOne other other appropriate operation action instead. And now of course it is preferred to use the natural language lists rather than a specific builder

The direct translation of the old documention:

from pymongo import UpdateOne

operations = [
    UpdateOne({ "field1": 1},{ "$push": { "vals": 1 } },upsert=True),
    UpdateOne({ "field1": 1},{ "$push": { "vals": 2 } },upsert=True),
    UpdateOne({ "field1": 1},{ "$push": { "vals": 3 } },upsert=True)
]

result = collection.bulk_write(operations)

Or the classic document transformation loop:

import random
from pymongo import UpdateOne

random.seed()

operations = []

for doc in collection.find():
    # Set a random number on every document update
    operations.append(
        UpdateOne({ "_id": doc["_id"] },{ "$set": { "random": random.randint(0,10) } })
    )

    # Send once every 1000 in batch
    if ( len(operations) == 1000 ):
        collection.bulk_write(operations,ordered=False)
        operations = []

if ( len(operations) > 0 ):
    collection.bulk_write(operations,ordered=False)

The returned result is of BulkWriteResult which will contain counters of matched and updated documents as well as the returned _id values for any "upserts" that occur.

There is a bit of a misconception about the size of the bulk operations array. The actual request as sent to the server cannot exceed the 16MB BSON limit since that limit also applies to the "request" sent to the server which is using BSON format as well.

However that does not govern the size of the request array that you can build, as the actual operations will only be sent and processed in batches of 1000 anyway. The only real restriction is that those 1000 operation instructions themselves do not actually create a BSON document greater than 16MB. Which is indeed a pretty tall order.

The general concept of bulk methods is "less traffic", as a result of sending many things at once and only dealing with one server response. The reduction of that overhead attached to every single update request saves lots of time.

Upvotes: 60

Bernie Hackett
Bernie Hackett

Reputation: 8909

You can update all documents that match your query spec using multi=True.

There is a bug here about doing a batch of commands the way you want.

Upvotes: 2

user2665694
user2665694

Reputation:

The answer remains the same: no support for bulk upserts.

Upvotes: 2

Related Questions