BlakeB9

Reputation: 647

How to dynamically use upsert in ADF to an Azure SQL database

Inside of my pipeline, I'm running a lookup to gather tables from one database to transfer to another.

select table_schema,table_name from information_schema.tables where table_schema = 'schema1' OR TABLE_SCHEMA = 'schema2'

Then I run a copy activity for each of those tables. At the moment the sink option is set to "insert". I would like to use upsert instead, but to do so I have to identify the "key column(s)". What's the best way to get these dynamically? And which columns are best used for this: columns that contain a unique value for each row?

Upvotes: 0

Views: 3199

Answers (2)

Bhavani

Reputation: 5317

Create linked services for the Lookup, source, and destination databases. Create a pipeline and add a Lookup activity that gets the list of tables in the source database, using the query option with the query below.

SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.TABLES
WHERE TABLE_TYPE = 'BASE TABLE' and TABLE_SCHEMA = 'dbo'

After the Lookup succeeds, connect a ForEach activity, enable Sequential, and add @activity('Lookup1').output.value as dynamic content for its Items. Inside the ForEach, add a second Lookup activity that runs the query below to get the key column of the tables.

SELECT column_name
FROM information_schema.key_column_usage
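
Note that, as written, the query above is not filtered to the table of the current iteration, so it returns key columns for every table that has a key constraint. A minimal sketch of a per-table variant follows, assuming the tables have primary keys defined and that the database supports STRING_AGG (Azure SQL Database does). The alias key_columns is a hypothetical name chosen for illustration, and the @{...} parts are ADF string interpolation, meant to be entered as dynamic content in the Lookup query box.

```sql
-- Sketch only: returns one row holding the primary key columns of the
-- current ForEach item as a comma-separated list.
-- key_columns is an illustrative alias, not part of the original answer.
SELECT STRING_AGG(kcu.COLUMN_NAME, ',')
           WITHIN GROUP (ORDER BY kcu.ORDINAL_POSITION) AS key_columns
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS kcu
  ON  kcu.CONSTRAINT_SCHEMA = tc.CONSTRAINT_SCHEMA
  AND kcu.CONSTRAINT_NAME   = tc.CONSTRAINT_NAME
WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
  AND tc.TABLE_SCHEMA = '@{item().TABLE_SCHEMA}'
  AND tc.TABLE_NAME   = '@{item().TABLE_NAME}'
```

With this shape, the key column expression on the sink would become @split(activity('Lookup2').output.firstRow.key_columns, ','), which also covers composite keys. A table without a primary key yields NULL here and would need separate handling.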

After that Lookup succeeds, connect a Copy data activity. Create the source SQL dataset with the linked service you created, and add two parameters named schema and tableName so the dataset can point at each table in turn. Give them the values below:

schema: @item().TABLE_SCHEMA
tableName: @item().TABLE_NAME

Create the sink dataset with the same two parameters and give them the same values. On the sink, enable the upsert option and add @createArray(activity('Lookup2').output.firstRow.column_name) as dynamic content for the key columns.

Debug the pipeline. It executes successfully and updates the tables in the target.

Pipeline JSON:

{
    "name": "Pipeline 2",
    "properties": {
        "activities": [
            {
                "name": "Lookup1",
                "type": "Lookup",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "AzureSqlSource",
                        "sqlReaderQuery": "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.TABLES\nWHERE TABLE_TYPE = 'BASE TABLE' and TABLE_SCHEMA = 'dbo'",
                        "queryTimeout": "02:00:00",
                        "partitionOption": "None"
                    },
                    "dataset": {
                        "referenceName": "AzureSqlTable3",
                        "type": "DatasetReference"
                    },
                    "firstRowOnly": false
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Lookup1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Lookup1').output.value",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Lookup2",
                            "type": "Lookup",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "source": {
                                    "type": "AzureSqlSource",
                                    "sqlReaderQuery": "SELECT column_name\nFROM information_schema.key_column_usage",
                                    "queryTimeout": "02:00:00",
                                    "partitionOption": "None"
                                },
                                "dataset": {
                                    "referenceName": "AzureSqlTable3",
                                    "type": "DatasetReference"
                                }
                            }
                        },
                        {
                            "name": "Copy_3q5",
                            "type": "Copy",
                            "dependsOn": [
                                {
                                    "activity": "Lookup2",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [
                                {
                                    "name": "Source",
                                    "value": "files//input.xml"
                                },
                                {
                                    "name": "Destination",
                                    "value": "dbo.input"
                                }
                            ],
                            "typeProperties": {
                                "source": {
                                    "type": "AzureSqlSource",
                                    "queryTimeout": "02:00:00",
                                    "partitionOption": "None"
                                },
                                "sink": {
                                    "type": "AzureSqlSink",
                                    "writeBehavior": "upsert",
                                    "upsertSettings": {
                                        "useTempDB": true,
                                        "keys": {
                                            "value": "@createArray(activity('Lookup2').output.firstRow.column_name)",
                                            "type": "Expression"
                                        }
                                    },
                                    "sqlWriterUseTableLock": false,
                                    "disableMetricsCollection": false
                                },
                                "enableStaging": false,
                                "translator": {
                                    "type": "TabularTranslator",
                                    "typeConversion": true,
                                    "typeConversionSettings": {
                                        "allowDataTruncation": true,
                                        "treatBooleanAsNumber": false
                                    }
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "DestinationDataset_3q5",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "tableName": {
                                            "value": "@item().TABLE_NAME",
                                            "type": "Expression"
                                        },
                                        "schema": {
                                            "value": "@item().TABLE_SCHEMA",
                                            "type": "Expression"
                                        }
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "AzureSqlTable2",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "schema": {
                                            "value": "@item().TABLE_SCHEMA",
                                            "type": "Expression"
                                        },
                                        "tableName": {
                                            "value": "@item().TABLE_NAME",
                                            "type": "Expression"
                                        }
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "variables": {
            "ok": {
                "type": "String"
            }
        },
        "annotations": [],
        "lastPublishTime": "2023-04-19T11:45:57Z"
    },
    "type": "Microsoft.Synapse/workspaces/pipelines"
}

Upvotes: 1

Chen Hirsh

Reputation: 1400

For the key columns, you need to use a column or combination of columns that uniquely identify a row in that table.

You can add this key list to the result of the query above, either by getting them from metadata (if you have a primary key or a unique index on that table), or by keeping them in some management table, and querying that table.

Then you can use dynamic content in ADF to provide the key columns on the sink tab. Please note that the key columns value must be of type array (because it's a list of columns), even when there is only one key column.
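
As a rough sketch of the metadata route, the table list and the key list can come back from a single Lookup query. This assumes the keys are primary keys, that STRING_AGG is available (it is in Azure SQL Database), and it reuses the schema names from the question; KEY_COLUMNS is a hypothetical alias chosen for illustration.

```sql
-- Sketch only: one row per base table, with its primary key columns
-- as a comma-separated list (KEY_COLUMNS is an illustrative alias).
-- The inner joins mean tables without a primary key are left out.
SELECT t.TABLE_SCHEMA,
       t.TABLE_NAME,
       STRING_AGG(kcu.COLUMN_NAME, ',')
           WITHIN GROUP (ORDER BY kcu.ORDINAL_POSITION) AS KEY_COLUMNS
FROM INFORMATION_SCHEMA.TABLES AS t
JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
  ON  tc.TABLE_SCHEMA    = t.TABLE_SCHEMA
  AND tc.TABLE_NAME      = t.TABLE_NAME
  AND tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS kcu
  ON  kcu.CONSTRAINT_SCHEMA = tc.CONSTRAINT_SCHEMA
  AND kcu.CONSTRAINT_NAME   = tc.CONSTRAINT_NAME
WHERE t.TABLE_TYPE = 'BASE TABLE'
  AND t.TABLE_SCHEMA IN ('schema1', 'schema2')
GROUP BY t.TABLE_SCHEMA, t.TABLE_NAME
```

Inside the ForEach, the sink key columns could then be set with @split(item().KEY_COLUMNS, ','), which produces the array the sink expects. Tables that have no primary key do not appear in this result, so they would need either a management table entry or a plain insert.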

Upvotes: 1
