Reputation: 3
I am new to Azure Data Factory. We have JSON data stored in the (source) SQL database in 4 different formats:
Source SQL Table (Having JSON Payload)
RequestID | Endpoint | Payload |
---|---|---|
0001 | /Customer | { "iD": 1, "firstName": "Alvin", "lastName" : "Chang", "countryVisited": ["US","CA","JP","UK"] } |
0002 | /Customer | { "Id": 2, "FirstName": "Mike", "LastName" : "Smith", "CountryVisited": ["US","UK"] } |
0003 | /Customer | { "ID": 3, "FIRSTNAME": "Shelly", "LASTNAME" : "Bose", "COUNTRYVISITED": ["JP","AU"] } |
0004 | /Customer | { "id": 4, "firstname": "Chris", "lastname" : "EStrada", "countryvisited": ["IN","CA","UK"] } |
We are trying to parse the Payload and store it in another database (sink) table as follows:
Target (Sink) SQL Table:
ID | FirstName | LastName | Countries Visited |
---|---|---|---|
1 | Alvin | Chang | US~CA~JP~UK |
2 | Mike | Smith | US~UK |
3 | Shelly | Bose | JP~AU |
4 | Chris | Estrada | IN~CA~UK |
The source table has the following JSON data in different formats: the property names are the same, but their casing differs. I am not sure how to define the output column type expression so that it can parse all 4 formats.
Format 1 { "iD": 1, "firstName": "Alvin", "lastName" : "Chang", "countryVisited": ["US","CA","JP","UK"] }
Format 2 { "Id": 2, "FirstName": "Mike", "LastName" : "Smith", "CountryVisited": ["US","UK"] }
Format 3 { "ID": 3, "FIRSTNAME": "Shelly", "LASTNAME" : "Bose", "COUNTRYVISITED": ["JP","AU"] }
Format 4 { "id": 4, "firstname": "Chris", "lastname" : "EStrada", "countryvisited": ["IN","CA","UK"] }
Here is my DataFlow for your reference:
Upvotes: 0
Views: 132
Reputation: 11514
I am able to achieve your requirement using the approach below.
NOTE: In this approach I split the JSON string on certain characters to build the list of keys, so it only works for this case (this JSON structure). If your JSON structure is different, you need to change the split characters according to your JSON.
First, I used a derived column transformation to find the list of keys in each row (JSON string).
For that, I used the below expression to generate a new array column called array. Here, the last argument is the number of keys in your JSON string (I have given 4).
slice(split(Payload,'":'),1,4)
It gave me the following array column.
Then, in the next derived column, I used the below expression to get the list of keys from each JSON string.
mapIndex(array,substringIndex(#item,'"',-1))
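To make this concrete, here is a rough Python equivalent of these two expressions (illustration only, using the Format 1 payload; it assumes each key is immediately followed by ": with no space before the colon, which is what the split character relies on):
payload = '{ "iD": 1, "firstName": "Alvin", "lastName": "Chang", "countryVisited": ["US","CA","JP","UK"] }'

# slice(split(Payload,'":'),1,4): split on '":' and keep the first 4 fragments,
# each of which ends with one of the 4 key names
array = payload.split('":')[:4]
# ['{ "iD', ' 1, "firstName', ' "Alvin", "lastName', ' "Chang", "countryVisited']

# substringIndex(#item,'"',-1): take everything after the last double quote in each fragment
keys = [fragment.rsplit('"', 1)[-1] for fragment in array]
print(keys)  # ['iD', 'firstName', 'lastName', 'countryVisited']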
Now, I used another derived column transformation. In this one, I replaced each key from the keys array in the Payload with its lowercase form using lower().
replace(replace(replace(replace(Payload,keys[1],lower(keys[1])),keys[2],lower(keys[2])),keys[3],lower(keys[3])),keys[4],lower(keys[4]))
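For the Format 1 row, this rewrites the payload as in the Python sketch below (illustration only; note that each replace() swaps every occurrence of the key text, so it assumes the key names do not also appear inside the values):
payload = '{ "iD": 1, "firstName": "Alvin", "lastName": "Chang", "countryVisited": ["US","CA","JP","UK"] }'
keys = ['iD', 'firstName', 'lastName', 'countryVisited']

# the nested replace(..., keys[n], lower(keys[n])) calls, unrolled as a loop
normalized = payload
for key in keys:
    normalized = normalized.replace(key, key.lower())

print(normalized)
# { "id": 1, "firstname": "Alvin", "lastname": "Chang", "countryvisited": ["US","CA","JP","UK"] }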
As all the keys are now in the same case, we can use the parse transformation on this.
Give the below parse configuration.
(id as integer,
firstname as string,
lastname as string,
countryvisited as string[])
Parse Result:
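On the normalized payload, the parse transformation behaves like a plain JSON deserialization of each row (documentForm: 'singleDocument'); roughly, in Python terms (illustration only):
import json

normalized = '{ "id": 1, "firstname": "Alvin", "lastname": "Chang", "countryvisited": ["US","CA","JP","UK"] }'

# id as integer, firstname/lastname as string, countryvisited as string[]
doc = json.loads(normalized)
print(doc["id"], doc["firstname"], doc["lastname"], doc["countryvisited"])
# 1 Alvin Chang ['US', 'CA', 'JP', 'UK']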
Now, use the select transformation's rule-based mapping to keep only the required columns.
As you want the last column countryvisited to be a single string separated with ~, I took another derived column transformation and used the below expression. join() is not supported in the data flow expression language, so I used replace() as per the requirement.
replace(replace(replace(replace(toString(countryvisited), '[', ''),']',''),'"',''),',','~')
Result:
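For the Format 1 row, the string cleanup works out as in the sketch below (illustration only; in Python, '~'.join(countryvisited) would be the direct equivalent of the unavailable join()):
countryvisited = ['US', 'CA', 'JP', 'UK']

# toString(countryvisited) renders the array roughly as ["US","CA","JP","UK"]
as_string = '["US","CA","JP","UK"]'

# strip the brackets and quotes, then swap commas for ~
result = as_string.replace('[', '').replace(']', '').replace('"', '').replace(',', '~')
print(result)  # US~CA~JP~UK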
Add an SQL table as the sink, and the data will be copied to the SQL table.
My Dataflow JSON for your reference:
{
    "name": "dataflow2",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable2",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "derivedColumnForSplitArray"
                },
                {
                    "name": "derivedColumnForKeysArray"
                },
                {
                    "name": "DerivedColumnReplaceKeys"
                },
                {
                    "name": "parse1"
                },
                {
                    "name": "select1"
                },
                {
                    "name": "derivedColumn4"
                }
            ],
            "scriptLines": [
                "source(output(",
                " RequestID as string,",
                " Payload as string",
                " ),",
                " allowSchemaDrift: true,",
                " validateSchema: false,",
                " isolationLevel: 'READ_UNCOMMITTED',",
                " format: 'table') ~> source1",
                "source1 derive(array = slice(split(Payload,'\":'),1,4)) ~> derivedColumnForSplitArray",
                "derivedColumnForSplitArray derive(keys = mapIndex(array,substringIndex(#item,'\"',-1))) ~> derivedColumnForKeysArray",
                "derivedColumnForKeysArray derive(Payload = replace(replace(replace(replace(Payload,keys[1],lower(keys[1])),keys[2],lower(keys[2])),keys[3],lower(keys[3])),keys[4],lower(keys[4]))) ~> DerivedColumnReplaceKeys",
                "DerivedColumnReplaceKeys parse(Payload = Payload ? (id as integer,",
                " firstname as string,",
                " lastname as string,",
                " countryvisited as string[]),",
                " format: 'json',",
                " documentForm: 'singleDocument') ~> parse1",
                "parse1 select(mapColumn(",
                " each(Payload,match(true()))",
                " ),",
                " skipDuplicateMapInputs: true,",
                " skipDuplicateMapOutputs: true) ~> select1",
                "select1 derive(countryvisited = replace(replace(replace(replace(toString(countryvisited), '[', ''),']',''),'\"',''),',','~')) ~> derivedColumn4",
                "derivedColumn4 sink(allowSchemaDrift: true,",
                " validateSchema: false,",
                " deletable:false,",
                " insertable:true,",
                " updateable:false,",
                " upsertable:false,",
                " format: 'table',",
                " skipDuplicateMapInputs: true,",
                " skipDuplicateMapOutputs: true,",
                " errorHandlingOption: 'stopOnFirstError') ~> sink1"
            ]
        }
    }
}
Upvotes: 0