DatenBergwerker

Reputation: 193

ForEach Loop over Databricks Notebook Output - JSON too large

We have an Azure Data Factory (ADF) pipeline whose first step is a Databricks (DB) notebook that polls a DBFS-mounted file system for new files (usually a 1-day delta, based on the "added" metadata field). We then filter that file list and pass it to a ForEach to kick off the actual data cleaning / insertion pipeline. This works fine for the daily delta updates, but for a full ingest of all the historical data we run into an error from the Data Factory.

We pass the filtered file list from the first notebook as JSON via dbutils.notebook.exit(file_list_dict), where file_list_dict is a Python dictionary containing the filtered paths as an array under a JSON key, like this:

{"file_list": [{"path": dbfs_filepath, "type": "File"}, ... ]

For the full ingestion, ADF throws an error that JSON passed by DB notebooks can't exceed 20 MB (since it would contain thousands of file paths) and fails the pipeline. I've tried writing the JSON to a file instead and making the ForEach operator loop over that, but I can't find the right way to do it. The documentation for ForEach only mentions items from pipeline activities, which seems out of the question here since all our steps are essentially Databricks notebooks. I've also tried creating an ADF dataset from the JSON file I wrote to the file system and looping over it with the Lookup activity, but that also only supports 5,000 rows.

Is there a simple way to make ForEach loop over file rows that I just don't see?

Pipeline schematic:

<DB file poll notebook & filter> -> <ForEach Operator for file in filelist> -> <run pipeline for individual files>

Upvotes: 0

Views: 1635

Answers (2)

Doug

Reputation: 1

We had a similar issue (but somewhat in reverse, as we use ADF to poll the files and bring them into our Azure SQL DB). The connection between ADF and Databricks goes through APIs, which is where the limitation lies. We added an Until loop inside the ADF pipeline and passed a set number of files at a time (sized so the output JSON stays under the limit).

Basically: until there are no more files, process a batch of them. Here are screenshots to illustrate the idea.
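The batching side of this could be sketched in the notebook like so. This is a hypothetical helper, not Doug's actual setup: the function name and the size limit are placeholders (the limit here is deliberately far below ADF's 20 MB ceiling), and in practice each yielded batch would be returned to one Until iteration:

```python
import json

def batches_under_limit(paths, max_bytes=1_000_000):
    """Yield batches of file paths whose serialized JSON payload
    stays below max_bytes, so each notebook output fits within the
    ADF activity-output size limit."""
    batch = []
    for p in paths:
        candidate = batch + [{"path": p, "type": "File"}]
        # If adding this path would push the payload over the limit,
        # emit the current batch and start a new one.
        if batch and len(json.dumps({"file_list": candidate})) > max_bytes:
            yield batch
            batch = [{"path": p, "type": "File"}]
        else:
            batch = candidate
    if batch:
        yield batch
```

The Until loop then keeps invoking the notebook (with an offset or cursor) and processing one batch per iteration until nothing remains.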

[screenshot 1][1]
[screenshot 2][2]
[screenshot 3][3]


[1]: https://i.sstatic.net/6RH5b.png
[2]: https://i.sstatic.net/pL6HN.png
[3]: https://i.sstatic.net/5Rf7s.png

Upvotes: 0

Rakesh Govindula

Reputation: 11454

Since Lookup has a limit of 5,000 rows, you can try the workaround below.

First, in Databricks, save your file list as JSON files of 5,000 rows or fewer each to a folder in Blob storage.
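That first step might look like the following sketch. The function name, output folder, and file-name pattern are placeholders; in the real notebook out_dir would be a mounted Blob storage path (e.g. under /dbfs/mnt/...):

```python
import json
import os

def write_chunked_file_lists(paths, out_dir, chunk_size=5000):
    """Split the full file list into JSON files of at most chunk_size
    rows each, so a Lookup activity (5,000-row limit) can read one
    chunk file per iteration."""
    written = []
    for i in range(0, len(paths), chunk_size):
        chunk = [{"path": p, "type": "File"} for p in paths[i:i + chunk_size]]
        name = os.path.join(out_dir, f"file_list_{i // chunk_size:04d}.json")
        with open(name, "w") as f:
            json.dump(chunk, f)
        written.append(name)
    return written
```

Each resulting file is small enough for a Lookup to read in full, and the folder listing drives the outer ForEach described below.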

Then follow the demonstration below:

We can get the list of JSON files in that folder using a Get Metadata activity and a ForEach. To loop over the contents of each JSON file we need another ForEach, but nesting a ForEach inside another ForEach is not supported.

However, we can use an Execute Pipeline activity inside the ForEach and put the second ForEach in the child pipeline.

Get Metadata activity in the parent pipeline to list the JSON files in the folder:

ForEach in the parent pipeline:

Check Sequential and give it the Get Metadata output: @activity('Get Metadata for the JSON files list').output.childItems


For the JSON files, create a dataset: give the folder name manually and use a dataset parameter for the file name, so the Lookup inside the parent ForEach can supply the file name.


Lookup activity inside Parent ForEach:

Give the file name as @string(item().name)


Execute Pipeline activity:

Before this, create an array parameter in the child pipeline, and in the Execute Pipeline activity inside the ForEach pass the Lookup output to that parameter.




Give the Lookup output: @activity('Lookup1').output.value


Now use the ForEach inside the child pipeline and give the array parameter to it as @pipeline().parameters.childparam


You can use whichever activity you want inside this ForEach; here I have used Append Variable.


Upvotes: 2
