RIRAY

Reputation: 62

Bulk upsert stored procedure

I'm new to Cosmos DB and JavaScript. I have a requirement to upsert data in bulk, and I found a JavaScript stored procedure for that, but it is failing.

I am calling the stored procedure from Python as:

client.ExecuteStoredProcedure(sproc_linkOut, [new_docs,True], options = options) 

or

client.ExecuteStoredProcedure(sproc_linkOut, [new_docs], options = options)

but I get the error shown below. I don't think error 409 should surface, because the code is written to handle it. Can you please help?

HTTPFailure: Status code: 400 Sub-status: 409
{"code":"BadRequest",
"message":"Message: {"Errors":["Encountered exception while executing function. Exception = Error: {\"Errors\":[\"Resource with specified id or name already exists.\"]}\r\nStack trace: Error: {\"Errors\":[\"Resource with specified id or name already exists.\"]}.

JavaScript stored procedure:

//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

function bulkImport(docs, upsert) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();

    // The count of imported docs, also used as current doc index.
    var count = 0;
    var errorCodes = { CONFLICT: 409 };

    // Validate input.
    if (!docs) throw new Error("The array is undefined or null.");

    var docsLength = docs.length;

    if (docsLength == 0) {
        getContext().getResponse().setBody(0);
        return;
    }

    // Call the create API to create a document.
    tryCreate(docs[count], callback);

    // Note that there are 2 exit conditions:
    // 1) The createDocument request was not accepted.
    // In this case the callback will not be called, we just call
    // setBody and we are done.
    // 2) The callback was called docs.length times.
    // In this case all documents were created and we don’t need to call
    // tryCreate anymore. Just call setBody and we are done.
    function tryCreate(doc, callback) {
        var isAccepted = collection.createDocument(collectionLink, doc, { disableAutomaticIdGeneration : true}, callback);

        // If the request was accepted, callback will be called.
        // Otherwise report current count back to the client,
        // which will call the script again with remaining set of docs.
        if (!isAccepted) getContext().getResponse().setBody(count);
    }

    // To replace the document, first issue a query to find it and then call replace.
    function tryReplace(doc, callback) {
        var parsedDoc = JSON.parse(doc);
        retrieveDoc(parsedDoc, null, function(retrievedDocs){
            var isAccepted = collection.replaceDocument(retrievedDocs[0]._self, parsedDoc, callback);
            if (!isAccepted) getContext().getResponse().setBody(count);
        });
    }

    function retrieveDoc(doc, continuation, callback) {
        var query = "select * from root r where r.id = '" + doc.id + "'";
        var requestOptions = { continuation : continuation };
        var isAccepted = collection.queryDocuments(collectionLink, query, requestOptions, function(err, retrievedDocs, responseOptions) {
            if (err) throw err;

            if (retrievedDocs.length > 0) {
                callback(retrievedDocs);
            } else if (responseOptions.continuation) {
                retrieveDoc(doc, responseOptions.continuation, callback);
            } else {
                throw "Error in retrieving document: " + doc.id;
            }
        });

        if (!isAccepted) getContext().getResponse().setBody(count);
    }

    // This is called when collection.createDocument is done in order to
    // process the result.
    function callback(err, doc, options) {
        if (err) {
            // Replace the document if status code is 409 and upsert is enabled
            if (upsert && err.number == errorCodes.CONFLICT) {
                return tryReplace(docs[count], callback);
            } else {
                throw err;
            }
        }

        // One more document has been inserted, increment the count.
        count++;
        if (count >= docsLength) {
            // If we created all documents, we are done. Just set the response.
            getContext().getResponse().setBody(count);
        } else {
            // Create next document.
            tryCreate(docs[count], callback);
        }
    }
}

The stored procedure takes two parameters, so I tried both of the calls shown earlier, but both failed.
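
For reference, the control flow the procedure is meant to follow (create each document, and on a 409 conflict replace it when upsert is enabled) can be sketched in plain Python. This is a simplified in-memory model, not the Cosmos DB API: the `store` dict, `ConflictError`, `create_document`, and `bulk_import` are hypothetical names, and the callback chain is flattened into a loop.

```python
CONFLICT = 409


class ConflictError(Exception):
    """Stands in for the server error whose number is 409 (hypothetical)."""
    number = CONFLICT


def create_document(store, doc):
    # Creating fails when a document with the same id already exists.
    if doc["id"] in store:
        raise ConflictError(doc["id"])
    store[doc["id"]] = doc


def bulk_import(store, docs, upsert):
    """Create each doc; on conflict, replace it only when upsert is truthy."""
    count = 0
    for doc in docs:
        try:
            create_document(store, doc)
        except ConflictError as err:
            if upsert and err.number == CONFLICT:
                store[doc["id"]] = doc  # replace the existing document
            else:
                raise
        count += 1
    return count


store = {"1": {"id": "1", "CASE_N": "OLD"}}
imported = bulk_import(store, [{"id": "1", "CASE_N": "AB"}, {"id": "2"}], True)
print(imported, store["1"]["CASE_N"])
```

With `upsert` falsy, the conflict propagates to the caller instead of being handled.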


Updated code:

print (param)
[[{'ACCOUNT': '1', 'id': '1', 'CASE_N': 'AB'}], True]

client.ExecuteStoredProcedure(sproc_link, [param],options=options)

I still get a 400 error:

400 Sub-status: 400 {"code":"BadRequest","message":"Message: {\"Errors\":[\"Encountered exception while executing function. Exception = Error: {\\"Errors\\":[\\"One of the specified inputs is invalid\\"]}\r\nStack trace: Error: {\\"Errors\\":[\\"One of the specified inputs is invalid\\"]}\n

Upvotes: 2

Views: 1296

Answers (2)

RIRAY

Reputation: 62

I modified the stored procedure in two ways: 1. removed the upsert input parameter and hard-coded it; 2. replaced the line var parsedDoc = JSON.parse(doc); with var parsedDoc = doc;

Now the procedure works fine.
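
A likely reason the second change helps, offered as an assumption rather than a confirmed diagnosis: the documents reach the procedure as already-parsed objects, so parsing them a second time fails. The Python analogue below uses `json.loads` in place of JavaScript's `JSON.parse` (the exception type differs between the two languages), and `parse_if_needed` is a hypothetical helper.

```python
import json


def parse_if_needed(doc):
    """Return doc as a dict, parsing only when it is still a JSON string."""
    if isinstance(doc, (str, bytes, bytearray)):
        return json.loads(doc)
    return doc


doc = {"ACCOUNT": "1", "id": "1", "CASE_N": "AB"}

try:
    json.loads(doc)  # parsing a document that is already parsed
    double_parse_failed = False
except TypeError:
    double_parse_failed = True

print(double_parse_failed)
print(parse_if_needed(doc) is doc)
print(parse_if_needed('{"id": "1"}'))
```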

Upvotes: 1

Jay Gong

Reputation: 23782

I assume that you followed this code sample; that code works fine. Based on your error log, I believe your execution never enters the upsert branch. Possibly you passed the parameters to the SP incorrectly.

No modification is needed in the bulkimport SP; change your calling code instead:

from pydocumentdb import document_client

endpoint = "https://***.documents.azure.com:443/"
primaryKey = "***"

client = document_client.DocumentClient(endpoint, {'masterKey': primaryKey})

options = {"partitionKey": "A"}

def create_cosmos_entity(jobid):
    return {
        'JobID': jobid,
        'id': jobid,
        "name": "A"
    }
bulkdos = []
bulkdos.append(create_cosmos_entity('1'))

param = []
param.append(bulkdos)
param.append(True)

sproc_link = "dbs/db/colls/coll/sprocs/bulkimport"
q = client.ExecuteStoredProcedure(sproc_link, [param], options)

You could refer to this thread: DocumentDB: bulkImport Stored Proc - Getting 400 Error on Array/JSON issue

The "docs" array must itself be wrapped in an outer array of params; otherwise, the procedure executor treats its elements as multiple params of the procedure, not as a single array param.
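
The wrapping rule above can be modelled in plain Python. This is only a sketch under the assumption that each top-level element of the params list is bound to one positional parameter of `bulkImport(docs, upsert)`; `bind_sproc_args` is a hypothetical helper and does not call the SDK or Cosmos DB.

```python
def bind_sproc_args(params, names=("docs", "upsert")):
    """Hypothetical model: each top-level element of params becomes one
    positional argument; unbound parameters stay None (undefined in JS)."""
    bound = {name: None for name in names}
    for name, value in zip(names, params):
        bound[name] = value
    return bound


docs = [{"id": "1"}, {"id": "2"}]

# Passing the document list directly spreads the documents across parameters.
flat = bind_sproc_args(docs)

# Wrapping it keeps the whole list as the single "docs" parameter.
wrapped = bind_sproc_args([docs, True])

print(flat["docs"])
print(wrapped["docs"], wrapped["upsert"])
```

In the unwrapped case the first document lands in `docs` and the second in `upsert`, which is the "multiple params" behaviour described above.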

Upvotes: 0
