user1574155

Reputation: 23

Add a new column to Cosmos DB JSON data using Azure Data Factory

We have a requirement to add a few columns to the JSON data in Cosmos DB, but we are struggling with the complex data type (Addresses). Can someone guide me on how to add a new column and transform the data? We have a large volume of data (millions of records) to transform.

Here is a sample JSON document:

{
    "id": "108-406-004",
    "Title": "Mr",
    "FirstName": "John",
    "LastName": "Smith",
    "Addresses": [
        {
        "Line1": "5 London Road",
        "Country": "UNITED KINGDOM"
        }
    ]
}

I need to add a new column ("Line1LowerCase") to each element of the Addresses array and set its value to lower(Line1), so the output should look like this:

{
    "id": "108-406-004",
    "Title": "Mr",
    "FirstName": "John",
    "LastName": "Smith",
    "Addresses": [
        {
        "Line1": "5 London Road",
        "Country": "UNITED KINGDOM",
        "Line1LowerCase": "5 london road"
        }
    ]
}

Here is what I have tried so far, but the Addresses array comes out like this:
"Addresses": {
        "Line1": [
            "5 London Road"
        ],
        "Country": [
            "UNITED KINGDOM"
        ],
        "Addressline1Internal": "[\"5 london road\"]",
    }

I used Data Factory to import the data from a Cosmos DB dataset, added a derived column step to transform the JSON data, and wrote the result to a Cosmos DB sink dataset.

Upvotes: 0

Views: 4060

Answers (1)

Diego Eick Moreira

Reputation: 94

If you are trying to update the document with ID "108-406-004" to add a new property, you can use a copy activity with the "Write behaviour" option set to "Upsert". With this option, the new property is added to the existing record in Cosmos DB.

Here is how I tested your case:

  1. My Cosmos DB container already had a record for your first JSON example (without the property "Line1LowerCase").

  2. I created a new copy activity with the "Upsert" option and without any mapping in the copy activity.

  3. After running the copy activity with a JSON file that includes the new property, the property was added to the Cosmos DB record automatically.

Also, there is no mapping in the source and sink datasets.

Source Code:

{
    "name": "pipeline3",
    "properties": {
        "activities": [
            {
                "name": "Copy data1",
                "type": "Copy",
                "dependsOn": [],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "JsonSource",
                        "storeSettings": {
                            "type": "AzureBlobStorageReadSettings",
                            "recursive": true,
                            "enablePartitionDiscovery": false
                        },
                        "formatSettings": {
                            "type": "JsonReadSettings"
                        }
                    },
                    "sink": {
                        "type": "CosmosDbSqlApiSink",
                        "writeBehavior": "upsert",
                        "disableMetricsCollection": false
                    },
                    "enableStaging": false
                },
                "inputs": [
                    {
                        "referenceName": "LandingJson",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "CosmosDbSqlApiCollection1",
                        "type": "DatasetReference"
                    }
                ]
            }
        ],
        "annotations": []
    }
}
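
To picture what the "upsert" write behaviour in the pipeline above does, here is a minimal Python sketch that models a container as an in-memory dictionary. It is only an illustration, not the Cosmos DB SDK: the `upsert` helper and the `container` dictionary are hypothetical, and the sketch keys documents on "id" alone, whereas a real container also takes the partition key into account. The point it shows is that an upsert stores the incoming document as a whole, so the source file needs to carry the full record, not just the new property.

```python
import copy


def upsert(container: dict, document: dict) -> str:
    """Insert the document, or replace the stored one that has the same id.

    Hypothetical helper: models the container as a plain dict keyed by "id".
    """
    action = "replaced" if document["id"] in container else "inserted"
    # The whole document is stored; nothing is merged field by field.
    container[document["id"]] = copy.deepcopy(document)
    return action


container = {}

original = {
    "id": "108-406-004",
    "Title": "Mr",
    "FirstName": "John",
    "LastName": "Smith",
    "Addresses": [{"Line1": "5 London Road", "Country": "UNITED KINGDOM"}],
}
first_action = upsert(container, original)

# The incoming file carries the full record plus the new property.
updated = copy.deepcopy(original)
updated["Addresses"][0]["Line1LowerCase"] = "5 london road"
second_action = upsert(container, updated)

print(first_action, second_action)
print(container["108-406-004"]["Addresses"])
```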

How to transform the value to lowercase

Data Factory doesn't allow you to transform a property in a copy activity. For that, you can use a Data Flow and call it from your pipeline.

Here is an example of a Data Flow to replace that simple copy activity:

Source from Json

Select to pull the properties you need

Derived Column to convert the value to lowercase

Alter Row step to allow upsert

Sink into Cosmos DB

Source Code:

{
    "name": "dataflow3",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "LandingJson",
                        "type": "DatasetReference"
                    },
                    "name": "SourceJson"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "CosmosDbSqlApiCollection1",
                        "type": "DatasetReference"
                    },
                    "name": "SinkCosmos"
                }
            ],
            "transformations": [
                {
                    "name": "Select"
                },
                {
                    "name": "DerivedColumn"
                },
                {
                    "name": "AlterRow"
                }
            ],
            "script": "source(output(\n\t\tid as string,\n\t\tTitle as string,\n\t\tFirstName as string,\n\t\tLastName as string,\n\t\tAddresses as (Line1 as string, Country as string)[]\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tignoreNoFilesFound: false,\n\tdocumentForm: 'arrayOfDocuments') ~> SourceJson\nSourceJson select(mapColumn(\n\t\tid,\n\t\tTitle,\n\t\tFirstName,\n\t\tLastName,\n\t\tLine1 = Addresses[1].Line1\n\t),\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> Select\nSelect derive(Line1LowerCase = lower(Line1)) ~> DerivedColumn\nDerivedColumn alterRow(upsertIf(true())) ~> AlterRow\nAlterRow sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tdeletable:false,\n\tinsertable:false,\n\tupdateable:false,\n\tupsertable:true,\n\tformat: 'document',\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> SinkCosmos"
        }
    }
}
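
Note that the script above selects Addresses[1].Line1 (data flow arrays are 1-based, so this is the first address) as a top-level column, so Line1LowerCase ends up next to id and Title and not inside the Addresses array. If the goal is the shape shown in the question, the transformation has to be applied per array element. The Python sketch below shows that intended per-element logic on a plain dictionary; the function name `add_lowercase_line1` is made up for illustration. In a data flow, a derived-column expression along the lines of map(Addresses, @(Line1 = #item.Line1, Country = #item.Country, Line1LowerCase = lower(#item.Line1))) may achieve the same result, but treat that expression as an untested assumption and verify it against your data.

```python
import copy


def add_lowercase_line1(document: dict) -> dict:
    """Return a copy of the document with Line1LowerCase added to each address.

    Hypothetical helper: addresses without a string "Line1" are left unchanged.
    """
    result = copy.deepcopy(document)
    for address in result.get("Addresses", []):
        line1 = address.get("Line1")
        if isinstance(line1, str):
            address["Line1LowerCase"] = line1.lower()
    return result


source_doc = {
    "id": "108-406-004",
    "Title": "Mr",
    "FirstName": "John",
    "LastName": "Smith",
    "Addresses": [{"Line1": "5 London Road", "Country": "UNITED KINGDOM"}],
}

transformed = add_lowercase_line1(source_doc)
print(transformed["Addresses"])
```

The array stays an array of objects, and every element keeps its original properties, which is the shape the sink needs in order to upsert the document in the requested format.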

Upvotes: 1
