Reputation: 507
I am using beanie==1.10.1.
I want to perform a bulk operation that updates multiple documents with upsert=True. I expect the following code to insert the full document if the find query returns no results.
I was using this as a reference: https://github.com/roman-right/beanie/blob/main/tests/odm/documents/test_bulk_write.py
Here is the full code:
import beanie
import asyncio
import random
from beanie import BulkWriter
from beanie.odm.operators.update.general import Set
from motor.motor_asyncio import AsyncIOMotorClient


class TestDoc(beanie.Document):
    a: str
    b: int


async def init_mongo():
    mongo_client = AsyncIOMotorClient("mongodb://127.0.0.1:27017")
    await beanie.init_beanie(
        database=mongo_client.db_name, document_models=[TestDoc]
    )


async def run_test():
    await init_mongo()
    docs = [TestDoc(a=f"id_{i}", b=random.randint(1, 100)) for i in range(10)]
    async with BulkWriter() as bulk_writer:
        for doc in docs:
            await TestDoc \
                .find_one({TestDoc.a: doc.a}, bulk_writer=bulk_writer) \
                .upsert(Set({TestDoc.b: doc.b}), on_insert=doc, bulk_writer=bulk_writer)
            # .update_one(Set(doc), bulk_writer=bulk_writer, upsert=True)
    read_docs = await TestDoc.find().to_list()
    print(f"read_docs: {read_docs}")


if __name__ == '__main__':
    pool = asyncio.get_event_loop()
    pool.run_until_complete(run_test())
After running this, no documents are inserted into the database, neither with .upsert() nor with .update_one(). What is the correct way to achieve this logic?
With pymongo, such an operation would be written like this (and it works):
def write_reviews(self, docs: List[TestDoc]):
    operations = []
    for doc in docs:
        doc_dict = to_dict(doc)
        update_operation = pymongo.UpdateOne(
            {"a": doc.a}, {"$set": doc_dict}, upsert=True
        )
        operations.append(update_operation)
    result = self.test_collection.bulk_write(operations)
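To make the expected upsert=True behavior concrete, here is a minimal in-memory sketch in plain Python. It does not use beanie, pymongo, or a real database; `upsert_set` and `store` are hypothetical names for illustration, and the matching is simplified to equality on the query fields.

```python
from typing import Any, Dict, List


def upsert_set(
    store: List[Dict[str, Any]], query: Dict[str, Any], values: Dict[str, Any]
) -> str:
    """Mimic an update with $set and upsert=True on an in-memory list (illustration only)."""
    for doc in store:
        # A document matches when every key in the query has the same value.
        if all(doc.get(key) == value for key, value in query.items()):
            doc.update(values)
            return "updated"
    # No match: insert a new document built from the query plus the $set values.
    store.append({**query, **values})
    return "inserted"


store: List[Dict[str, Any]] = [{"a": "id_0", "b": 1}]
results = [
    upsert_set(store, {"a": "id_0"}, {"b": 42}),  # document already exists
    upsert_set(store, {"a": "id_1"}, {"b": 7}),   # document is missing
]
print(results, store)
```

The first call updates the existing document and the second inserts a new one, which is the outcome I expect from the bulk upsert for each of the ten documents.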
Upvotes: 0
Views: 5591
Reputation: 833
This is old, and you have probably figured it out, but since it was the first result in a Google search for me, I thought I would answer.
The current way to use bulk_writer is to wrap the operations in the context manager and then commit them.
from beanie.odm.operators.update.general import Set

# TestDoc, init_mongo, BulkWriter and random come from the question's code.
async def run_test():
    await init_mongo()
    docs = [TestDoc(a=f"id_{i}", b=random.randint(1, 100)) for i in range(10)]
    async with BulkWriter() as bulk_writer:
        for doc in docs:
            await TestDoc \
                .find_one({TestDoc.a: doc.a}) \
                .upsert(Set({TestDoc.b: doc.b}), on_insert=doc)
        # commit() is a coroutine, so it has to be awaited
        await bulk_writer.commit()
    read_docs = await TestDoc.find().to_list()
    print(f"read_docs: {read_docs}")
Original Test in Beanie Test Suite
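As a rough illustration of the "wrap the operations, then commit" pattern, here is a toy async context manager written with the standard library only. It is not beanie's implementation: `ToyBulkWriter`, `add_operation`, and the `sink` list are hypothetical names, and committing only on a clean exit is an assumption made for this sketch.

```python
import asyncio
from typing import Any, Callable, List


class ToyBulkWriter:
    """Hypothetical stand-in: queues operations and runs them together on commit."""

    def __init__(self, sink: List[Any]) -> None:
        self.sink = sink  # stands in for the database collection
        self.operations: List[Callable[[], Any]] = []

    def add_operation(self, operation: Callable[[], Any]) -> None:
        # Nothing is written yet; the operation is only queued.
        self.operations.append(operation)

    async def commit(self) -> int:
        # Run every queued operation in one batch, then empty the queue.
        count = len(self.operations)
        for operation in self.operations:
            self.sink.append(operation())
        self.operations.clear()
        return count

    async def __aenter__(self) -> "ToyBulkWriter":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Assumption for this sketch: commit only when the block exits cleanly.
        if exc_type is None:
            await self.commit()


async def demo():
    written: List[Any] = []
    async with ToyBulkWriter(written) as writer:
        for i in range(3):
            writer.add_operation(lambda i=i: f"id_{i}")
        seen_inside_block = list(written)  # still empty: nothing committed yet
    return seen_inside_block, written


before, after = asyncio.run(demo())
print(before, after)
```

Inside the block nothing has been written, and the queued operations only land in `written` once the block exits. An operation that is never handed to the writer is simply not part of the batch in this toy model.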
Upvotes: 2