Reputation: 402
I'm generating a JSON file in Java. The file contains an array of JSON objects. I want to import this file into Azure Cosmos DB as soon as it is created.
Is there some way to achieve this from Java code?
Thanks in advance!
Upvotes: 1
Views: 2769
Reputation: 23141
Based on my research, bulk operations from Java can be implemented with the Azure Cosmos DB bulk executor Java library. For details on the library and how to use it, please refer to the official documentation.
For example
test.json
[
  { "id": "1", "name": "test1", "age": "20" },
  { "id": "2", "name": "test2", "age": "21" },
  { "id": "3", "name": "test3", "age": "22" },
  { "id": "4", "name": "test4", "age": "23" },
  { "id": "5", "name": "test5", "age": "24" },
  { "id": "6", "name": "test6", "age": "25" },
  { "id": "7", "name": "test7", "age": "26" },
  { "id": "8", "name": "test8", "age": "27" }
]
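Since the question mentions generating the file from Java: a minimal sketch of one way to write such an array with json-simple (the dependency is declared in the pom.xml below; the path and values are just the example data above):

import java.io.FileWriter;
import java.io.IOException;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

// Build a small array of documents and write it to disk as one JSON file
public static void writeTestFile() throws IOException {
    JSONArray array = new JSONArray();
    for (int i = 1; i <= 8; i++) {
        JSONObject doc = new JSONObject();
        doc.put("id", String.valueOf(i));
        doc.put("name", "test" + i);
        doc.put("age", String.valueOf(19 + i));
        array.add(doc);
    }
    try (FileWriter writer = new FileWriter("e:\\test.json")) {
        writer.write(array.toJSONString());
    }
}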
pom.xml
<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>documentdb-bulkexecutor</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>com.googlecode.json-simple</groupId>
    <artifactId>json-simple</artifactId>
    <version>1.1.1</version>
</dependency>
Java code
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;

import org.json.simple.JSONArray;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import com.microsoft.azure.documentdb.*;
import com.microsoft.azure.documentdb.bulkexecutor.BulkImportResponse;
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor;

String endpoint = "<your cosmos db endpoint>";
String key = "<your key>";
ConnectionPolicy connectionPolicy = new ConnectionPolicy();
connectionPolicy.setMaxPoolSize(1000);
DocumentClient client = new DocumentClient(
        endpoint,
        key,
        connectionPolicy,
        ConsistencyLevel.Session);
String databaseId = "testbulk";
String collectionId = "items";
String databaseLink = String.format("/dbs/%s", databaseId);
String collectionLink = String.format("/dbs/%s/colls/%s", databaseId, collectionId);

// Read the database, creating it if it does not exist yet
ResourceResponse<Database> databaseResponse = null;
Database readDatabase = null;
try {
    databaseResponse = client.readDatabase(databaseLink, null);
    readDatabase = databaseResponse.getResource();
    System.out.println("Database already exists...");
} catch (DocumentClientException dce) {
    if (dce.getStatusCode() == 404) {
        System.out.println("Attempting to create database since non-existent...");
        Database databaseDefinition = new Database();
        databaseDefinition.setId(databaseId);
        client.createDatabase(databaseDefinition, null);
        databaseResponse = client.readDatabase(databaseLink, null);
        readDatabase = databaseResponse.getResource();
    } else {
        throw dce;
    }
}
// Read the collection, creating it (partitioned on /id) if it does not exist yet
ResourceResponse<DocumentCollection> collectionResponse = null;
DocumentCollection readCollection = null;
try {
    collectionResponse = client.readCollection(collectionLink, null);
    readCollection = collectionResponse.getResource();
    System.out.println("Collection already exists...");
} catch (DocumentClientException dce) {
    if (dce.getStatusCode() == 404) {
        System.out.println("Attempting to create collection since non-existent...");
        DocumentCollection collectionDefinition = new DocumentCollection();
        collectionDefinition.setId(collectionId);
        PartitionKeyDefinition partitionKeyDefinition = new PartitionKeyDefinition();
        Collection<String> paths = new ArrayList<String>();
        paths.add("/id");
        partitionKeyDefinition.setPaths(paths);
        collectionDefinition.setPartitionKey(partitionKeyDefinition);
        RequestOptions options = new RequestOptions();
        options.setOfferThroughput(1000000); // provisioned throughput (RU/s) for the new collection
        // create the collection
        client.createCollection(databaseLink, collectionDefinition, options);
        collectionResponse = client.readCollection(collectionLink, null);
        readCollection = collectionResponse.getResource();
    } else {
        throw dce;
    }
}
System.out.println(readCollection.getId());
System.out.println(readDatabase.getId());

// Read the JSON file and convert the JSONArray into a list of document strings
ArrayList<String> list = new ArrayList<String>();
JSONParser jsonParser = new JSONParser();
try (FileReader reader = new FileReader("e:\\test.json")) {
    Object obj = jsonParser.parse(reader);
    JSONArray jsonArray = (JSONArray) obj;
    System.out.println(jsonArray);
    if (jsonArray != null) {
        int len = jsonArray.size();
        for (int i = 0; i < len; i++) {
            list.add(jsonArray.get(i).toString());
        }
    }
    System.out.println(list.get(0));
} catch (IOException | ParseException e) {
    e.printStackTrace();
}
// Set the client's retry options high while the bulk executor initializes
client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(30);
client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(9);

// Build the DocumentBulkExecutor; the last argument is the throughput (RU/s)
// you want to allocate for bulk import out of the collection's total throughput
DocumentBulkExecutor.Builder bulkExecutorBuilder = DocumentBulkExecutor.builder().from(
        client,
        databaseId,
        collectionId,
        readCollection.getPartitionKey(),
        20000);
try {
    DocumentBulkExecutor bulkExecutor = bulkExecutorBuilder.build();
    // Set retries to 0 to pass complete control to the bulk executor
    client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(0);
    client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(0);
    // importAll(documents, isUpsert, disableAutomaticIdGeneration, maxConcurrencyPerPartitionRange)
    BulkImportResponse bulkImportResponse = bulkExecutor.importAll(list, false, false, null);
    System.out.println(bulkImportResponse.getNumberOfDocumentsImported());
} catch (Exception e) {
    e.printStackTrace();
}
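The question also asks to run the import as soon as the file is created. One option (not specific to Cosmos DB) is to watch the output directory with java.nio.file.WatchService and run the code above when the file appears. A minimal sketch, assuming the file lands in e:\ as above and that the bulk import code is wrapped in a hypothetical importFile method:

import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;

public static void watchAndImport() throws Exception {
    Path dir = Paths.get("e:\\");
    try (WatchService watcher = FileSystems.getDefault().newWatchService()) {
        dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);
        while (true) {
            WatchKey key = watcher.take();              // blocks until an event arrives
            for (WatchEvent<?> event : key.pollEvents()) {
                Path created = (Path) event.context();  // file name, relative to dir
                if (created.toString().equals("test.json")) {
                    importFile(dir.resolve(created));   // hypothetical wrapper around the bulk import code above
                }
            }
            if (!key.reset()) {
                break;                                  // directory is no longer accessible
            }
        }
    }
}

Note that ENTRY_CREATE fires when the file appears, which may be before the producer has finished writing it; if that is a concern, write the file under a temporary name and rename it to test.json once complete, so the import only ever sees finished files.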
Upvotes: 1