Carol
Carol

Reputation: 347

'Exists Condition' for dynamic pattern in Azure Data Factory Dataflow

I am using Azure Data Factory in which a data flow is used. In this dataflow I want to compare two sources, using the 'Exsits' transformation. Both sources have identical column names. Only datarows in source1 that doesn't exist in source2 should be stored in Sink. The problem comes while configuring the Exits conditions. As I want to use the same pipeline for many datasets I want to use the custom expression field and implement late binding to compare the required columns in both sources. So I created an array input parameter ($primaryKeys) that holds the columns that needs to be compared in both the sources. Here is where I get stuck :

Dataflow Diagram

The expression that gives an error:

Exists Condition

Any suggestions how I can get the dynamic pattern in the expression here to work?

Upvotes: 0

Views: 1152

Answers (1)

Saideep Arikontham
Saideep Arikontham

Reputation: 6104

  • I was able to achieve your requirement using 2 dataflows instead of one where I have used one dataflow to build the custom expression and the other to implement your exists logic.

  • I have taken the primitiveKeys array parameter in dataflow where I am passing static value from pipeline for demonstration.

enter image description here

  • In the first dataflow, with the above array parameter, take any dataset as source. Create a derived column transformation with the dynamic value as toString(reduce(map($cols,concat('source1@',#item,' == source2@',#item)),'true() ',#acc+' && '+#item,#result)). The source1 and source2 are the left and right stream names that I will be applying exists transformation on.

enter image description here

  • Now, we need this value. So, I have used cache sink and writing the output to activity output.
  • In the second dataflow, I have created a string parameter custom_expr for which I pass the value generated above.

enter image description here

  • Now in the dataflow where we use custom expression, I have used the dynamic content as toBoolean(expr(toString($custom_expr))).

This will give the results as desired. The following are output images for reference.

  • Source 1 data preview:

enter image description here

  • Source 2 data preview:

enter image description here

  • Does not exist result:

enter image description here

  • The JSON for first dataflow:
{
    "name": "dataflow2",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DelimitedText6",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "derivedColumn1"
                }
            ],
            "scriptLines": [
                "parameters{",
                "     cols as string[] (['a','b'])",
                "}",
                "source(output(",
                "          id as string,",
                "          first_name as string,",
                "          date as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false) ~> source1",
                "source1 derive(tp = toString(reduce(map($cols,concat('source1@',#item,' == source2@',#item)),'true() ',#acc+' && '+#item,#result))) ~> derivedColumn1",
                "derivedColumn1 sink(validateSchema: false,",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     store: 'cache',",
                "     format: 'inline',",
                "     output: true,",
                "     saveOrder: 1) ~> sink1"
            ]
        }
    }
}
  • The JSON for second dataflow:
{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DelimitedText5",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                },
                {
                    "dataset": {
                        "referenceName": "DelimitedText6",
                        "type": "DatasetReference"
                    },
                    "name": "source2"
                }
            ],
            "sinks": [
                {
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "exists1"
                }
            ],
            "scriptLines": [
                "parameters{",
                "     custom_expr as string ('true()  && source1@id == source2@id && source1@first_name == source2@first_name')",
                "}",
                "source(output(",
                "          id as string,",
                "          first_name as string,",
                "          date as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false) ~> source1",
                "source(output(",
                "          id as string,",
                "          first_name as string,",
                "          date as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false) ~> source2",
                "source1, source2 exists(toBoolean(expr(toString($custom_expr))),",
                "     negate:true,",
                "     broadcast: 'both')~> exists1",
                "exists1 sink(validateSchema: false,",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     store: 'cache',",
                "     format: 'inline',",
                "     output: true,",
                "     saveOrder: 1) ~> sink1"
            ]
        }
    }
}

NOTE: The only hardcoded values in the above when you try to replicate this would be the left and right stream names to build the expression.

Upvotes: 1

Related Questions