Reputation: 17
I have a Data Flow in ADF that parses a JSON file and passes the ID(s) of each object to an ExternalCall (Source). The JSON properties in the data returned by the ExternalCall are translated (via the cachedMappings sink) into the appropriate SQL (Sink) columns.
The main purpose of this Data Flow is to use the schema drift capability: I will be passing different mappings and parameters to make requests against different endpoints (of the same service) and to map the responses using the mappings assigned in cachedMappings. So I want to avoid any solution that hard-codes a schema or maps it directly to any one particular activity.
I am currently testing with only a small subset of the JSON properties. When I check the Data Preview of both alterArray and sqlSink, I can see the data values and columns as expected, and the Data Preview column names match the field names on the sqlSink table exactly (including capitalization).
ISSUE: When I debug (or even trigger) the Data Flow, the ONLY column that gets written is "shop_id". I have tried a number of different things, including changing the column types in both ADF and SQL, with no success.
alterArray Data Preview: correctly shows that 2 rows will be upserted
sqlSink Data Preview: correctly shows 2 rows with 5 columns, all marked as drifted
sqlSink Settings: schema drift is enabled. The table key columns are set with the expression split($sinkKeys,','), evaluated from the value 'Transaction_Id,Order_Id', which splits into columns that exist in the SQL table
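As a quick sanity check on the key expression, the split can be mirrored outside ADF. This is only a Python sketch of the same string operation, not data flow code; the `incoming_columns` list is hypothetical and contains only the column names mentioned in this question.

```python
# Illustrative only: mirrors split($sinkKeys, ',') in plain Python.
sink_keys = "Transaction_Id,Order_Id"  # value of the $sinkKeys parameter

# A plain split does not trim, so a value like 'Transaction_Id, Order_Id'
# would yield ' Order_Id' with a leading space.
keys = sink_keys.split(",")

# Hypothetical list of drifted column names arriving at the sink;
# only these three names are mentioned in the question.
incoming_columns = ["Transaction_Id", "Order_Id", "shop_id"]

# Any key that does not match an incoming column name exactly ends up here.
missing = [k for k in keys if k not in incoming_columns]
print(keys, missing)
```

With the value shown in the sink settings, both keys are found and `missing` is empty, which is consistent with the keys not being the cause of the NULL columns.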
I am completely at a loss. The only possible culprit I have found so far is in the output file (which I can also see by inspecting the flow): every column that is being set to NULL references "body" from the externalCall as its source column, whereas shop_id (which is assigned later, in the addShopId Derived Column) is identified explicitly in the lineage, and it happens to be the only column that receives a value.
DataFlow SqlSink Column Lineage
ExternalCall
Just to rule out anything with the drifted columns, at one point I added a Derived Column transformation that mapped the drifted columns, still with no success. Even if that had worked, it would go against the purpose of this flow, which is to pass in "dynamic" sets of mappings and endpoints in order to parse different sets of data.
A link to the output JSON file is included below as well.
Any help or suggestions would be appreciated. Please keep in mind the note above about the purpose of this Data Flow in using schema drift.
Linked below are copies of both the script representation and the JSON representation of the data flow.
Script Representation: Dataflow Script Representation
JSON Representation: Dataflow JSON Representation
Upvotes: 0
Views: 109
Reputation: 5297
You may be running into an issue with the flatten transformation after the External Call transformation. You can achieve your requirement with the approach below:
Click Import projection on the Output tab of the External Call transformation. It imports the response data together with its type, as shown below:
The data is imported as a complex object.
Add a derived column transformation after the External Call transformation and create columns from the fields of the complex data, as shown below:
This creates the required columns. Next, add a select transformation and select the required columns, add an alter row transformation according to your requirement, and add a SQL sink with the insert and upsert options enabled. Debug the pipeline; the data is inserted into the SQL database successfully, as shown below:
Here is the data flow JSON for your reference:
{
    "name": "dataflow3",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DelimitedText2",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "externalCall1",
                    "linkedService": {
                        "referenceName": "RestService1",
                        "type": "LinkedServiceReference"
                    }
                },
                {
                    "name": "derivedColumn1"
                },
                {
                    "name": "AlterRow1"
                },
                {
                    "name": "derivedColumn2"
                },
                {
                    "name": "select1"
                }
            ],
            "scriptLines": [
                "source(output(",
                " ID as string,",
                " DeltaValue as string",
                " ),",
                " allowSchemaDrift: true,",
                " validateSchema: false,",
                " ignoreNoFilesFound: false) ~> source1",
                "derivedColumn1 call(output(",
                " body as (body as string, id as string, title as string, userId as string)",
                " ),",
                " allowSchemaDrift: true,",
                " format: 'rest',",
                " store: 'restservice',",
                " timeout: 30,",
                " requestInterval: 0,",
                " httpMethod: 'GET',",
                " rowRelativeUrl: 'myid',",
                " skipRowRelativeUrl: true,",
                " bodyColumnName: 'body',",
                " requestFormat: ['type' -> 'json'],",
                " responseFormat: ['type' -> 'json', 'documentForm' -> 'singleDocument']) ~> externalCall1",
                "source1 derive(myid = toString(ID)) ~> derivedColumn1",
                "select1 alterRow(upsertIf((isNull(myid)==false()))) ~> AlterRow1",
                "externalCall1 derive(id = body.id,",
                " userId = body.userId,",
                " title = body.title,",
                " body = body.body) ~> derivedColumn2",
                "derivedColumn2 select(mapColumn(",
                " myid,",
                " body,",
                " id = derivedColumn2@id,",
                " userId,",
                " title",
                " ),",
                " skipDuplicateMapInputs: true,",
                " skipDuplicateMapOutputs: true) ~> select1",
                "AlterRow1 sink(allowSchemaDrift: true,",
                " validateSchema: false,",
                " deletable:false,",
                " insertable:true,",
                " updateable:false,",
                " upsertable:true,",
                " keys:['myid'],",
                " format: 'table',",
                " skipDuplicateMapInputs: true,",
                " skipDuplicateMapOutputs: true,",
                " errorHandlingOption: 'stopOnFirstError') ~> sink1"
            ]
        }
    }
}
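To make the derivedColumn2 step in the script above easier to follow, here is a rough Python illustration of what it does: it promotes the fields of the nested body object returned by the External Call to top-level columns. This is not ADF code, and the sample row values are hypothetical; only the field names (myid, body, id, title, userId) come from the script.

```python
def flatten_body(row):
    """Copy the fields of the nested 'body' struct to top-level columns,
    mirroring: derive(id = body.id, userId = body.userId,
                      title = body.title, body = body.body)."""
    body = row["body"]
    return {
        "myid": row["myid"],
        "id": body["id"],
        "userId": body["userId"],
        "title": body["title"],
        "body": body["body"],  # the inner string replaces the struct
    }


# Hypothetical row as it might look right after the External Call step.
row = {
    "myid": "1",
    "body": {"body": "some text", "id": "1", "title": "a title", "userId": "7"},
}

flat = flatten_body(row)
print(flat)
```

After this step every value the sink needs is a plain top-level column rather than a field inside the body struct, which is the state the select and sink in the JSON above operate on.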
Upvotes: 1