ppatus

Reputation: 21

MongoDB - Update different arrays simultaneously with update_many()

First, some background.
I have a Python function which consults an external API to retrieve some information associated with an ID. The function takes an ID as argument and returns a list of numbers (they correspond to some metadata associated with that ID).
For example, suppose we feed the function the IDs {0001, 0002, 0003} and it returns the following arrays:

0001 → [45,70,20]  
0002 → [20,10,30,45]  
0003 → [10,45]
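(Purely for illustration, the lookup can be thought of as the hypothetical stub below; the real function queries the external API rather than a hard-coded mapping.)

def get_metadata(segment_id):
    # Hypothetical stand-in for the real API-backed lookup:
    # returns the metadata integers associated with the given ID.
    sample = {1: [45, 70, 20], 2: [20, 10, 30, 45], 3: [10, 45]}
    return sample[segment_id]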

My goal is to implement a collection which structures the data like so:

{
    "_id":45,
    "list":[0001,0002,0003]
},
{
    "_id":70,
    "list":[0001]
},
{
    "_id":20,
    "list":[0001,0002]
},
{
    "_id":10,
    "list":[0002,0003]
},
{
    "_id":30,
    "list":[0002]
}

As can be seen, I want my collection to index the information by the metadata itself. With this structure, the document with _id 45 contains a list of all the IDs that have metadata 45 associated with them. This way I can retrieve, with a single request to the collection, all the IDs mapped to a particular metadata value.
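For example, a single lookup along these lines (a minimal PyMongo sketch, assuming the collection handle is called SegmentDB as in the methods below) returns every ID mapped to metadata value 45:

doc = SegmentDB.find_one({"_id": 45})
ids_with_45 = doc["list"] if doc else []  # e.g. [1, 2, 3]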
The class method in charge of inserting IDs and metadata in the collection is the following:

def add_entries(self, id, metadataVector):
    start = time.time()
    id = int(id)

    # For every metadata value, upsert its document and add this ID to its list.
    for data in metadataVector:
        self.SegmentDB.update_one(
            filter={"_id": data},
            update={"$addToSet": {"list": id}},
            upsert=True
        )

    end = time.time()
    duration = end - start
    return duration

metadataVector is the list containing all the metadata (integers) associated with a given ID (e.g. [45,70,20]).
id is the ID the metadata in metadataVector originates from (e.g. 0001).
This method iterates through the list and performs one update per element (per metadata value). It builds the collection I want: it updates the document whose "_id" is a given metadata value and adds to its list the ID from which that metadata originated (if such a document doesn't exist yet, it inserts it - that's what upsert=True is for).
However, this implementation ends up being rather slow in the long run. metadataVector usually has around 1000-3000 items per ID (metadata integers ranging from 800 to 23000000), and I have around 40000 IDs to analyze, so the collection grows quickly. At the moment it holds around 3.2M documents (one dedicated to each individual metadata integer). I would like a faster solution; ideally, I would insert all the metadata in a single DB request instead of calling an update for each item in metadataVector individually. I tried the approach below, but it doesn't work as I intended:

def add_entries(self, id, metadataVector):
    start = time.time()
    id = int(id)

    # Intended to update every matching metadata document in a single call.
    self.SegmentDB.update_many(
        filter={"_id": {"$in": metadataVector}},
        update={"$addToSet": {"list": id}},
        upsert=True
    )

    end = time.time()
    duration = end - start
    return duration

I tried update_many (it seemed the natural approach to the problem) with a filter which, to my understanding, means "any document whose _id is in metadataVector". This way, every matching document would add the originating ID to its list (or be created if it didn't exist, thanks to the upsert option). Instead, the collection ends up filled with documents containing a single element in the list and an ObjectId() _id (see the picture showing the final result).
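Roughly, each of those unexpected documents looks like this (the single element in the list is the id that was passed in; the actual ObjectId value is elided):

{
    "_id": ObjectId("..."),
    "list": [1]
}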

Is there a way to implement what I want? Should I restructure the DB altogether?

Thanks a lot in advance!

Upvotes: 2

Views: 292

Answers (1)

prasad_

Reputation: 14317

Here is an example, and it uses Bulk Write operations. A bulk operation submits multiple inserts, updates, and deletes (they can be combined) as a single call to the database and returns a single result. This is more efficient than making many individual calls to the database.

Scenario 1:

Input: 3 -> [10, 45]

def some_fn(id):
    # id = 3; and after some process... returns a dictionary mapping
    # each metadata value to the list of IDs it came from
    return { 10: [ 3 ], 45: [ 3 ], }

Scenario 2:

Input (as a list):

3 -> [10, 45]
1 -> [45, 70, 20]

def some_fn(ids):
    # ids are 1 and 3; and after some process... returns a dictionary
    return { 10: [ 3 ], 45: [ 3, 1 ], 20: [ 1 ], 70: [ 1 ] }

Perform Bulk Write

Now, perform the bulk operation on the database using the returned value from some_fn.

from pymongo import UpdateOne

data = some_fn(id) # or some_fn(ids)

requests = []
for k, v in data.items():
    # Upsert the document for metadata value k and push all of its IDs at once.
    op = UpdateOne({ '_id': k }, { '$push': { 'list': { '$each': v }}}, upsert=True)
    requests.append(op)

result = db.collection.bulk_write(requests, ordered=False)

Note the ordered=False - again, this option is used for better performance, since the writes can happen in parallel and are not required to be applied in order.
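If needed, the returned BulkWriteResult can be inspected to verify what happened (a small usage sketch; these attribute names come from PyMongo's BulkWriteResult):

print(result.matched_count)   # documents matched by the update filters
print(result.modified_count)  # documents actually modified
print(result.upserted_count)  # new documents created via upsert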


Upvotes: 2
