Reputation: 347
I am using Azure Data Factory with a data flow. In this data flow I want to compare two sources using the 'Exists' transformation. Both sources have identical column names. Only the rows in source1 that don't exist in source2 should be written to the sink. The problem comes when configuring the Exists conditions. Because I want to reuse the same pipeline for many datasets, I want to use the custom expression field and apply late binding to compare the required columns in both sources. So I created an array input parameter ($primaryKeys) that holds the columns that need to be compared in both sources. Here is where I get stuck:
The expression that gives an error:
Any suggestions on how I can get the dynamic pattern in this expression to work?
Upvotes: 0
Views: 1152
Reputation: 6104
I was able to achieve your requirement using two dataflows instead of one: one dataflow builds the custom expression, and the other implements your exists logic.
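In the dataflow that builds the expression (dataflow2 below), I have taken an array parameter, cols (the equivalent of your $primaryKeys), to which I pass a static value from the pipeline for demonstration. A derived column then builds the condition string with:
toString(reduce(map($cols,concat('source1@',#item,' == source2@',#item)),'true() ',#acc+' && '+#item,#result))
Here source1 and source2 are the names of the left and right streams that the exists transformation will be applied to.
In the dataflow that implements the exists logic (dataflow1 below), I created a string parameter, custom_expr, to which I pass the value generated above. The exists transformation (with negate: true, i.e. "doesn't exist") then uses this custom expression:
toBoolean(expr(toString($custom_expr)))
This gives the desired results. The following are output images for reference.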
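The string that the derived column in dataflow2 produces can be checked outside ADF. Below is a minimal Python sketch that mirrors the map/reduce logic, assuming ADF's + concatenates strings and reduce starts from the 'true() ' seed. The function name build_exists_condition is hypothetical, and the sketch does not call any ADF API.

```python
# Hypothetical Python mirror of the data flow expression
#   toString(reduce(map($cols, concat('source1@', #item, ' == source2@', #item)),
#                   'true() ', #acc + ' && ' + #item, #result))
# It only reproduces the condition string; it does not run anything in ADF.
from functools import reduce


def build_exists_condition(cols, left="source1", right="source2"):
    """Return the exists condition string for the given key columns."""
    # map(): one equality clause per key column, using stream@column references
    clauses = [f"{left}@{c} == {right}@{c}" for c in cols]
    # reduce(): fold the clauses onto the 'true() ' seed, joined with ' && '
    return reduce(lambda acc, item: acc + " && " + item, clauses, "true() ")


print(build_exists_condition(["id", "first_name"]))
```

For ['id', 'first_name'] this yields true()  && source1@id == source2@id && source1@first_name == source2@first_name (the double space after true() comes from the seed). Apart from whitespace, this matches the default custom_expr value in dataflow1.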
{
"name": "dataflow2",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "DelimitedText6",
"type": "DatasetReference"
},
"name": "source1"
}
],
"sinks": [
{
"name": "sink1"
}
],
"transformations": [
{
"name": "derivedColumn1"
}
],
"scriptLines": [
"parameters{",
" cols as string[] (['a','b'])",
"}",
"source(output(",
" id as string,",
" first_name as string,",
" date as string",
" ),",
" allowSchemaDrift: true,",
" validateSchema: false,",
" ignoreNoFilesFound: false) ~> source1",
"source1 derive(tp = toString(reduce(map($cols,concat('source1@',#item,' == source2@',#item)),'true() ',#acc+' && '+#item,#result))) ~> derivedColumn1",
"derivedColumn1 sink(validateSchema: false,",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true,",
" store: 'cache',",
" format: 'inline',",
" output: true,",
" saveOrder: 1) ~> sink1"
]
}
}
}
{
"name": "dataflow1",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "DelimitedText5",
"type": "DatasetReference"
},
"name": "source1"
},
{
"dataset": {
"referenceName": "DelimitedText6",
"type": "DatasetReference"
},
"name": "source2"
}
],
"sinks": [
{
"name": "sink1"
}
],
"transformations": [
{
"name": "exists1"
}
],
"scriptLines": [
"parameters{",
" custom_expr as string ('true() && source1@id == source2@id && source1@first_name == source2@first_name')",
"}",
"source(output(",
" id as string,",
" first_name as string,",
" date as string",
" ),",
" allowSchemaDrift: true,",
" validateSchema: false,",
" ignoreNoFilesFound: false) ~> source1",
"source(output(",
" id as string,",
" first_name as string,",
" date as string",
" ),",
" allowSchemaDrift: true,",
" validateSchema: false,",
" ignoreNoFilesFound: false) ~> source2",
"source1, source2 exists(toBoolean(expr(toString($custom_expr))),",
" negate:true,",
" broadcast: 'both')~> exists1",
"exists1 sink(validateSchema: false,",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true,",
" store: 'cache',",
" format: 'inline',",
" output: true,",
" saveOrder: 1) ~> sink1"
]
}
}
}
NOTE: When you replicate this, the only hardcoded values are the left and right stream names (source1 and source2) used to build the expression.
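Here is a minimal Python sketch of what the exists transformation with negate: true is expected to return: the rows of source1 that have no row in source2 with equal values on every key column. The function not_exists and the sample rows are hypothetical. The sketch assumes the key columns contain no nulls, because ADF's null comparison semantics may differ from Python's.

```python
# Hypothetical illustration of exists(..., negate: true) on a dynamic key list:
# keep rows of source1 whose key values have no match in source2.
from typing import Dict, List, Sequence

Row = Dict[str, str]


def not_exists(source1: List[Row], source2: List[Row], keys: Sequence[str]) -> List[Row]:
    # Collect the key tuples present in the right stream (source2)
    right_keys = {tuple(row[k] for k in keys) for row in source2}
    # Keep only left rows whose key tuple is absent from the right stream
    return [row for row in source1 if tuple(row[k] for k in keys) not in right_keys]


source1 = [
    {"id": "1", "first_name": "Ann", "date": "2023-01-01"},
    {"id": "2", "first_name": "Bob", "date": "2023-01-02"},
    {"id": "3", "first_name": "Cid", "date": "2023-01-03"},
]
source2 = [
    {"id": "1", "first_name": "Ann", "date": "2023-02-01"},
    {"id": "2", "first_name": "Bobby", "date": "2023-01-02"},
]

print(not_exists(source1, source2, ["id", "first_name"]))
```

With ['id', 'first_name'] the rows with id 2 and 3 are kept. Passing ['id'] instead keeps only the row with id 3. This shows the effect of changing the key list at run time without editing the data flow.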
Upvotes: 1