Mark Clancy

Reputation: 7889

Fastest way to insert 100,000+ records into DocumentDB

As the title suggests, I need to insert 100,000+ records into a DocumentDB collection programmatically. The data will be used for creating reports later on. I am using the Azure Documents SDK and a stored procedure for bulk inserting documents (see the question Azure documentdb bulk insert using stored procedure).

The following console application shows how I'm inserting documents.

InsertDocuments generates 500 test documents to pass to the stored procedure. The main function calls InsertDocuments 10 times, inserting 5,000 documents overall. Running this application results in 500 documents getting inserted every few seconds. If I increase the number of documents per call I start to get errors and lost documents.

Can anyone recommend a faster way to insert documents?

static void Main(string[] args)
{
    Console.WriteLine("Starting...");

    MainAsync().Wait();
}

static async Task MainAsync()
{
    int campaignId = 1001,
        count = 500;

    for (int i = 0; i < 10; i++)
    {
        await InsertDocuments(campaignId, (count * i) + 1, (count * i) + count);
    }
}

static async Task InsertDocuments(int campaignId, int startId, int endId)
{
    using (DocumentClient client = new DocumentClient(new Uri(documentDbUrl), documentDbKey))
    {
        List<dynamic> items = new List<dynamic>();

        // Create x number of documents to insert
        for (int i = startId; i <= endId; i++)
        {
            var item = new
            {
                id = Guid.NewGuid(),
                campaignId = campaignId,
                userId = i,
                status = "Pending"
            };

            items.Add(item);
        }

        var task = client.ExecuteStoredProcedureAsync<dynamic>("/dbs/default/colls/campaignusers/sprocs/bulkImport", new RequestOptions()
        {
            PartitionKey = new PartitionKey(campaignId)
        },
        new
        {
            items = items
        });

        try
        {
            await task;

            int insertCount = (int)task.Result.Response;

            Console.WriteLine("{0} documents inserted...", insertCount);
        }
        catch (Exception e)
        {
            Console.WriteLine("Error: {0}", e.Message);
        }
    }
}

Upvotes: 25

Views: 39534

Answers (5)

Fabian Nicollier

Reputation: 2861

Cosmos DB SDK has been updated to allow bulk insert: https://learn.microsoft.com/en-us/azure/cosmos-db/tutorial-sql-api-dotnet-bulk-import via the AllowBulkExecution option.
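
A minimal sketch of what that looks like, assuming the v3 Microsoft.Azure.Cosmos SDK and the question's "default" database / "campaignusers" container partitioned on /campaignId (endpoint and key are the question's placeholders):

// Bulk mode lets the SDK group concurrent point writes into batches behind the scenes.
CosmosClient cosmosClient = new CosmosClient(documentDbUrl, documentDbKey,
    new CosmosClientOptions { AllowBulkExecution = true });

Container container = cosmosClient.GetContainer("default", "campaignusers");

List<Task> tasks = new List<Task>();
for (int i = 1; i <= 100000; i++)
{
    var item = new { id = Guid.NewGuid().ToString(), campaignId = 1001, userId = i, status = "Pending" };

    // Each call queues a point write; the SDK dispatches them as batches.
    tasks.Add(container.CreateItemAsync(item, new PartitionKey(item.campaignId)));
}

await Task.WhenAll(tasks);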

Upvotes: 3

Aravind Krishna R.

Reputation: 8003

The fastest way to insert documents into Azure DocumentDB is available as a sample on GitHub: https://github.com/Azure/azure-documentdb-dotnet/tree/master/samples/documentdb-benchmark

The following tips will help you achieve the best throughput using the .NET SDK (a client-setup sketch follows this list):

  • Initialize a singleton DocumentClient
  • Use Direct connectivity and TCP protocol (ConnectionMode.Direct and ConnectionProtocol.Tcp)
  • Use 100s of Tasks in parallel (depends on your hardware)
  • Increase the MaxConnectionLimit in the DocumentClient constructor to a high value, say 1000 connections
  • Turn gcServer on
  • Make sure your collection has the appropriate provisioned throughput (and a good partition key)
  • Running in the same Azure region will also help
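
A minimal client-setup sketch reflecting the tips above (documentDbUrl and documentDbKey are the question's placeholders; gcServer is turned on in App.config rather than in code):

// Create one DocumentClient for the lifetime of the application and reuse it everywhere.
ConnectionPolicy connectionPolicy = new ConnectionPolicy
{
    ConnectionMode = ConnectionMode.Direct,   // bypass the gateway
    ConnectionProtocol = Protocol.Tcp,        // TCP instead of HTTPS
    MaxConnectionLimit = 1000                 // allow a large pool of connections
};

DocumentClient client = new DocumentClient(new Uri(documentDbUrl), documentDbKey, connectionPolicy);

// App.config: <runtime><gcServer enabled="true" /></runtime>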

With 10,000 RU/s, you can insert 100,000 documents in about 50 seconds (approximately 5 request units per write).

With 100,000 RU/s, you can insert in about 5 seconds. You can make this as fast as you want by configuring throughput (and for a very high number of inserts, spread the inserts across multiple VMs/workers).
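
As a rough illustration of the parallelism (the collection link and document list are placeholders, and the shared client is the one configured above):

List<Task> insertTasks = new List<Task>();
foreach (var doc in documents)
{
    insertTasks.Add(client.CreateDocumentAsync("/dbs/default/colls/campaignusers", doc));

    if (insertTasks.Count == 100)          // cap the number of in-flight requests
    {
        await Task.WhenAll(insertTasks);
        insertTasks.Clear();
    }
}
await Task.WhenAll(insertTasks);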

EDIT (7/12/19): You can now use the bulk executor library at https://learn.microsoft.com/en-us/azure/cosmos-db/bulk-executor-overview

Upvotes: 34

Glenit

Reputation: 116

The Cosmos DB team has just released a bulk import and update SDK. Unfortunately it is only available on .NET Framework 4.5.1, but it apparently does a lot of the heavy lifting for you and maximizes use of throughput. See the links below (a rough usage sketch follows them):

https://learn.microsoft.com/en-us/azure/cosmos-db/bulk-executor-overview
https://learn.microsoft.com/en-us/azure/cosmos-db/sql-api-sdk-bulk-executor-dot-net
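
Per those docs, usage is roughly along these lines (a sketch only, not verified against a specific package version; client, dataCollection and documents are placeholders):

// Initialize the bulk executor once per collection, then feed it the documents.
IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
await bulkExecutor.InitializeAsync();

BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
    documents: documents,
    enableUpsert: true,
    disableAutomaticIdGeneration: true);

Console.WriteLine("Imported {0} documents in {1} seconds",
    bulkImportResponse.NumberOfDocumentsImported,
    bulkImportResponse.TotalTimeTaken.TotalSeconds);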

Upvotes: 4

Debasis Mondal

Reputation: 1

private async Task<T> ExecuteDataUpload<T>(IEnumerable<object> data, PartitionKey partitionKey)
{
    using (var client = new DocumentClient(m_endPointUrl, m_authKey, connPol))
    {
        while (true)
        {
            try
            {
                // Execute the bulk-import stored procedure for this partition key.
                var result = await client.ExecuteStoredProcedureAsync<T>(m_spSelfLink, new RequestOptions { PartitionKey = partitionKey }, data);
                return result;
            }
            catch (DocumentClientException ex)
            {
                // Throttled (429) or timed out: wait for the suggested interval and retry.
                if (429 == (int)ex.StatusCode || HttpStatusCode.RequestTimeout == ex.StatusCode)
                {
                    await Task.Delay(ex.RetryAfter);
                    continue;
                }
                throw;
            }
            catch (Exception)
            {
                // Transient failure: back off briefly and retry.
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
        }
    }
}





public async Task uploadData(IEnumerable<object> data, string partitionKey)
{
    int groupSize = 600;
    int dataSize = data.Count();
    int chunkSize = dataSize > groupSize ? groupSize : dataSize;
    List<Task> uploadTasks = new List<Task>();
    var partKey = new PartitionKey(partitionKey);

    while (dataSize > 0)
    {
        // Materialize a local copy of the chunk so each task works on its own slice.
        IEnumerable<object> chunkData = data.Take(chunkSize).ToList();

        // Task.Run (instead of Task.Factory.StartNew with an async lambda) so that
        // Task.WhenAll below actually waits for the asynchronous work to finish.
        uploadTasks.Add(Task.Run(async () =>
        {
            int remaining = chunkData.Count();

            // The stored procedure may return before inserting the whole chunk;
            // keep resubmitting the documents it did not get to.
            while (remaining > 0)
            {
                int insertedCount = await ExecuteDataUpload<int>(chunkData, partKey);
                chunkData = chunkData.Skip(insertedCount);
                remaining -= insertedCount;
            }
        }));

        data = data.Skip(chunkSize);
        dataSize -= chunkSize;
        chunkSize = dataSize > groupSize ? groupSize : dataSize;
    }

    await Task.WhenAll(uploadTasks);
}

Now call uploadData with the list of objects you want to upload. Just keep one thing in mind: each call should only send data belonging to a single partition key.
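
A hypothetical caller, grouping the documents by partition key so that each uploadData call carries a single key (the DataUploader class name and CampaignId property are illustrative, not part of the original code):

var uploader = new DataUploader();   // hypothetical class containing the two methods above

// Send one group per partition key value, as recommended above.
foreach (var group in documents.GroupBy(d => d.CampaignId))
{
    await uploader.uploadData(group.Cast<object>(), group.Key.ToString());
}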

Upvotes: 0

donald

Reputation: 478

Another approach is a stored procedure, as mentioned by other people. A stored procedure requires a partition key. Also, per the documentation, a stored procedure should finish within 4 seconds, otherwise all records are rolled back. The code below uses the Python Azure DocumentDB SDK and a JavaScript-based stored procedure. I have modified the script and resolved a lot of errors; the code below is working fine:

function bulkimport2(docObject) {
var collection = getContext().getCollection();
var collectionLink = collection.getSelfLink();

// The count of imported docs, also used as current doc index.
var count = 0;

getContext().getResponse().setBody(docObject.items);
//return

// Validate input.
//if (!docObject.items || !docObject.items.length) getContext().getResponse().setBody(docObject);
docObject.items=JSON.stringify(docObject.items)
docObject.items = docObject.items.replace("\\\\r", "");
docObject.items = docObject.items.replace("\\\\n", "");
var docs = JSON.parse(docObject.items);
var docsLength = docs.length; // count of parsed documents, not the length of the JSON string
if (docsLength == 0) {
    getContext().getResponse().setBody(0);
    return;
}

// Call the CRUD API to create a document.
tryCreate(docs[count], callback, collectionLink,count);

// Note that there are 2 exit conditions:
// 1) The createDocument request was not accepted.
//    In this case the callback will not be called, we just call setBody and we are done.
// 2) The callback was called docs.length times.
//    In this case all documents were created and we don't need to call tryCreate anymore. Just call setBody and we are done.
function tryCreate(doc, callback, collectionLink,count ) {
    doc=JSON.stringify(doc);
    if (typeof doc == "undefined") {
        getContext().getResponse().setBody(count);
        return ;
        } else {
        doc = doc.replace("\\r", "");
        doc = doc.replace("\\n", "");
        doc=JSON.parse(doc);
       }

    getContext().getResponse().setBody(doc);

    var isAccepted = collection.upsertDocument(collectionLink, doc, callback);

    // If the request was accepted, callback will be called.
    // Otherwise report current count back to the client, 
    // which will call the script again with remaining set of docs.
    // This condition will happen when this stored procedure has been running too long
    // and is about to get cancelled by the server. This will allow the calling client
    // to resume this batch from the point we got to before isAccepted was set to false
    if (!isAccepted) {
        getContext().getResponse().setBody(count);
     }
}

// This is called when collection.createDocument is done and the document has been persisted.
function callback(err, doc, options) {
    if (err) throw getContext().getResponse().setBody(err + doc);

    // One more document has been inserted, increment the count.
    count++;

    if (count >= docsLength) {
        // If we have created all documents, we are done. Just set the response.
        getContext().getResponse().setBody(count);
        return ;
    } else {
        // Create next document.
        tryCreate(docs[count], callback,  collectionLink,count);
    }
}

}

EDIT: getContext().getResponse().setBody(count); return; // when all records are processed completely.

Python script to load the stored procedure and do the batch import:

# Initialize the Python DocumentDB client
client = document_client.DocumentClient(config['ENDPOINT'], {'masterKey': config['MASTERKEY'] ,'DisableSSLVerification' : 'true'  })

# Create a database
#db = client.CreateDatabase({ 'id': config['DOCUMENTDB_DATABASE'] })
db=client.ReadDatabases({ 'id': 'db2' })
print(db)
# Create collection options
options = {
    'offerEnableRUPerMinuteThroughput': True,
    'offerVersion': "V2",
    'offerThroughput': 400
}

# Create a collection
#collection = client.CreateCollection('dbs/db2' , { 'id': 'coll2'}, options)

#collection = client.CreateCollection({ 'id':'db2'},{ 'id': 'coll2'}, options)

database_link = 'dbs/db2' 
collection_link = database_link + '/colls/coll2'
"""
#List collections
collection = client.ReadCollection(collection_link)

print(collection)
print('Databases:')

databases = list(client.ReadDatabases())

if not databases:
    print('No Databases:')

for database in databases:
    print(database['id']) 
"""

# Create some documents
"""
document1 = client.CreateDocument(collection['_self'],
    { 

        'Web Site': 0,
        'Cloud Service': 0,
        'Virtual Machine': 0,
        'name': 'some' 
    })

document2 = client.CreateDocument(collection['_self'],
    { 

        'Web Site': 1,
        'Cloud Service': 0,
        'Virtual Machine': 0,
        'name': 'some' 
    })
"""

# Query them in SQL
"""
query = { 'query': 'SELECT * FROM server s' }    

options = {} 
options['enableCrossPartitionQuery'] = True
options['maxItemCount'] = 20

#result_iterable = client.QueryDocuments(collection['_self'], query, options)

result_iterable = client.QueryDocuments(collection_link, query, options)


results = list(result_iterable);

print(results)
"""



## How to create the stored procedure and use it
"""
sproc3 = {
        'id': 'storedProcedure2',
        'body': (
            'function (input) {' +
                '  getContext().getResponse().setBody(' +
                '      \'a\' + input.temp);' +
            '}')
    }
retrieved_sproc3 = client.CreateStoredProcedure(collection_link,sproc3)

result = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/storedProcedure3',{'temp': 'so'})
"""


## delete all records in collection
"""
result = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkDeleteSproc',"SELECT * FROM c ORDER BY c._ts DESC ")
print(result)

"""


multiplerecords="""[{
     "Virtual Machine": 0,
     "name": "some",
     "Web Site": 0,
     "Cloud Service": 0
}, 
{     
     "Virtual Machine": 0,
     "name": "some",
     "Web Site": 1,
     "Cloud Service": 0
}]"""

multiplerecords=json.loads(multiplerecords)

print(multiplerecords)
print(str(json.dumps(json.dumps(multiplerecords).encode('utf8'))))

#bulkloadresult = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkImport',json.dumps(multiplerecords).encode('utf8'))

#bulkloadresult = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkImport',json.dumps(json.loads(r'{"items": [{"name":"John","age":30,"city":"New York"},{"name":"John","age":30,"city":"New York"}]}')).encode('utf8'))



str1='{"name":"John","age":30,"city":"New York","PartitionKey" : "Morisplane"}'


str2='{"name":"John","age":30,"city":"New York","partitionKey" : "Morisplane"}'


key1=base64.b64encode(str1.encode("utf-8"))
key2=base64.b64encode(str2.encode("utf-8"))

data= {"items":[{"id": key1 ,"name":"John","age":30,"city":"Morisplane","PartitionKey" : "Morisplane" },{"id":  key2,"name":"John","age":30,"city":"Morisplane","partitionKey" : "Morisplane"}] , "city": "Morisplane", "partitionKey" : "Morisplane"} 

print(repr(data))

#retrieved_sproc3 =client.DeleteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkimport2')

sproc3 = {
        'id': 'bulkimport2',
        'body': (
          """function bulkimport2(docObject) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();

    // The count of imported docs, also used as current doc index.
    var count = 0;

    getContext().getResponse().setBody(docObject.items);
    //return

    // Validate input.
    //if (!docObject.items || !docObject.items.length) getContext().getResponse().setBody(docObject);
    docObject.items=JSON.stringify(docObject.items)
    docObject.items = docObject.items.replace("\\\\r", "");
    docObject.items = docObject.items.replace("\\\\n", "");
    var docs = JSON.parse(docObject.items);
    var docsLength = docs.length; // count of parsed documents, not the length of the JSON string
    if (docsLength == 0) {
        getContext().getResponse().setBody(0);
        return;
    }

    // Call the CRUD API to create a document.
    tryCreate(docs[count], callback, collectionLink,count);

    // Note that there are 2 exit conditions:
    // 1) The createDocument request was not accepted.
    //    In this case the callback will not be called, we just call setBody and we are done.
    // 2) The callback was called docs.length times.
    //    In this case all documents were created and we don't need to call tryCreate anymore. Just call setBody and we are done.
    function tryCreate(doc, callback, collectionLink,count ) {
        doc=JSON.stringify(doc);
        if (typeof doc == "undefined") {
            getContext().getResponse().setBody(count);
            return ;
            } else {
            doc = doc.replace("\\r", "");
            doc = doc.replace("\\n", "");
            doc=JSON.parse(doc);
           }

        getContext().getResponse().setBody(doc);
        var isAccepted = collection.upsertDocument(collectionLink, doc, callback);

        // If the request was accepted, callback will be called.
        // Otherwise report current count back to the client, 
        // which will call the script again with remaining set of docs.
        // This condition will happen when this stored procedure has been running too long
        // and is about to get cancelled by the server. This will allow the calling client
        // to resume this batch from the point we got to before isAccepted was set to false
        if (!isAccepted) {
            getContext().getResponse().setBody(count);
         }
    }

    // This is called when collection.createDocument is done and the document has been persisted.
    function callback(err, doc, options) {
        if (err) throw getContext().getResponse().setBody(err + doc);

        // One more document has been inserted, increment the count.
        count++;

        if (count >= docsLength) {
            // If we have created all documents, we are done. Just set the response.
            getContext().getResponse().setBody(count);
            return ;
        } else {
            // Create next document.
            tryCreate(docs[count], callback,  collectionLink,count);
        }
    }
}"""

            )
    }

#retrieved_sproc3 = client.CreateStoredProcedure(collection_link,sproc3)

bulkloadresult = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkimport2', data  , {"partitionKey" : "Morisplane"}  )


print(repr(bulkloadresult))

Upvotes: 0
