Reputation: 19
I have a ADLS container, and my data set is stored in hierarchical manner inside folder( YYYY/MM/DD/Module )
Example: 2023 04 10 Organization File1.Txt File2.txt
2023 04 10 People File3.Txt File4.txt
Ask: I want to pick the latest file from the file container Using Azure Data Factory.
For testing, Currently I have a metadata activity which points to the static folder above (2023 04 10 Organization) I want to parametrize it in ADF so that dynamically it traverses through the folder.
Secondly i have one more activity which traverses the file name [ Get Metadata 2] , the problem is i am only able to fetch the folder name, but not the file name. I tried using field list . But seems like it only fetching the folder name.
The Field list has 3 parameters "Last Modified", "Item name"
Problem Statement:The JSON output for Item name is the folder name instead of file name
Due to data access restrictions i am unable to share the screenshot.
I tried using two metadata activity , one for capturing the count and other [Get Metadata2 ]inside for each which iterates through the list from Get Matadata1.
Upvotes: 0
Views: 850
Reputation: 11549
only on a current single day.. Lets say todays date is 30th so i should only check for files on 30th , if the file is not available then i should check for 29th and pick the one which is latest
If your requirement is only with ADF, you can try the below approach. But it's better to do this using code via Databricks or Functions if you have access to them. This is my sample folder structure:
inputdata
2023
04
19
Module
Module191.csv
Module192.csv
People
People191.csv
People192.csv
18
Module
Module181.csv
Module182.csv
People
People181.csv
People182.csv
03
YYYY/MM/DD
) by checking in utcnow()
and utcnow()-1
...etc in Get Meta data exists condition inside until loop.This is my Parent pipeline(pipeline2
) JSON:
{
"name": "pipeline2",
"properties": {
"activities": [
{
"name": "Counter intiliaze to 0",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Flag_to_false",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "counter",
"value": "0"
}
},
{
"name": "Flag_to_false",
"type": "SetVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "flag",
"value": false
}
},
{
"name": "Until gives latest date folders",
"type": "Until",
"dependsOn": [
{
"activity": "Counter intiliaze to 0",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"expression": {
"value": "@variables('flag')",
"type": "Expression"
},
"activities": [
{
"name": "get_date",
"type": "SetVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "get_date",
"value": {
"value": "@addDays(utcnow(), int(variables('counter')), 'yyyy/MM/dd')",
"type": "Expression"
}
}
},
{
"name": "Get Meta data exists",
"type": "GetMetadata",
"dependsOn": [
{
"activity": "get_date",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataset": {
"referenceName": "sourcefiles",
"type": "DatasetReference",
"parameters": {
"folder": {
"value": "@variables('get_date')",
"type": "Expression"
}
}
},
"fieldList": [
"exists"
],
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
}
},
{
"name": "temp",
"type": "SetVariable",
"dependsOn": [
{
"activity": "set_flag",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "temp_counter",
"value": {
"value": "@string(sub(int(variables('counter')), 1))",
"type": "Expression"
}
}
},
{
"name": "increment_counter_using_temp",
"type": "SetVariable",
"dependsOn": [
{
"activity": "temp",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "counter",
"value": {
"value": "@variables('temp_counter')",
"type": "Expression"
}
}
},
{
"name": "set_flag",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Get Meta data exists",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "flag",
"value": {
"value": "@activity('Get Meta data exists').output.exists",
"type": "Expression"
}
}
}
],
"timeout": "0.12:00:00"
}
},
{
"name": "Get Childitems of date folders",
"type": "GetMetadata",
"dependsOn": [
{
"activity": "min modified date",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataset": {
"referenceName": "sourcefiles",
"type": "DatasetReference",
"parameters": {
"folder": {
"value": "@variables('get_date')",
"type": "Expression"
}
}
},
"fieldList": [
"childItems"
],
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
}
},
{
"name": "ForEach ieterates childItems of dates",
"type": "ForEach",
"dependsOn": [
{
"activity": "Get Childitems of date folders",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@activity('Get Childitems of date folders').output.childItems",
"type": "Expression"
},
"isSequential": true,
"activities": [
{
"name": "Get Metadata for files",
"type": "GetMetadata",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataset": {
"referenceName": "sourcefiles",
"type": "DatasetReference",
"parameters": {
"folder": {
"value": "@concat(variables('get_date'),'/',item().name)",
"type": "Expression"
}
}
},
"fieldList": [
"childItems"
],
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
}
},
{
"name": "Execute Pipeline1",
"type": "ExecutePipeline",
"dependsOn": [
{
"activity": "Get Metadata for files",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"pipeline": {
"referenceName": "pipeline3",
"type": "PipelineReference"
},
"waitOnCompletion": true,
"parameters": {
"folder_path": {
"value": "@concat(variables('get_date'),'/',item().name)",
"type": "Expression"
},
"filenames": {
"value": "@activity('Get Metadata for files').output.childItems",
"type": "Expression"
}
}
}
},
{
"name": "If Condition1",
"type": "IfCondition",
"dependsOn": [
{
"activity": "Execute Pipeline1",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"expression": {
"value": "@greater(int(split(activity('Execute Pipeline1').output.pipelineReturnValue.file_path,'_')[0]), int(variables('LatestModifiedDate')))",
"type": "Expression"
},
"ifTrueActivities": [
{
"name": "Update lastmodified",
"type": "SetVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "LatestModifiedDate",
"value": {
"value": "@split(activity('Execute Pipeline1').output.pipelineReturnValue.file_path,'_')[0]",
"type": "Expression"
}
}
},
{
"name": "Update final file path",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Update lastmodified",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "final_files_path",
"value": {
"value": "@split(activity('Execute Pipeline1').output.pipelineReturnValue.file_path,'_')[1]",
"type": "Expression"
}
}
}
]
}
}
]
}
},
{
"name": "min modified date",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Until gives latest date folders",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "LatestModifiedDate",
"value": {
"value": "@addDays(utcnow(), -365,'yyyyMMddHHmmss')",
"type": "Expression"
}
}
}
],
"variables": {
"counter": {
"type": "String"
},
"flag": {
"type": "Boolean"
},
"get_date": {
"type": "String"
},
"req_date_path": {
"type": "String"
},
"temp_counter": {
"type": "String"
},
"final_files_path": {
"type": "String"
},
"LatestModifiedDate": {
"type": "String"
},
"mod_and_path": {
"type": "String"
}
},
"annotations": []
}
}
This is my Child pipeline(pipeline3
) JSON:
{
"name": "pipeline3",
"properties": {
"activities": [
{
"name": "iterates through file names",
"type": "ForEach",
"dependsOn": [
{
"activity": "min modified date child",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@pipeline().parameters.filenames",
"type": "Expression"
},
"isSequential": true,
"activities": [
{
"name": "Get Metadata for modifed date",
"type": "GetMetadata",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataset": {
"referenceName": "Childgetmeta",
"type": "DatasetReference",
"parameters": {
"filename": {
"value": "@concat(pipeline().parameters.folder_path,'/',item().name)",
"type": "Expression"
}
}
},
"fieldList": [
"lastModified",
"itemName"
],
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
}
},
{
"name": "If Condition1",
"type": "IfCondition",
"dependsOn": [
{
"activity": "Get Metadata for modifed date",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"expression": {
"value": "@greater(int(formatDateTime(activity('Get Metadata for modifed date').output.lastModified,'yyyyMMddHHmmss')), int(variables('lmodified')))",
"type": "Expression"
},
"ifTrueActivities": [
{
"name": "update lmodified",
"type": "SetVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "lmodified",
"value": {
"value": "@formatDateTime(activity('Get Metadata for modifed date').output.lastModified,'yyyyMMddHHmmss')",
"type": "Expression"
}
}
},
{
"name": "concat modified and file path",
"type": "SetVariable",
"dependsOn": [
{
"activity": "update lmodified",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "mod_and_path",
"value": {
"value": "@concat(variables('lmodified'),'_',pipeline().parameters.folder_path,'/',item().name)",
"type": "Expression"
}
}
}
]
}
}
]
}
},
{
"name": "min modified date child",
"type": "SetVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "lmodified",
"value": {
"value": "@addDays(utcnow(), -365,'yyyyMMddHHmmss')",
"type": "Expression"
}
}
},
{
"name": "returning",
"type": "SetVariable",
"dependsOn": [
{
"activity": "iterates through file names",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "pipelineReturnValue",
"value": [
{
"key": "file_path",
"value": {
"type": "Expression",
"content": "@variables('mod_and_path')"
}
}
],
"setSystemVariable": true
}
}
],
"parameters": {
"folder_path": {
"type": "string"
},
"filenames": {
"type": "array"
}
},
"variables": {
"lmodified": {
"type": "String"
},
"mod_and_path": {
"type": "String"
}
},
"annotations": []
}
}
Use the above JSONs to build the pipelines and this is my Result file:
Upvotes: 0