Alessio Innocenzi

Reputation: 461

C# - Cosmos DB bulk upsert

I have an Azure Function triggered by a timer in which I want to update documents in Cosmos DB. Currently I use UpdateOneAsync with the option IsUpsert = true to update each document (or insert it if it doesn't exist).

However, the update runs inside a foreach loop, so one update operation is performed for each item. How can I do a bulk update (upsert), performing just one operation after the foreach loop finishes?

Here is my current code:

foreach (var group in GetGroups(date, time, hour))
{
    dic = new MyDictionary<string>();

    //... some operations

    List<BsonElement> documents = new List<BsonElement>();
    documents.Add(new BsonElement("$inc", new BsonDocument(dic)));
    documents.Add(new BsonElement("$set", new BsonDocument(new Dictionary<string, string>() { { "c", key }, { "d", date } })));

    var doc = clicksDoc.UpdateOneAsync(t => t["_id"] == "c-" + key + "-" + date, new BsonDocument(documents), new UpdateOptions() { IsUpsert = true }).Result;
}

Instead I'd like to perform just one update after the loop. How can I do that?

Upvotes: 5

Views: 13303

Answers (2)

Amy Barrett

Reputation: 614

2020 answer

Bulk support has been added to the .NET SDK; see the blog post "Introducing Bulk support in the .NET SDK".

To use it, first enable bulk execution when you create your client:

CosmosClient client = new CosmosClientBuilder(options.Value.ConnectionString)
    .WithConnectionModeDirect()
    .WithBulkExecution(true)
    .Build();

Then get your container as normal:

Container container = client.GetContainer("databaseName", "containerName");

Then do your bulk operation, e.g. upsert:

public async Task BulkUpsert(List<SomeItem> items)
{
    var concurrentTasks = new List<Task>();

    foreach (SomeItem item in items)
    {
        concurrentTasks.Add(container.UpsertItemAsync(item, new PartitionKey(item.PartitionKeyField)));
    }

    await Task.WhenAll(concurrentTasks);
}
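
One thing to keep in mind with this pattern: if several upserts fail, awaiting Task.WhenAll rethrows only the first exception, so it is hard to tell which items did not make it. A minimal sketch of one way to track failures per item follows. SomeItem and PartitionKeyField are the placeholder names from the snippet above, the id property and the UpsertAllAsync/TryUpsertAsync helper names are assumptions made for illustration, and the container is assumed to come from a client created with bulk execution enabled.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

// Placeholder document type mirroring SomeItem above; property names are assumptions.
public class SomeItem
{
    public string id { get; set; }                 // Cosmos DB items need an "id" property
    public string PartitionKeyField { get; set; }
}

public static class BulkUpsertSketch
{
    // Upserts all items concurrently and returns the items whose upsert failed.
    public static async Task<List<SomeItem>> UpsertAllAsync(Container container, IEnumerable<SomeItem> items)
    {
        // Each task resolves to the failed item, or to null on success.
        List<Task<SomeItem>> tasks = items
            .Select(item => TryUpsertAsync(container, item))
            .ToList();

        SomeItem[] outcomes = await Task.WhenAll(tasks);
        return outcomes.Where(failed => failed != null).ToList();
    }

    private static async Task<SomeItem> TryUpsertAsync(Container container, SomeItem item)
    {
        try
        {
            await container.UpsertItemAsync(item, new PartitionKey(item.PartitionKeyField));
            return null;
        }
        catch (CosmosException ex)
        {
            // Log and hand the item back so the caller can retry or report it.
            Console.WriteLine($"Upsert of {item.id} failed with status {(int)ex.StatusCode}");
            return item;
        }
    }
}
```

The returned list can be passed back into UpsertAllAsync for a retry, or logged, depending on how strict the timer job needs to be.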

Upvotes: 12

Sajeetharan

Reputation: 222582

You can use the BulkUpdateAsync method from the BulkExecutor library:

List<UpdateItem> updateList = initialDocuments.Select(d =>
    new UpdateItem(
        d.id,
        d.AccountNumber,
        new List<UpdateOperation> {
            new SetUpdateOperation<string>(
                "NewSimpleProperty",
                "New Property Value"),
            new SetUpdateOperation<dynamic>(
                "NewComplexProperty",
                new {
                    prop1 = "Hello",
                    prop2 = "World!"
                }),
            new UnsetUpdateOperation(nameof(FakeOrder.DocumentIndex)),
        }))
    .ToList();

var updateSetResult = BulkUpdateDocuments(_database, _collection, updateList).GetAwaiter().GetResult();

and

var executor = new BulkExecutor(_documentClient, collectionResource);
await executor.InitializeAsync();
return await executor.BulkUpdateAsync(updates);
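
Note that the BulkExecutor goes through a DocumentClient, whereas the code in the question goes through the MongoDB driver (UpdateOneAsync, BsonDocument). If you want to stay on that driver, a rough sketch is to collect one UpdateOneModel per item inside the loop and send them all with a single BulkWriteAsync call afterwards. The ClickBulkWriter class, its method names, and the int-valued increments dictionary are assumptions made for illustration; the _id format and the "c"/"d" fields are taken from the question. It also assumes your Cosmos DB account's MongoDB endpoint accepts bulk writes.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

public static class ClickBulkWriter
{
    // Builds one upsert request; call this inside the foreach loop instead of UpdateOneAsync.
    public static WriteModel<BsonDocument> BuildUpsert(
        string key, string date, IDictionary<string, int> increments)
    {
        var filter = Builders<BsonDocument>.Filter.Eq("_id", "c-" + key + "-" + date);

        // Same $inc / $set shape as in the question.
        // Assumes increments is non-empty; some servers reject an empty $inc.
        var inc = new BsonDocument();
        foreach (KeyValuePair<string, int> pair in increments)
        {
            inc.Add(pair.Key, pair.Value);
        }

        var update = new BsonDocument
        {
            { "$inc", inc },
            { "$set", new BsonDocument { { "c", key }, { "d", date } } }
        };

        return new UpdateOneModel<BsonDocument>(filter, update) { IsUpsert = true };
    }

    // Sends all collected requests in one call after the loop has finished.
    public static async Task<BulkWriteResult<BsonDocument>> FlushAsync(
        IMongoCollection<BsonDocument> collection, List<WriteModel<BsonDocument>> requests)
    {
        if (requests.Count == 0)
        {
            return null; // BulkWriteAsync rejects an empty request list
        }

        // Unordered: the upserts are independent, so one failure need not stop the rest.
        return await collection.BulkWriteAsync(requests, new BulkWriteOptions { IsOrdered = false });
    }
}
```

In the question's loop this would mean adding BuildUpsert(key, date, ...) to a List<WriteModel<BsonDocument>> on each iteration and awaiting FlushAsync(clicksDoc, requests) once at the end.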


Upvotes: 1
