ashmit-001

Reputation: 402

Bulk Upload/Import of JSON files to Azure Cosmos DB from Java Code

I'm generating a JSON file in Java. The file contains a list of JSON objects, and I want to import it into Azure Cosmos DB as soon as it is created.

Is there a way to achieve this from Java code?

Thanks in advance!

Upvotes: 1

Views: 2769

Answers (1)

Jim Xu

Reputation: 23141

Based on my research, bulk operations from Java can be implemented with the Azure Cosmos DB bulk executor Java library. For details on the library and on how to use it, please refer to the official documentation.

For example:

  1. My test.json file
[
    { "id": "1", "name": "test1", "age": "20" },
    { "id": "2", "name": "test2", "age": "21" },
    { "id": "3", "name": "test3", "age": "22" },
    { "id": "4", "name": "test4", "age": "23" },
    { "id": "5", "name": "test5", "age": "24" },
    { "id": "6", "name": "test6", "age": "25" },
    { "id": "7", "name": "test7", "age": "26" },
    { "id": "8", "name": "test8", "age": "27" }
]

  2. My pom.xml
<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>documentdb-bulkexecutor</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>com.googlecode.json-simple</groupId>
    <artifactId>json-simple</artifactId>
    <version>1.1.1</version>
</dependency>
  3. Code
import com.microsoft.azure.documentdb.ConnectionPolicy;
import com.microsoft.azure.documentdb.ConsistencyLevel;
import com.microsoft.azure.documentdb.Database;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.DocumentCollection;
import com.microsoft.azure.documentdb.PartitionKeyDefinition;
import com.microsoft.azure.documentdb.RequestOptions;
import com.microsoft.azure.documentdb.ResourceResponse;
import com.microsoft.azure.documentdb.bulkexecutor.BulkImportResponse;
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor;
import org.json.simple.JSONArray;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;

public class BulkImport {

    public static void main(String[] args) throws Exception {
        String endpoint = "<your cosmos db endpoint>";
        String key = "<your key>";

        ConnectionPolicy connectionPolicy = new ConnectionPolicy();
        connectionPolicy.setMaxPoolSize(1000);
        DocumentClient client = new DocumentClient(
                endpoint,
                key,
                connectionPolicy,
                ConsistencyLevel.Session);

        String databaseId = "testbulk";
        String collectionId = "items";
        String databaseLink = String.format("/dbs/%s", databaseId);
        String collectionLink = String.format("/dbs/%s/colls/%s", databaseId, collectionId);

        // Read the database, creating it if it does not exist yet
        ResourceResponse<Database> databaseResponse = null;
        Database readDatabase = null;
        try {
            databaseResponse = client.readDatabase(databaseLink, null);
            readDatabase = databaseResponse.getResource();
            System.out.println("Database already exists...");
        } catch (DocumentClientException dce) {
            if (dce.getStatusCode() == 404) {
                System.out.println("Attempting to create database since non-existent...");

                Database databaseDefinition = new Database();
                databaseDefinition.setId(databaseId);
                client.createDatabase(databaseDefinition, null);

                databaseResponse = client.readDatabase(databaseLink, null);
                readDatabase = databaseResponse.getResource();
            } else {
                throw dce;
            }
        }

        // Read the collection, creating it (partitioned on /id) if it does not exist yet
        ResourceResponse<DocumentCollection> collectionResponse = null;
        DocumentCollection readCollection = null;
        try {
            collectionResponse = client.readCollection(collectionLink, null);
            readCollection = collectionResponse.getResource();
            System.out.println("Collection already exists...");
        } catch (DocumentClientException dce) {
            if (dce.getStatusCode() == 404) {
                System.out.println("Attempting to create collection since non-existent...");

                DocumentCollection collectionDefinition = new DocumentCollection();
                collectionDefinition.setId(collectionId);

                PartitionKeyDefinition partitionKeyDefinition = new PartitionKeyDefinition();
                Collection<String> paths = new ArrayList<String>();
                paths.add("/id");
                partitionKeyDefinition.setPaths(paths);
                collectionDefinition.setPartitionKey(partitionKeyDefinition);

                RequestOptions options = new RequestOptions();
                options.setOfferThroughput(1000000);

                // Create the collection with the requested throughput
                client.createCollection(databaseLink, collectionDefinition, options);

                collectionResponse = client.readCollection(collectionLink, null);
                readCollection = collectionResponse.getResource();
            } else {
                throw dce;
            }
        }

        System.out.println(readCollection.getId());
        System.out.println(readDatabase.getId());

        // Read the JSON file and convert each array element to a JSON string
        ArrayList<String> list = new ArrayList<String>();
        JSONParser jsonParser = new JSONParser();
        try (FileReader reader = new FileReader("e:\\test.json")) {
            Object obj = jsonParser.parse(reader);
            JSONArray jsonArray = (JSONArray) obj;
            System.out.println(jsonArray);
            if (jsonArray != null) {
                for (Object item : jsonArray) {
                    list.add(item.toString());
                }
            }
            System.out.println(list.get(0));
        } catch (IOException | ParseException e) {
            e.printStackTrace();
        }

        // Set the client's retry options high for bulk executor initialization
        client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(30);
        client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(9);

        // Builder pattern; the last argument is the throughput you want to
        // allocate for bulk import out of the container's total throughput
        DocumentBulkExecutor.Builder bulkExecutorBuilder = DocumentBulkExecutor.builder().from(
                client,
                databaseId,
                collectionId,
                readCollection.getPartitionKey(),
                20000);

        // Instantiate DocumentBulkExecutor and run the import
        try {
            DocumentBulkExecutor bulkExecutor = bulkExecutorBuilder.build();
            // Set retries to 0 to pass complete control to the bulk executor
            client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(0);
            client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(0);
            BulkImportResponse bulkImportResponse = bulkExecutor.importAll(list, false, false, null);
            System.out.println(bulkImportResponse.getNumberOfDocumentsImported());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
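After importAll returns, it is also worth inspecting the response for failures rather than only printing the import count. A minimal sketch, assuming the BulkImportResponse accessors getTotalRequestUnitsConsumed, getTotalTimeTaken, and getErrors from the documentdb-bulkexecutor API (verify them against the library version you use):

    // Inspect the bulk import outcome
    System.out.println("Docs imported: " + bulkImportResponse.getNumberOfDocumentsImported());
    System.out.println("RUs consumed:  " + bulkImportResponse.getTotalRequestUnitsConsumed());
    System.out.println("Time taken:    " + bulkImportResponse.getTotalTimeTaken());
    for (Exception error : bulkImportResponse.getErrors()) {
        error.printStackTrace(); // details for documents that failed to import
    }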

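As for running the import "as soon as the file is created": one option is to watch the output directory with the standard java.nio.file.WatchService and start the import when the file appears. A minimal sketch, where the directory e:\, the file name test.json, and the runBulkImport helper are placeholders for your own setup (runBulkImport would wrap the import code above):

    import java.nio.file.FileSystems;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardWatchEventKinds;
    import java.nio.file.WatchEvent;
    import java.nio.file.WatchKey;
    import java.nio.file.WatchService;

    public class ImportOnCreate {
        public static void main(String[] args) throws Exception {
            Path dir = Paths.get("e:\\"); // directory where the JSON file is generated
            WatchService watcher = FileSystems.getDefault().newWatchService();
            dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);

            while (true) {
                WatchKey key = watcher.take(); // blocks until a file system event arrives
                for (WatchEvent<?> event : key.pollEvents()) {
                    Path created = (Path) event.context();
                    if (created.toString().equals("test.json")) {
                        runBulkImport(dir.resolve(created)); // placeholder: kick off the import
                    }
                }
                if (!key.reset()) {
                    break; // directory is no longer watchable
                }
            }
        }

        private static void runBulkImport(Path jsonFile) {
            // wrap the DocumentBulkExecutor import logic from the answer here
        }
    }

Note that ENTRY_CREATE fires when the file is created, which can be before the producer has finished writing it; a common workaround is to write to a temporary name and rename the file once it is complete.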

Upvotes: 1
