Reputation: 23
We have a requirement to add a few columns to the JSON data in Cosmos DB, but I'm struggling with the complex data type (Address). Can someone guide me on how to add a new column and transform the data? We have a huge amount of data (millions of records) to transform.
Here is a sample JSON request:
{
  "id": "108-406-004",
  "Title": "Mr",
  "FirstName": "John",
  "LastName": "Smith",
  "Addresses": [
    {
      "Line1": "5 London Road",
      "Country": "UNITED KINGDOM"
    }
  ]
}
I need to add a new column ("Line1LowerCase") to each entry in the Addresses array and set its value to lower(Line1), so the output should look like this:
{
  "id": "108-406-004",
  "Title": "Mr",
  "FirstName": "John",
  "LastName": "Smith",
  "Addresses": [
    {
      "Line1": "5 London Road",
      "Country": "UNITED KINGDOM",
      "Line1LowerCase": "5 london road"
    }
  ]
}
Here is what I tried so far, but the Addresses array comes out like this:
"Addresses": {
"Line1": [
"5 London Road"
],
"Country": [
"UNITED KINGDOM"
],
"Addressline1Internal": "[\"5 london road\"]",
}
I used Data Factory to import data from a Cosmos DB dataset, added a Derived Column step to transform the JSON data, and wrote the result back to a Cosmos DB dataset via a sink.
Upvotes: 0
Views: 4060
Reputation: 94
If you are trying to update the record with ID "108-406-004" to add a new property, you can use a Copy activity with the "Write behavior" option set to "Upsert". This option automatically adds the new property to the record in Cosmos DB.
Here is how I tested your case. Note that no mapping is defined on the source or sink datasets.
Source Code:
{
"name": "pipeline3",
"properties": {
"activities": [
{
"name": "Copy data1",
"type": "Copy",
"dependsOn": [],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "JsonSource",
"storeSettings": {
"type": "AzureBlobStorageReadSettings",
"recursive": true,
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "JsonReadSettings"
}
},
"sink": {
"type": "CosmosDbSqlApiSink",
"writeBehavior": "upsert",
"disableMetricsCollection": false
},
"enableStaging": false
},
"inputs": [
{
"referenceName": "LandingJson",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "CosmosDbSqlApiCollection1",
"type": "DatasetReference"
}
]
}
],
"annotations": []
}
}
How to transform the value to lowercase
Data Factory doesn't let you transform a property in a Copy activity. For that, you can use a Data Flow and call it from your pipeline.
Here is an example of a Data Flow to replace that simple copy activity:
Source from Json
Select to pull the properties you need
Derived Column to transform to lowercase
Alter Row step to allow upsert
Sink into Cosmos DB
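Note that the Select step above pulls Line1 out of the Addresses array, so the sink receives a flattened document. If you need to keep the nested array shape shown in the question and add the new property inside each address, the Derived Column can instead rebuild the array with a map() expression. A sketch in data flow script, assuming the Addresses schema from the question (untested against your data):

```
Addresses = map(Addresses, @(Line1 = #item.Line1,
                             Country = #item.Country,
                             Line1LowerCase = lower(#item.Line1)))
```

Here #item refers to the current array element, and @() constructs the complex column for each address, so every element keeps Line1 and Country and gains Line1LowerCase.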
Source Code:
{
"name": "dataflow3",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "LandingJson",
"type": "DatasetReference"
},
"name": "SourceJson"
}
],
"sinks": [
{
"dataset": {
"referenceName": "CosmosDbSqlApiCollection1",
"type": "DatasetReference"
},
"name": "SinkCosmos"
}
],
"transformations": [
{
"name": "Select"
},
{
"name": "DerivedColumn"
},
{
"name": "AlterRow"
}
],
"script": "source(output(\n\t\tid as string,\n\t\tTitle as string,\n\t\tFirstName as string,\n\t\tLastName as string,\n\t\tAddresses as (Line1 as string, Country as string)[]\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tignoreNoFilesFound: false,\n\tdocumentForm: 'arrayOfDocuments') ~> SourceJson\nSourceJson select(mapColumn(\n\t\tid,\n\t\tTitle,\n\t\tFirstName,\n\t\tLastName,\n\t\tLine1 = Addresses[1].Line1\n\t),\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> Select\nSelect derive(Line1LowerCase = lower(Line1)) ~> DerivedColumn\nDerivedColumn alterRow(upsertIf(true())) ~> AlterRow\nAlterRow sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tdeletable:false,\n\tinsertable:false,\n\tupdateable:false,\n\tupsertable:true,\n\tformat: 'document',\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> SinkCosmos"
}
}
}
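To run this data flow from the pipeline, the Copy activity can be replaced with an Execute Data Flow activity. A minimal sketch (the activity name is a placeholder, and the compute settings shown are just the defaults):

```json
{
  "name": "Run dataflow3",
  "type": "ExecuteDataFlow",
  "dependsOn": [],
  "typeProperties": {
    "dataflow": {
      "referenceName": "dataflow3",
      "type": "DataFlowReference"
    },
    "compute": {
      "coreCount": 8,
      "computeType": "General"
    }
  }
}
```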
Upvotes: 1