Reputation: 3
ImageI am currently reading a SQL Table which has more than 5000 records. Since Lookup activity doesnt support more than 5000 Records. I had to create a foreach loop which will iterate based on totalrecords/5000 and inside lookup will fetch first 5000 records then for next ietration it will fetch another 5000 and so on. however i am stuck on how to pass the each lookup activity output array to a variable.
My Pipeline look like this.
{
"name": "pipeline2",
"properties": {
"activities": [
{
"name": "GetRowCount_FromMyTable",
"type": "Lookup",
"dependsOn": [],
"policy": {
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderQuery": {
"value": "SELECT COUNT(*) as TotalCount FROM MyTable",
"type": "Expression"
},
"queryTimeout": "02:00:00",
"partitionOption": "None"
},
"dataset": {
"referenceName": "ds_sql_extraction",
"type": "DatasetReference"
}
},
"inputs": [
{
"referenceName": "ds_sql_extraction",
"type": "DatasetReference"
}
],
"linkedServiceName": {
"referenceName": "MyDatabase",
"type": "LinkedServiceReference"
}
},
{
"name": "IterativeLookup",
"type": "ForEach",
"dependsOn": [
{
"activity": "GetRowCount_OffSetTable",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@range(0, add(div(activity('GetRowCount_OffSetTable').output.firstRow.TotalCount, 5000), 1))",
"type": "Expression"
},
"activities": [
{
"name": "LookupActivity",
"type": "Lookup",
"dependsOn": [],
"policy": {
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [
{
"name": "LookupIterations",
"value": "@{item()}"
}
],
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderQuery": {
"value": "SELECT * FROM MyTable ORDER BY OffsetValue OFFSET @{mul(int(item()), 5000)} ROWS FETCH NEXT 5000 ROWS ONLY\n\n",
"type": "Expression"
},
"queryTimeout": "02:00:00",
"partitionOption": "None"
},
"dataset": {
"referenceName": "ds_sql_extraction",
"type": "DatasetReference"
},
"firstRowOnly": false
},
"inputs": [
{
"referenceName": "ds_sql_extraction",
"type": "DatasetReference"
}
]
},
{
"name": "Set variable1",
"type": "SetVariable",
"dependsOn": [
{
"activity": "LookupActivity",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "LookupArray",
"value": {
"value": "@string(item().value)",
"type": "Expression"
}
}
}
]
}
}
],
"variables": {
"LookupArray": {
"type": "Array"
},
"AnotherArray": {
"type": "Array"
},
"LookupString": {
"type": "String"
},
"Stringg": {
"type": "Array"
},
"Test": {
"type": "String"
},
"test2": {
"type": "Array"
},
"test1": {
"type": "String"
},
"NewArraySet": {
"type": "Array"
}
},
"folder": {
"name": "Data_Extraction"
},
"annotations": []
}
}
how to get the two LookupActivity (Iterated) outputs to one single variable?
Upvotes: 0
Views: 282
Reputation: 11464
As you don't have any duplicates in your table, you can union the lookup output array in each iteration with its previous iteration lookup output array to get the final array.
Here, for sample instead of 5000 rows, I took 4 rows as limit where my table consists of total 16 rows and my id
column is equivalent to your OffsetValue
column.
First, create two array variables temp_arr
and res_arr
with empty default values like below.
I have followed same approach and same queries as yours till the second lookup activity.
Inside For-Each, after second lookup activity, take a set variable activity for temp_arr
and give @variables('res_arr')
to it.
Next, take another set variable activity for res_arr
variable and give the below expression.
@union(variables('temp_arr'),activity('Lookup2').output.value)
In ADF, self-referencing variables is not supported, that is the reason why the temp_arr
variable was used.
At the end of the For-loop, the result final array will be stored in the res_arr
variable.
Result:
UPDATE:
Your activities are running parallelly inside For-Each. You need to check the Sequential check box in the For-Each activity to run the activities sequentially.
The output array will be stored in the res_arr
variable. If you want to access the array after For-Each, you need to use this array.
My pipeline JSON for your reference:
Change the datasets name and query and activity names as per your requirement and use it.
{
"name": "lookup concat pipeline",
"properties": {
"activities": [
{
"name": "Lookup1",
"type": "Lookup",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderQuery": "SELECT COUNT(*) as TotalCount FROM lookup1",
"queryTimeout": "02:00:00",
"partitionOption": "None"
},
"dataset": {
"referenceName": "lookup_table",
"type": "DatasetReference"
},
"firstRowOnly": true
}
},
{
"name": "ForEach1",
"type": "ForEach",
"dependsOn": [
{
"activity": "Lookup1",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@range(0, add(div(activity('Lookup1').output.firstRow.TotalCount, 4), 1))",
"type": "Expression"
},
"isSequential": true,
"activities": [
{
"name": "Lookup2",
"type": "Lookup",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderQuery": {
"value": "SELECT * FROM lookup1 ORDER BY id OFFSET @{mul(int(item()), 4)} ROWS FETCH NEXT 4 ROWS ONLY",
"type": "Expression"
},
"queryTimeout": "02:00:00",
"partitionOption": "None"
},
"dataset": {
"referenceName": "lookup_table",
"type": "DatasetReference"
},
"firstRowOnly": false
}
},
{
"name": "temp_arr",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Lookup2",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "temp_arr",
"value": {
"value": "@variables('res_arr')",
"type": "Expression"
}
}
},
{
"name": "union temp and and lookup array",
"type": "SetVariable",
"dependsOn": [
{
"activity": "temp_arr",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "res_arr",
"value": {
"value": "@union(variables('temp_arr'),activity('Lookup2').output.value)",
"type": "Expression"
}
}
}
]
}
}
],
"variables": {
"temp_arr": {
"type": "Array"
},
"res_arr": {
"type": "Array"
}
},
"annotations": [],
"lastPublishTime": "2024-05-29T04:16:16Z"
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
Upvotes: 0