SauGo

How To Parse JSON Case-Insensitively In Azure Data Factory

I am new to Azure Data Factory. We have the following 4 different formats of JSON data stored in our (source) SQL database:

Source SQL Table (Having JSON Payload)

RequestID | Endpoint  | Payload
0001      | /Customer | { "iD": 1, "firstName": "Alvin", "lastName" : "Chang", "countryVisited": ["US","CA","JP","UK"] }
0002      | /Customer | { "Id": 2, "FirstName": "Mike", "LastName" : "Smith", "CountryVisited": ["US","UK"] }
0003      | /Customer | { "ID": 3, "FIRSTNAME": "Shelly", "LASTNAME" : "Bose", "COUNTRYVISITED": ["JP","AU"] }
0004      | /Customer | { "id": 4, "firstname": "Chris", "lastname" : "EStrada", "countryvisited": ["IN","CA","UK"] }

We are trying to parse the Payload and store it into another database (sink) table as follows:

Target (Sink) SQL Table:

ID | FirstName | LastName | Countries Visited
1  | Alvin     | Chang    | US~CA~JP~UK
2  | Mike      | Smith    | US~UK
3  | Shelly    | Bose     | JP~AU
4  | Chris     | Estrada  | IN~CA~UK

The source table has the same JSON properties in every row, but the property names come in different (mixed) cases. I am not sure how to define the Output Column Type Expression so that it can parse all 4 formats:

Format 1 { "iD": 1, "firstName": "Alvin", "lastName" : "Chang", "countryVisited": ["US","CA","JP","UK"] }

Format 2 { "Id": 2, "FirstName": "Mike", "LastName" : "Smith", "CountryVisited": ["US","UK"] }

Format 3 { "ID": 3, "FIRSTNAME": "Shelly", "LASTNAME" : "Bose", "COUNTRYVISITED": ["JP","AU"] }

Format 4 { "id": 4, "firstname": "Chris", "lastname" : "EStrada", "countryvisited": ["IN","CA","UK"] }

Here is my Data Flow for your reference: [screenshot: Data Flow]


Answers (1)

Rakesh Govindula

I was able to achieve your requirement using the below approach.

NOTE: This approach uses particular characters in the JSON to split the string and build the list of keys. It only works for this JSON structure; if your JSON structure is different, you will need to change the split characters accordingly.

First, I used a derived column transformation to find the list of keys in each row (JSON string).

For that, I used the below expression to generate a new array column called array. Here, give the number of keys in your JSON string (I have given 4).

slice(split(Payload,'":'),1,4)

It gave me the following array column.

[screenshot: the generated array column]
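
To make it clearer what this expression is doing, here is a rough Python sketch (just an illustration, not ADF code). It assumes every key is immediately followed by '":' with no space before the colon, as the NOTE above warns:

# Python sketch of slice(split(Payload, '":'), 1, 4)
payload = '{ "iD": 1, "firstName": "Alvin", "lastName": "Chang", "countryVisited": ["US","CA","JP","UK"] }'
parts = payload.split('":')   # one split point per key
array = parts[0:4]            # ADF slice() is 1-based; Python slicing is 0-based
print(array)
# ['{ "iD', ' 1, "firstName', ' "Alvin", "lastName', ' "Chang", "countryVisited']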

Then, in the next derived column, I used the below expression to get the list of keys from each JSON string.

mapIndex(array,substringIndex(#item,'"',-1))

[screenshot: the generated keys column]
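
In Python terms, substringIndex(#item, '"', -1) keeps whatever follows the last double quote in each element; a minimal sketch, continuing from the previous one:

# Python sketch of mapIndex(array, substringIndex(#item, '"', -1))
keys = [part.rsplit('"', 1)[-1] for part in array]   # text after the last quote
print(keys)   # ['iD', 'firstName', 'lastName', 'countryVisited']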

Now, I used another derived column transformation. In this one, I replaced each key in the Payload string with its lowercased version using lower().

replace(replace(replace(replace(Payload,keys[1],lower(keys[1])),keys[2],lower(keys[2])),keys[3],lower(keys[3])),keys[4],lower(keys[4]))
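
The nested replace() calls just lowercase each discovered key inside the payload string (ADF data flow arrays are 1-based, hence keys[1] through keys[4]). A rough Python equivalent:

# Python sketch of the nested replace() chain
# Caveat: replace() touches every occurrence, so a value that happens
# to contain a key name would get lowercased as well.
normalized = payload
for key in keys:
    normalized = normalized.replace(key, key.lower())
print(normalized)
# { "id": 1, "firstname": "Alvin", "lastname": "Chang", "countryvisited": ["US","CA","JP","UK"] }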

As all the keys are now in the same case, we can use the parse transformation on this.

[screenshot: parse transformation in the data flow]

Give the below parse configuration.

[screenshot: parse transformation settings]

(id as integer,
        firstname as string,
        lastname as string,
        countryvisited as string[])

Parse Result:

[screenshot: parse output]
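
As a quick sanity check outside ADF, the normalized string from the earlier sketch now parses under the single lowercase schema that the parse configuration above expects:

import json

doc = json.loads(normalized)   # normalized string from the sketch above
print(doc["id"], doc["firstname"], doc["lastname"], doc["countryvisited"])
# 1 Alvin Chang ['US', 'CA', 'JP', 'UK']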

Now, use the select transformation's rule-based mapping to keep only the required columns.

[screenshot: rule-based mapping in the select transformation]

As you want the last column, countryvisited, to be a single string separated by ~, I took another derived column transformation and used the below expression. join() is not supported in the data flow expression language, so I used replace() to get the required format.

replace(replace(replace(replace(toString(countryvisited), '[', ''),']',''),'"',''),',','~')

Result:

[screenshot: final result data preview]
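
The replace chain simply strips the array punctuation from the stringified array and swaps commas for ~. A small Python sketch of the same idea:

# Python sketch of the final replace chain: '["US","CA","JP","UK"]' -> 'US~CA~JP~UK'
country_str = '["US","CA","JP","UK"]'   # what toString(countryvisited) produces here
result = (country_str.replace('[', '')
                     .replace(']', '')
                     .replace('"', '')
                     .replace(',', '~'))
print(result)   # US~CA~JP~UK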

Add a SQL table as the sink, and the data will be copied to it.

My Dataflow JSON for your reference:

{
    "name": "dataflow2",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable2",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "derivedColumnForSplitArray"
                },
                {
                    "name": "derivedColumnForKeysArray"
                },
                {
                    "name": "DerivedColumnReplaceKeys"
                },
                {
                    "name": "parse1"
                },
                {
                    "name": "select1"
                },
                {
                    "name": "derivedColumn4"
                }
            ],
            "scriptLines": [
                "source(output(",
                "          RequestID as string,",
                "          Payload as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     isolationLevel: 'READ_UNCOMMITTED',",
                "     format: 'table') ~> source1",
                "source1 derive(array = slice(split(Payload,'\":'),1,4)) ~> derivedColumnForSplitArray",
                "derivedColumnForSplitArray derive(keys = mapIndex(array,substringIndex(#item,'\"',-1))) ~> derivedColumnForKeysArray",
                "derivedColumnForKeysArray derive(Payload = replace(replace(replace(replace(Payload,keys[1],lower(keys[1])),keys[2],lower(keys[2])),keys[3],lower(keys[3])),keys[4],lower(keys[4]))) ~> DerivedColumnReplaceKeys",
                "DerivedColumnReplaceKeys parse(Payload = Payload ? (id as integer,",
                "          firstname as string,",
                "          lastname as string,",
                "          countryvisited as string[]),",
                "     format: 'json',",
                "     documentForm: 'singleDocument') ~> parse1",
                "parse1 select(mapColumn(",
                "          each(Payload,match(true()))",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select1",
                "select1 derive(countryvisited = replace(replace(replace(replace(toString(countryvisited), '[', ''),']',''),'\"',''),',','~')) ~> derivedColumn4",
                "derivedColumn4 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     deletable:false,",
                "     insertable:true,",
                "     updateable:false,",
                "     upsertable:false,",
                "     format: 'table',",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     errorHandlingOption: 'stopOnFirstError') ~> sink1"
            ]
        }
    }
}
