I have multiple tables in a SQL database. For each of them, I want to get the column names and types in Azure Data Factory.
In ADF I have multiple tables (files) that need to be copied into already-created tables with the same names in the SQL database. The files in ADF have more columns than the SQL tables, and some column types differ. When I try to copy the data from ADF to the SQL table, ADF throws an error saying that the types and columns don't match. Therefore, I want to enforce the schema of each SQL table on the corresponding table in ADF (a different schema for each table).
How can I do this?
You can try the approach below, which uses two levels of pipelines (a parent and a child).
Note that this approach only works when the source and sink tables have the same names and the columns that exist in both have the same names.
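Because the mapping built below pairs each SQL column with a source column of the same name, it can help to check that precondition before running the pipeline. The following is a minimal Python sketch, assuming the source is a CSV file whose first line is the header; the function name `unmatched_sink_columns` and the sample data are hypothetical and not part of ADF.

```python
import csv
import io


def unmatched_sink_columns(csv_text, sink_columns):
    """Return sink (SQL) columns that have no same-named column in the CSV header.

    The mapping in this answer maps every sink column to a source column with
    the same name, so any name returned here would likely make the copy fail.
    Names are compared exactly (case-sensitive) in this sketch.
    """
    header = next(csv.reader(io.StringIO(csv_text)))
    source = set(header)
    return [col for col in sink_columns if col not in source]


# Hypothetical example: the extra "comment" column in the file is harmless,
# but the table's "created_at" column has no counterpart in the file.
sample_csv = "id,name,comment\n1,Alice,hello\n"
print(unmatched_sink_columns(sample_csv, ["id", "name", "created_at"]))
```

Extra columns in the file are fine (they are simply not mapped); only sink columns without a source counterpart are reported.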
For this example, I used CSV files in ADLS as the source, each named after its target table with a .csv extension.
First, in the parent pipeline, use a Lookup activity with the query below to get the list of table names from your target database (uncheck First row only).
SELECT name FROM sys.tables;
Pass the Lookup activity's output array, @activity('Lookup1').output.value, to a ForEach activity.
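To make the data flow concrete, here is a small Python model of what the ForEach receives. It assumes the usual Lookup output shape when First row only is unchecked (a `count` plus a `value` array of rows); the second table name is hypothetical. Each row becomes `item()` inside the loop, and the source file name is built the same way as `@concat(item().name,'.csv')` in the parent pipeline JSON.

```python
# Hypothetical Lookup output for SELECT name FROM sys.tables with
# "First row only" unchecked; "sample2" is a made-up table name.
lookup_output = {
    "count": 2,
    "value": [{"name": "sample1"}, {"name": "sample2"}],
}


def iteration_plan(output):
    """Return (table name, source file name) pairs, one per ForEach iteration.

    Mirrors item().name for the sink table and concat(item().name, '.csv')
    for the source file.
    """
    return [(row["name"], row["name"] + ".csv") for row in output["value"]]


for table, filename in iteration_plan(lookup_output):
    print(table, "<-", filename)
```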
Inside the ForEach, we first need to get the structure of the target table. To do that, use a Get Metadata activity with a parameterized SQL database dataset. Pass @item().name as the table name and select Structure in the field list.
This gives the structure of the table. To copy multiple tables without "Column not found" errors, we first need to build a dynamic schema and use it as the Copy activity mapping.
To build that schema for each table, use a child pipeline. In the child pipeline, create an array parameter (sqlstructures) and pass the Get Metadata structure array to it:
@activity('Get Metadata1').output.structure
In the child pipeline, first create three variables: schema (String), final_schema (String), and mapping (Array).
In the first Set variable activity, select the schema variable and give it this string:
{"type": "TabularTranslator","mappings":
Next, build the mapping array. To do that, use a ForEach activity, pass the @pipeline().parameters.sqlstructures parameter to it, and make sure Sequential is checked.
Inside this ForEach, add an Append variable activity for the mapping array with the expression below:
@json(concat('{"source": {"name":"',item().name,'","type": "String"},"sink": {"name": "',item().name,'","type": "',item().logicalType,'"}}'))
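The Append variable expression builds one mapping entry per column by string concatenation and then parses it with @json(). Below is a Python sketch of the same construction, assuming each structure element has the name and logicalType fields that the expression uses; the sample column and its type are hypothetical. The source side is always typed String because the source is a CSV file.

```python
import json


def mapping_entry(column):
    """Build one TabularTranslator mapping entry the same way the ADF expression does.

    The source side is always "String" (CSV values are text); the sink side
    takes the column's logicalType from the Get Metadata structure.
    """
    text = ('{"source": {"name":"' + column["name"] + '","type": "String"},'
            '"sink": {"name": "' + column["name"] + '","type": "'
            + column["logicalType"] + '"}}')
    return json.loads(text)  # plays the role of @json(...)


# Hypothetical structure element (field names as used in the expression).
entry = mapping_entry({"name": "id", "logicalType": "Int32"})
print(entry)
```

Because the entry is assembled as raw text, a column name containing a double quote or backslash would produce invalid JSON, both here and in the ADF expression.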
After the ForEach, add another Set variable activity for the final_schema variable with the expression below:
@concat(variables('schema'),string(variables('mapping')),',"typeConversion": true,"typeConversionSettings":{"allowDataTruncation": false,"treatBooleanAsNumber": false}}')
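This generates the complete TabularTranslator mapping for the table as a single string.
Now, return this string to the parent pipeline by setting the pipelineReturnValue system variable with the key Mapping_schema (see the last activity in the child pipeline JSON below).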
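To see roughly what the final string looks like, here is a Python sketch of the final_schema concatenation. It assumes `json.dumps` is a fair stand-in for ADF's string() on an array; the two mapping entries are hypothetical. Parsing the result confirms that the three pieces (opening fragment, mapping array, closing settings) form one valid JSON object.

```python
import json

# Value of the "schema" variable (the opening fragment).
schema = '{"type": "TabularTranslator","mappings":'

# Hypothetical contents of the "mapping" array after the ForEach.
mapping = [
    {"source": {"name": "id", "type": "String"},
     "sink": {"name": "id", "type": "Int32"}},
    {"source": {"name": "name", "type": "String"},
     "sink": {"name": "name", "type": "String"}},
]

# Closing fragment with the type conversion settings.
tail = (',"typeConversion": true,"typeConversionSettings":'
        '{"allowDataTruncation": false,"treatBooleanAsNumber": false}}')

# Mirrors concat(variables('schema'), string(variables('mapping')), tail).
final_schema = schema + json.dumps(mapping) + tail

translator = json.loads(final_schema)  # must be valid JSON for @json() later
print(final_schema)
print(translator["type"], len(translator["mappings"]))
```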
In the parent pipeline, add a Copy activity and configure your source and sink (use dataset parameters for the file and table names, and pass the current table name, @item().name, to them).
In the Mapping tab, click Add dynamic content and enter the expression below, which uses the returned value:
@json(activity('Execute Pipeline1').output.pipelineReturnValue.Mapping_schema)
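The child returns the translator as a string, so the parent wraps it in @json() to hand the Copy activity an object rather than text. A Python sketch of that round trip, assuming the Execute Pipeline output exposes pipelineReturnValue as in the expression above; the returned mapping itself is hypothetical.

```python
import json

# Hypothetical Execute Pipeline output: the child's pipelineReturnValue
# holds the translator as a string under the key Mapping_schema.
execute_output = {
    "pipelineReturnValue": {
        "Mapping_schema": '{"type": "TabularTranslator","mappings":'
                          '[{"source": {"name": "id", "type": "String"},'
                          '"sink": {"name": "id", "type": "Int32"}}]}'
    }
}

# Equivalent of
# @json(activity('Execute Pipeline1').output.pipelineReturnValue.Mapping_schema)
raw = execute_output["pipelineReturnValue"]["Mapping_schema"]
translator = json.loads(raw)

print(type(raw).__name__, "->", type(translator).__name__)
print(translator["mappings"][0]["sink"])
```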
Debug the pipeline. Any extra columns in your source files are ignored, and only the data for the mapped columns is copied into the target tables.
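As a conceptual model of that behavior (not ADF's actual implementation), the Python sketch below keeps only the mapped columns of each CSV row and converts each value to its sink type. The type names in `CONVERTERS`, the mappings, and the sample data are hypothetical, chosen only to illustrate why unmapped source columns never reach the target table.

```python
import csv
import io

# Hypothetical sink type names mapped to Python converters (illustration only).
CONVERTERS = {"String": str, "Int32": int, "Int64": int, "Double": float}


def apply_mappings(csv_text, mappings):
    """Keep only mapped columns and convert each value to its sink type."""
    rows = []
    for record in csv.DictReader(io.StringIO(csv_text)):
        row = {}
        for m in mappings:
            value = record[m["source"]["name"]]
            row[m["sink"]["name"]] = CONVERTERS[m["sink"]["type"]](value)
        rows.append(row)
    return rows


mappings = [
    {"source": {"name": "id", "type": "String"}, "sink": {"name": "id", "type": "Int32"}},
    {"source": {"name": "name", "type": "String"}, "sink": {"name": "name", "type": "String"}},
]

# Hypothetical file with an extra "comment" column that the table does not have.
data = "id,name,comment\n1,Alice,hello\n2,Bob,bye\n"
print(apply_mappings(data, mappings))
```

The "comment" column is read but never mapped, so it does not appear in the output rows, and "id" arrives as an integer rather than text.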
My parent pipeline JSON:
{
  "name": "Parent",
  "properties": {
    "activities": [
      {
        "name": "ForEach1",
        "type": "ForEach",
        "dependsOn": [
          {
            "activity": "Lookup1",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "userProperties": [],
        "typeProperties": {
          "items": {
            "value": "@activity('Lookup1').output.value",
            "type": "Expression"
          },
          "isSequential": true,
          "activities": [
            {
              "name": "Get Metadata1",
              "type": "GetMetadata",
              "dependsOn": [],
              "policy": {
                "timeout": "0.12:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
              },
              "userProperties": [],
              "typeProperties": {
                "dataset": {
                  "referenceName": "AzureSqlTable1",
                  "type": "DatasetReference",
                  "parameters": {
                    "tablename": {
                      "value": "@item().name",
                      "type": "Expression"
                    }
                  }
                },
                "fieldList": [
                  "columnCount",
                  "structure"
                ]
              }
            },
            {
              "name": "Execute Pipeline1",
              "type": "ExecutePipeline",
              "dependsOn": [
                {
                  "activity": "Get Metadata1",
                  "dependencyConditions": [
                    "Succeeded"
                  ]
                }
              ],
              "userProperties": [],
              "typeProperties": {
                "pipeline": {
                  "referenceName": "Child_pipeline",
                  "type": "PipelineReference"
                },
                "waitOnCompletion": true,
                "parameters": {
                  "sqlstructures": {
                    "value": "@activity('Get Metadata1').output.structure",
                    "type": "Expression"
                  }
                }
              }
            },
            {
              "name": "Copy from source to target",
              "type": "Copy",
              "dependsOn": [
                {
                  "activity": "Execute Pipeline1",
                  "dependencyConditions": [
                    "Succeeded"
                  ]
                }
              ],
              "policy": {
                "timeout": "0.12:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
              },
              "userProperties": [],
              "typeProperties": {
                "source": {
                  "type": "DelimitedTextSource",
                  "storeSettings": {
                    "type": "AzureBlobFSReadSettings",
                    "recursive": true,
                    "enablePartitionDiscovery": false
                  },
                  "formatSettings": {
                    "type": "DelimitedTextReadSettings"
                  }
                },
                "sink": {
                  "type": "AzureSqlSink",
                  "writeBehavior": "insert",
                  "sqlWriterUseTableLock": false
                },
                "enableStaging": false,
                "translator": {
                  "value": "@json(activity('Execute Pipeline1').output.pipelineReturnValue.Mapping_schema)",
                  "type": "Expression"
                }
              },
              "inputs": [
                {
                  "referenceName": "SampleSource",
                  "type": "DatasetReference",
                  "parameters": {
                    "filename": {
                      "value": "@concat(item().name,'.csv')",
                      "type": "Expression"
                    }
                  }
                }
              ],
              "outputs": [
                {
                  "referenceName": "AzureSqlTable1",
                  "type": "DatasetReference",
                  "parameters": {
                    "tablename": {
                      "value": "@item().name",
                      "type": "Expression"
                    }
                  }
                }
              ]
            }
          ]
        }
      },
      {
        "name": "Lookup1",
        "type": "Lookup",
        "dependsOn": [],
        "policy": {
          "timeout": "0.12:00:00",
          "retry": 0,
          "retryIntervalInSeconds": 30,
          "secureOutput": false,
          "secureInput": false
        },
        "userProperties": [],
        "typeProperties": {
          "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": "SELECT name FROM sys.tables;",
            "queryTimeout": "02:00:00",
            "partitionOption": "None"
          },
          "dataset": {
            "referenceName": "AzureSqlTable1",
            "type": "DatasetReference",
            "parameters": {
              "tablename": "sample1"
            }
          },
          "firstRowOnly": false
        }
      }
    ],
    "annotations": []
  }
}
Child pipeline JSON:
{
  "name": "Child_pipeline",
  "properties": {
    "activities": [
      {
        "name": "ForEach1",
        "type": "ForEach",
        "dependsOn": [
          {
            "activity": "start schema",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "userProperties": [],
        "typeProperties": {
          "items": {
            "value": "@pipeline().parameters.sqlstructures",
            "type": "Expression"
          },
          "isSequential": true,
          "activities": [
            {
              "name": "Append variable1",
              "type": "AppendVariable",
              "dependsOn": [],
              "userProperties": [],
              "typeProperties": {
                "variableName": "mapping",
                "value": {
                  "value": "@json(concat('{\"source\": {\"name\":\"',item().name,'\",\"type\": \"String\"},\"sink\": {\"name\": \"',item().name,'\",\"type\": \"',item().logicalType,'\"}}'))",
                  "type": "Expression"
                }
              }
            }
          ]
        }
      },
      {
        "name": "start schema",
        "type": "SetVariable",
        "dependsOn": [],
        "policy": {
          "secureOutput": false,
          "secureInput": false
        },
        "userProperties": [],
        "typeProperties": {
          "variableName": "schema",
          "value": "{\"type\": \"TabularTranslator\",\"mappings\":"
        }
      },
      {
        "name": "Build final schema",
        "type": "SetVariable",
        "dependsOn": [
          {
            "activity": "ForEach1",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "policy": {
          "secureOutput": false,
          "secureInput": false
        },
        "userProperties": [],
        "typeProperties": {
          "variableName": "final_schema",
          "value": {
            "value": "@concat(variables('schema'),string(variables('mapping')),',\"typeConversion\": true,\"typeConversionSettings\":{\"allowDataTruncation\": false,\"treatBooleanAsNumber\": false}}')",
            "type": "Expression"
          }
        }
      },
      {
        "name": "Return Final schema to parent",
        "type": "SetVariable",
        "dependsOn": [
          {
            "activity": "Build final schema",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "policy": {
          "secureOutput": false,
          "secureInput": false
        },
        "userProperties": [],
        "typeProperties": {
          "variableName": "pipelineReturnValue",
          "value": [
            {
              "key": "Mapping_schema",
              "value": {
                "type": "Expression",
                "content": "@variables('final_schema')"
              }
            }
          ],
          "setSystemVariable": true
        }
      }
    ],
    "parameters": {
      "sqlstructures": {
        "type": "array"
      }
    },
    "variables": {
      "schema": {
        "type": "String"
      },
      "final_schema": {
        "type": "String"
      },
      "mapping": {
        "type": "Array"
      }
    },
    "annotations": []
  }
}