shi

Reputation: 19

Getting the latest file from a hierarchical folder in ADF (Azure Data Factory)

I have an ADLS container, and my data set is stored hierarchically in folders (YYYY/MM/DD/Module).

Example:

2023/04/10/Organization: File1.Txt, File2.txt

2023/04/10/People: File3.Txt, File4.txt

Ask: I want to pick the latest file from the container using Azure Data Factory.

For testing, I currently have a Get Metadata activity that points to the static folder above (2023/04/10/Organization). I want to parameterize it in ADF so that it traverses the folders dynamically.

Secondly, I have one more activity [Get Metadata 2] that is meant to fetch the file names. The problem is that I am only able to fetch the folder name, not the file name. I tried using the field list, but it seems to fetch only the folder name.

The field list has 3 parameters, including "Last modified" and "Item name".

Problem statement: the JSON output for Item name is the folder name instead of the file name.

Due to data access restrictions, I am unable to share a screenshot.

I tried using two Get Metadata activities: one for capturing the count, and the other [Get Metadata2] inside a ForEach that iterates through the list from Get Metadata1.

Upvotes: 0

Views: 850

Answers (1)

Rakesh Govindula

Reputation: 11549

only on a current single day.. Let's say today's date is the 30th, so I should only check for files on the 30th; if no file is available, then I should check the 29th and pick the one which is latest

If your requirement is only with ADF, you can try the below approach. But it's better to do this using code via Databricks or Functions if you have access to them. This is my sample folder structure:

inputdata
    2023
        04
            19
                Module
                    Module191.csv
                    Module192.csv
                People
                    People191.csv
                    People192.csv
            18
                Module
                    Module181.csv
                    Module182.csv
                People
                    People181.csv
                    People182.csv
        03
  • First, I get the latest date folder path (YYYY/MM/DD) by checking utcnow(), utcnow() minus 1 day, and so on, with a Get Metadata exists check inside an Until loop.
  • Using that date path, I get the child items (subfolders) and pass them to a ForEach. Inside the ForEach, for every subfolder, I use another Get Metadata activity for its child items (the file names inside the subfolder).
  • To iterate over these child items, we need another ForEach, but nested loops are not currently supported in ADF. So, use another pipeline with an Execute Pipeline activity.
  • In the other pipeline, pass these child items to a ForEach activity. Inside the ForEach, use a Get Metadata activity on each file to get its last modified date, then keep the latest file using an If Condition activity.
  • Outside the ForEach, return the last modified date and file path to the parent pipeline.
  • The child pipeline is thus used to get the latest file and its date from each subfolder.
  • Using those return values in every iteration, the parent pipeline picks the latest file based on the dates.
  • In other words, the parent pipeline compares the latest files from every subfolder and gives the latest file in the whole hierarchy.
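
The steps above can be sketched outside ADF in plain Python. This only illustrates the logic and is not the pipeline itself: the FILES dictionary stands in for the storage listing, and the function names (latest_date_folder, latest_in_subfolder, latest_file) are made up for this example.

```python
from datetime import date, datetime, timedelta

# Hypothetical in-memory stand-in for the container:
# "YYYY/MM/DD/Subfolder/file" -> last modified time.
FILES = {
    "2023/04/19/Module/Module191.csv": datetime(2023, 4, 19, 8, 0, 0),
    "2023/04/19/Module/Module192.csv": datetime(2023, 4, 19, 9, 30, 0),
    "2023/04/19/People/People191.csv": datetime(2023, 4, 19, 10, 15, 0),
    "2023/04/19/People/People192.csv": datetime(2023, 4, 19, 7, 45, 0),
    "2023/04/18/Module/Module181.csv": datetime(2023, 4, 18, 23, 0, 0),
}


def latest_date_folder(today, max_days_back=365):
    """Until loop: try today, then today-1, ... until a date folder exists."""
    for offset in range(max_days_back + 1):
        folder = (today - timedelta(days=offset)).strftime("%Y/%m/%d")
        if any(path.startswith(folder + "/") for path in FILES):
            return folder
    return None


def latest_in_subfolder(subfolder):
    """Child pipeline: newest file inside one subfolder, as (time, path)."""
    candidates = {p: t for p, t in FILES.items() if p.startswith(subfolder + "/")}
    newest = max(candidates, key=candidates.get)
    return candidates[newest], newest


def latest_file(today):
    """Parent pipeline: compare the per-subfolder winners."""
    folder = latest_date_folder(today)
    if folder is None:
        return None
    subfolders = {"/".join(p.split("/")[:4]) for p in FILES if p.startswith(folder + "/")}
    return max(latest_in_subfolder(s) for s in subfolders)[1]


print(latest_file(date(2023, 4, 20)))  # 2023/04/19/People/People191.csv
```

With the sample data, there is no folder for the 20th, so the walk-back lands on 2023/04/19 and the newest file across Module and People is returned.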

This is my parent pipeline (pipeline2) JSON:

{
    "name": "pipeline2",
    "properties": {
        "activities": [
            {
                "name": "Counter intiliaze to 0",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "Flag_to_false",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "counter",
                    "value": "0"
                }
            },
            {
                "name": "Flag_to_false",
                "type": "SetVariable",
                "dependsOn": [],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "flag",
                    "value": false
                }
            },
            {
                "name": "Until gives latest date folders",
                "type": "Until",
                "dependsOn": [
                    {
                        "activity": "Counter intiliaze to 0",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "expression": {
                        "value": "@variables('flag')",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "get_date",
                            "type": "SetVariable",
                            "dependsOn": [],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "get_date",
                                "value": {
                                    "value": "@addDays(utcnow(), int(variables('counter')), 'yyyy/MM/dd')",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "Get Meta data exists",
                            "type": "GetMetadata",
                            "dependsOn": [
                                {
                                    "activity": "get_date",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "dataset": {
                                    "referenceName": "sourcefiles",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "folder": {
                                            "value": "@variables('get_date')",
                                            "type": "Expression"
                                        }
                                    }
                                },
                                "fieldList": [
                                    "exists"
                                ],
                                "storeSettings": {
                                    "type": "AzureBlobFSReadSettings",
                                    "enablePartitionDiscovery": false
                                },
                                "formatSettings": {
                                    "type": "DelimitedTextReadSettings"
                                }
                            }
                        },
                        {
                            "name": "temp",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "set_flag",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "temp_counter",
                                "value": {
                                    "value": "@string(sub(int(variables('counter')), 1))",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "increment_counter_using_temp",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "temp",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "counter",
                                "value": {
                                    "value": "@variables('temp_counter')",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "set_flag",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "Get Meta data exists",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "flag",
                                "value": {
                                    "value": "@activity('Get Meta data exists').output.exists",
                                    "type": "Expression"
                                }
                            }
                        }
                    ],
                    "timeout": "0.12:00:00"
                }
            },
            {
                "name": "Get Childitems of date folders",
                "type": "GetMetadata",
                "dependsOn": [
                    {
                        "activity": "min modified date",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataset": {
                        "referenceName": "sourcefiles",
                        "type": "DatasetReference",
                        "parameters": {
                            "folder": {
                                "value": "@variables('get_date')",
                                "type": "Expression"
                            }
                        }
                    },
                    "fieldList": [
                        "childItems"
                    ],
                    "storeSettings": {
                        "type": "AzureBlobFSReadSettings",
                        "enablePartitionDiscovery": false
                    },
                    "formatSettings": {
                        "type": "DelimitedTextReadSettings"
                    }
                }
            },
            {
                "name": "ForEach ieterates childItems of dates",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Get Childitems of date folders",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Get Childitems of date folders').output.childItems",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Get Metadata for files",
                            "type": "GetMetadata",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "dataset": {
                                    "referenceName": "sourcefiles",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "folder": {
                                            "value": "@concat(variables('get_date'),'/',item().name)",
                                            "type": "Expression"
                                        }
                                    }
                                },
                                "fieldList": [
                                    "childItems"
                                ],
                                "storeSettings": {
                                    "type": "AzureBlobFSReadSettings",
                                    "enablePartitionDiscovery": false
                                },
                                "formatSettings": {
                                    "type": "DelimitedTextReadSettings"
                                }
                            }
                        },
                        {
                            "name": "Execute Pipeline1",
                            "type": "ExecutePipeline",
                            "dependsOn": [
                                {
                                    "activity": "Get Metadata for files",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "pipeline": {
                                    "referenceName": "pipeline3",
                                    "type": "PipelineReference"
                                },
                                "waitOnCompletion": true,
                                "parameters": {
                                    "folder_path": {
                                        "value": "@concat(variables('get_date'),'/',item().name)",
                                        "type": "Expression"
                                    },
                                    "filenames": {
                                        "value": "@activity('Get Metadata for files').output.childItems",
                                        "type": "Expression"
                                    }
                                }
                            }
                        },
                        {
                            "name": "If Condition1",
                            "type": "IfCondition",
                            "dependsOn": [
                                {
                                    "activity": "Execute Pipeline1",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "expression": {
                                    "value": "@greater(int(split(activity('Execute Pipeline1').output.pipelineReturnValue.file_path,'_')[0]), int(variables('LatestModifiedDate')))",
                                    "type": "Expression"
                                },
                                "ifTrueActivities": [
                                    {
                                        "name": "Update lastmodified",
                                        "type": "SetVariable",
                                        "dependsOn": [],
                                        "userProperties": [],
                                        "typeProperties": {
                                            "variableName": "LatestModifiedDate",
                                            "value": {
                                                "value": "@split(activity('Execute Pipeline1').output.pipelineReturnValue.file_path,'_')[0]",
                                                "type": "Expression"
                                            }
                                        }
                                    },
                                    {
                                        "name": "Update final file path",
                                        "type": "SetVariable",
                                        "dependsOn": [
                                            {
                                                "activity": "Update lastmodified",
                                                "dependencyConditions": [
                                                    "Succeeded"
                                                ]
                                            }
                                        ],
                                        "userProperties": [],
                                        "typeProperties": {
                                            "variableName": "final_files_path",
                                            "value": {
                                                "value": "@split(activity('Execute Pipeline1').output.pipelineReturnValue.file_path,'_')[1]",
                                                "type": "Expression"
                                            }
                                        }
                                    }
                                ]
                            }
                        }
                    ]
                }
            },
            {
                "name": "min modified date",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "Until gives latest date folders",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "LatestModifiedDate",
                    "value": {
                        "value": "@addDays(utcnow(), -365,'yyyyMMddHHmmss')",
                        "type": "Expression"
                    }
                }
            }
        ],
        "variables": {
            "counter": {
                "type": "String"
            },
            "flag": {
                "type": "Boolean"
            },
            "get_date": {
                "type": "String"
            },
            "req_date_path": {
                "type": "String"
            },
            "temp_counter": {
                "type": "String"
            },
            "final_files_path": {
                "type": "String"
            },
            "LatestModifiedDate": {
                "type": "String"
            },
            "mod_and_path": {
                "type": "String"
            }
        },
        "annotations": []
    }
}
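
One thing to watch in the parent pipeline: the child returns a single string of the form yyyyMMddHHmmss_path, and the parent reads index [0] and [1] after split(..., '_'). As far as I know, split cuts at every underscore, so a file or folder name that itself contains an underscore would leave only part of the path at index [1]. The Python sketch below mimics that behaviour and shows one possible workaround, splitting at the first underscore only. The helper names and the sample value are hypothetical.

```python
def split_all(value):
    """Mimics taking indexes [0] and [1] after splitting on every '_'."""
    parts = value.split("_")
    return parts[0], parts[1]


def split_first(value):
    """Splits only at the first '_' so the path stays whole."""
    stamp, path = value.split("_", 1)
    return stamp, path


# Made-up return value whose file name contains an underscore.
returned = "20230419101500_2023/04/19/People/People_191.csv"

print(split_all(returned))    # path is cut at the second underscore
print(split_first(returned))  # full path is preserved
```

In ADF itself, a similar effect could probably be achieved with indexOf and substring, or by returning the date and the path as two separate keys in pipelineReturnValue instead of one concatenated string.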

This is my child pipeline (pipeline3) JSON:

{
    "name": "pipeline3",
    "properties": {
        "activities": [
            {
                "name": "iterates through file names",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "min modified date child",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@pipeline().parameters.filenames",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Get Metadata for modifed date",
                            "type": "GetMetadata",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "dataset": {
                                    "referenceName": "Childgetmeta",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "filename": {
                                            "value": "@concat(pipeline().parameters.folder_path,'/',item().name)",
                                            "type": "Expression"
                                        }
                                    }
                                },
                                "fieldList": [
                                    "lastModified",
                                    "itemName"
                                ],
                                "storeSettings": {
                                    "type": "AzureBlobFSReadSettings",
                                    "enablePartitionDiscovery": false
                                },
                                "formatSettings": {
                                    "type": "DelimitedTextReadSettings"
                                }
                            }
                        },
                        {
                            "name": "If Condition1",
                            "type": "IfCondition",
                            "dependsOn": [
                                {
                                    "activity": "Get Metadata for modifed date",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "expression": {
                                    "value": "@greater(int(formatDateTime(activity('Get Metadata for modifed date').output.lastModified,'yyyyMMddHHmmss')), int(variables('lmodified')))",
                                    "type": "Expression"
                                },
                                "ifTrueActivities": [
                                    {
                                        "name": "update lmodified",
                                        "type": "SetVariable",
                                        "dependsOn": [],
                                        "userProperties": [],
                                        "typeProperties": {
                                            "variableName": "lmodified",
                                            "value": {
                                                "value": "@formatDateTime(activity('Get Metadata for modifed date').output.lastModified,'yyyyMMddHHmmss')",
                                                "type": "Expression"
                                            }
                                        }
                                    },
                                    {
                                        "name": "concat modified and file path",
                                        "type": "SetVariable",
                                        "dependsOn": [
                                            {
                                                "activity": "update lmodified",
                                                "dependencyConditions": [
                                                    "Succeeded"
                                                ]
                                            }
                                        ],
                                        "userProperties": [],
                                        "typeProperties": {
                                            "variableName": "mod_and_path",
                                            "value": {
                                                "value": "@concat(variables('lmodified'),'_',pipeline().parameters.folder_path,'/',item().name)",
                                                "type": "Expression"
                                            }
                                        }
                                    }
                                ]
                            }
                        }
                    ]
                }
            },
            {
                "name": "min modified date child",
                "type": "SetVariable",
                "dependsOn": [],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "lmodified",
                    "value": {
                        "value": "@addDays(utcnow(), -365,'yyyyMMddHHmmss')",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "returning",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "iterates through file names",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "pipelineReturnValue",
                    "value": [
                        {
                            "key": "file_path",
                            "value": {
                                "type": "Expression",
                                "content": "@variables('mod_and_path')"
                            }
                        }
                    ],
                    "setSystemVariable": true
                }
            }
        ],
        "parameters": {
            "folder_path": {
                "type": "string"
            },
            "filenames": {
                "type": "array"
            }
        },
        "variables": {
            "lmodified": {
                "type": "String"
            },
            "mod_and_path": {
                "type": "String"
            }
        },
        "annotations": []
    }
}

Use the above JSONs to build the pipelines. This is my result file:

(image: result file, not available)
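
As noted at the start of this answer, doing this in code (Databricks or Functions) may be simpler. A rough Python sketch of that route follows. It assumes a file system client like the one in the azure-storage-file-datalake package, whose get_paths(path=..., recursive=True) yields items with name, is_directory and last_modified. The function name newest_file and the FakeFileSystem stub are only for illustration, and unlike the pipelines above it skips date folders that exist but contain no files, so check the SDK documentation before relying on it.

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace


def newest_file(fs_client, today, max_days_back=30):
    """Walk back from `today` and return the newest file path in the first
    YYYY/MM/DD folder that contains any files, or None if none is found."""
    for offset in range(max_days_back + 1):
        folder = (today - timedelta(days=offset)).strftime("%Y/%m/%d")
        try:
            files = [p for p in fs_client.get_paths(path=folder, recursive=True)
                     if not p.is_directory]
        except Exception:
            # Assumption: listing a missing folder raises; treat it as "no files".
            files = []
        if files:
            return max(files, key=lambda p: p.last_modified).name
    return None


class FakeFileSystem:
    """Tiny stand-in so the sketch runs without Azure credentials."""

    def __init__(self, items):
        self.items = items

    def get_paths(self, path, recursive=True):
        return [i for i in self.items if i.name.startswith(path + "/")]


def item(name, hour, is_directory=False):
    """Builds a fake path entry modified on 2023-04-19 at the given hour (UTC)."""
    stamp = datetime(2023, 4, 19, hour, 0, tzinfo=timezone.utc)
    return SimpleNamespace(name=name, is_directory=is_directory, last_modified=stamp)


demo = FakeFileSystem([
    item("2023/04/19/Module", 12, is_directory=True),
    item("2023/04/19/Module/Module191.csv", 8),
    item("2023/04/19/People/People191.csv", 10),
])

print(newest_file(demo, datetime(2023, 4, 20, tzinfo=timezone.utc)))
# 2023/04/19/People/People191.csv
```

With a real client, the fs_client argument would be obtained from the storage account connection, and the stub classes would not be needed.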

Upvotes: 0
