Reputation: 647
Inside of my pipeline, I'm running a lookup to gather tables from one database to transfer to another.
select table_schema,table_name from information_schema.tables where table_schema = 'schema1' OR TABLE_SCHEMA = 'schema2'
Then I run a copy activity for each of those tables. At the moment I've got the sink option set to "insert", but I'd like to try upsert instead, which means I have to identify "key column(s)". What's the best way to get these dynamically? And which columns are best used for this? Columns that contain a unique value for each row?
Upvotes: 0
Views: 3199
Reputation: 5317
Create database linked services for the lookup, source, and destination. Create a pipeline and add a Lookup activity that lists the tables in the source database, using the query option with the query below.
SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.TABLES
WHERE TABLE_TYPE = 'BASE TABLE' and TABLE_SCHEMA = 'dbo'
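For the asker's case with two schemas, the same pattern applies; a sketch (note that TABLE_TYPE = 'BASE TABLE' excludes views, which the query in the question would also pick up):

SELECT TABLE_SCHEMA, TABLE_NAME
FROM information_schema.TABLES
WHERE TABLE_TYPE = 'BASE TABLE'
  AND TABLE_SCHEMA IN ('schema1', 'schema2')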
After the lookup succeeds, connect a ForEach activity, enable Sequential, and add @activity('Lookup1').output.value as dynamic content for its items. Inside the ForEach, add another Lookup activity that fetches the key columns of the current table with the query below. (The query must be filtered to the current table via string interpolation; without the WHERE clause it would return key columns for every table in the database.)
SELECT column_name
FROM information_schema.key_column_usage
WHERE table_schema = '@{item().TABLE_SCHEMA}' AND table_name = '@{item().TABLE_NAME}'
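Note that key_column_usage also lists columns from unique and foreign key constraints. If only primary key columns should drive the upsert, a join to table_constraints narrows it down; a sketch, assuming SQL Server's information_schema views:

SELECT kcu.column_name
FROM information_schema.key_column_usage AS kcu
JOIN information_schema.table_constraints AS tc
    ON tc.constraint_name = kcu.constraint_name
    AND tc.table_schema = kcu.table_schema
    AND tc.table_name = kcu.table_name
WHERE tc.constraint_type = 'PRIMARY KEY'
    AND kcu.table_schema = '@{item().TABLE_SCHEMA}'
    AND kcu.table_name = '@{item().TABLE_NAME}'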
After that lookup succeeds, connect a Copy data activity. Create a source SQL dataset on the linked service created earlier, add two parameters named schema and tableName so the dataset can address each table of the database, and give them the values below:
schema: @item().TABLE_SCHEMA
tableName: @item().TABLE_NAME
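Inside the dataset, those parameters are referenced with @dataset() dynamic content; a sketch of the dataset's typeProperties, assuming an Azure SQL table dataset:

"typeProperties": {
    "schema": { "value": "@dataset().schema", "type": "Expression" },
    "table": { "value": "@dataset().tableName", "type": "Expression" }
}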
Create the sink dataset with the same two parameters and the same values. Enable the upsert option and, for the key columns, add @createArray(activity('Lookup2').output.firstRow.column_name) as dynamic content. (Because this uses firstRow, it only handles single-column keys; see the sketch below for composite keys.)
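If a table has a composite primary key, firstRow only returns the first key column. One workaround (a sketch, assuming SQL Server 2017+ for STRING_AGG) is to have Lookup2 aggregate the key columns into one comma-separated value, keeping "First row only" enabled since only one row comes back:

SELECT STRING_AGG(column_name, ',') AS key_cols
FROM information_schema.key_column_usage
WHERE table_schema = '@{item().TABLE_SCHEMA}'
  AND table_name = '@{item().TABLE_NAME}'

and then split it back into the array the sink expects with @split(activity('Lookup2').output.firstRow.key_cols, ',').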
Debug the pipeline; it executes successfully and updates the tables in the target.
Pipeline JSON:
{
    "name": "Pipeline 2",
    "properties": {
        "activities": [
            {
                "name": "Lookup1",
                "type": "Lookup",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "AzureSqlSource",
                        "sqlReaderQuery": "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.TABLES\nWHERE TABLE_TYPE = 'BASE TABLE' and TABLE_SCHEMA = 'dbo'",
                        "queryTimeout": "02:00:00",
                        "partitionOption": "None"
                    },
                    "dataset": {
                        "referenceName": "AzureSqlTable3",
                        "type": "DatasetReference"
                    },
                    "firstRowOnly": false
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Lookup1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Lookup1').output.value",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Lookup2",
                            "type": "Lookup",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "source": {
                                    "type": "AzureSqlSource",
                                    "sqlReaderQuery": {
                                        "value": "SELECT column_name\nFROM information_schema.key_column_usage\nWHERE table_schema = '@{item().TABLE_SCHEMA}' AND table_name = '@{item().TABLE_NAME}'",
                                        "type": "Expression"
                                    },
                                    "queryTimeout": "02:00:00",
                                    "partitionOption": "None"
                                },
                                "dataset": {
                                    "referenceName": "AzureSqlTable3",
                                    "type": "DatasetReference"
                                }
                            }
                        },
                        {
                            "name": "Copy_3q5",
                            "type": "Copy",
                            "dependsOn": [
                                {
                                    "activity": "Lookup2",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [
                                {
                                    "name": "Source",
                                    "value": "files//input.xml"
                                },
                                {
                                    "name": "Destination",
                                    "value": "dbo.input"
                                }
                            ],
                            "typeProperties": {
                                "source": {
                                    "type": "AzureSqlSource",
                                    "queryTimeout": "02:00:00",
                                    "partitionOption": "None"
                                },
                                "sink": {
                                    "type": "AzureSqlSink",
                                    "writeBehavior": "upsert",
                                    "upsertSettings": {
                                        "useTempDB": true,
                                        "keys": {
                                            "value": "@createArray(activity('Lookup2').output.firstRow.column_name)",
                                            "type": "Expression"
                                        }
                                    },
                                    "sqlWriterUseTableLock": false,
                                    "disableMetricsCollection": false
                                },
                                "enableStaging": false,
                                "translator": {
                                    "type": "TabularTranslator",
                                    "typeConversion": true,
                                    "typeConversionSettings": {
                                        "allowDataTruncation": true,
                                        "treatBooleanAsNumber": false
                                    }
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "DestinationDataset_3q5",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "tableName": {
                                            "value": "@item().TABLE_NAME",
                                            "type": "Expression"
                                        },
                                        "schema": {
                                            "value": "@item().TABLE_SCHEMA",
                                            "type": "Expression"
                                        }
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "AzureSqlTable2",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "schema": {
                                            "value": "@item().TABLE_SCHEMA",
                                            "type": "Expression"
                                        },
                                        "tableName": {
                                            "value": "@item().TABLE_NAME",
                                            "type": "Expression"
                                        }
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "variables": {
            "ok": {
                "type": "String"
            }
        },
        "annotations": [],
        "lastPublishTime": "2023-04-19T11:45:57Z"
    },
    "type": "Microsoft.Synapse/workspaces/pipelines"
}
Upvotes: 1
Reputation: 1400
For the key columns, you need a column or a combination of columns that uniquely identifies a row in that table.
You can add this key list to the result of the table-listing query above, either by reading it from metadata (if the table has a primary key or a unique index) or by keeping it in a management table and querying that table.
Then you can use dynamic content in ADF to supply the key columns on the sink tab. Note that the key columns property must be of type array (because it's a list of columns). A sketch of the management-table approach follows.
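If the metadata route isn't available (for example, heaps without a primary key or unique index), a management table can hold the key list per table; a minimal sketch, with hypothetical names:

-- hypothetical management table mapping each table to its upsert key columns
CREATE TABLE dbo.UpsertKeyColumns (
    table_schema sysname NOT NULL,
    table_name   sysname NOT NULL,
    key_columns  nvarchar(400) NOT NULL  -- comma-separated list, e.g. 'OrderId,LineNumber'
);

A lookup against this table inside the ForEach then returns key_columns for the current table, and @split(...) turns the comma-separated value into the array the sink expects.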
Upvotes: 1