B. Bogart

Reputation: 1075

Load files in order with Databricks autoloader

I'm trying to write a Python pipeline in Databricks that ingests CDC data from a Postgres database, which DMS dumps into S3 as Parquet files. The file names are numerically ascending unique IDs based on the datetime (i.e. 20220630-215325970.csv). Right now Auto Loader seems to fetch all files at the source in random order, which means updates to rows in the DB may not be applied in the correct order.
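Because the names are fixed-width, zero-padded timestamps, sorting them as plain strings should give the same order as sorting by the parsed time. A small Python sketch to check that, assuming the layout is `YYYYMMDD-HHMMSSmmm` with the last three digits read as milliseconds (an inference from the example name, not something DMS documents here):

```python
from datetime import datetime
from pathlib import PurePosixPath


def file_timestamp(path: str) -> datetime:
    """Parse a name like '20220630-215325970.csv' into a datetime.

    Assumes the stem is YYYYMMDD-HHMMSSmmm; %f reads '970' as 0.970 s.
    The extension and any leading directories are ignored.
    """
    stem = PurePosixPath(path).stem  # e.g. '20220630-215325970'
    return datetime.strptime(stem, "%Y%m%d-%H%M%S%f")


names = ["20220701-000000000.csv", "20220630-215325970.csv", "20220630-091500001.csv"]
print(sorted(names, key=file_timestamp))
# ['20220630-091500001.csv', '20220630-215325970.csv', '20220701-000000000.csv']
```

If `sorted(names)` and `sorted(names, key=file_timestamp)` agree, the file name alone is enough to order the files.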

Here is a screenshot with an example. Updates 1, 2, and 3 were entered sequentially after all the other displayed records, but they appear in the DataFrame in the order below (column 4).

I've tried using the latestFirst option to see if I can get the files processed in a predictable order but that option doesn't seem to have any effect.

Here is my code:

dfp = (spark
      .readStream
      .format("cloudFiles")
      .option("cloudfiles.format","parquet")
      .option("latestFirst", 'false') # this doesn't do anything
      .schema(schema)
      .load(filePath))

display(dfp)

Is there a way to load and write files in order by filename or date using autoloader?

Upvotes: 3

Views: 1355

Answers (1)

Shahrukh lodhi

Reputation: 394

Auto Loader discovers and processes files asynchronously, which makes it hard to control the order in which files are ingested.

Workaround:
You can use the Spark SQL function input_file_name() to get the file name. It adds a column to your DataFrame containing the name of the file each row was read from. Since your file names contain the date, you can build logic on that column to process the files in date order.

from pyspark.sql.functions import input_file_name, current_timestamp

dfp = (spark
      .readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "parquet")
      .schema(schema)
      .load(filePath)
      # add the source file path and the processing time to every row
      .select(
          "*",
          input_file_name().alias("meta_source_file"),
          current_timestamp().alias("meta_processing_time"),
      ))

display(dfp)
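The per-file logic suggested above could look roughly like this sketch, written in plain Python so the ordering rule is easy to see. It assumes each row carries the `meta_source_file` column from the query above plus a DMS-style operation flag (`op` with "I", "U" or "D"); the `key`/`value` columns and the S3 paths are hypothetical. In Spark the same rule would typically live inside `foreachBatch`, for example keeping the row with the greatest `meta_source_file` per key before a MERGE. Note that this only orders rows that land in the same micro-batch, not across batches.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath


@dataclass
class CdcRow:
    meta_source_file: str  # path returned by input_file_name()
    op: str                # assumed DMS-style flag: "I", "U" or "D"
    key: int               # hypothetical primary-key column
    value: str             # hypothetical payload column


def file_sort_key(path: str) -> str:
    # Timestamp file names are fixed-width and zero-padded, so comparing
    # the bare names as strings matches chronological order.
    return PurePosixPath(path).name


def apply_in_file_order(rows, table=None):
    """Replay CDC rows onto a dict (key -> value) in source-file order."""
    state = dict(table or {})
    # sorted() is stable: rows from the same file keep their relative order,
    # which assumes they were already in commit order within that file.
    for row in sorted(rows, key=lambda r: file_sort_key(r.meta_source_file)):
        if row.op == "D":
            state.pop(row.key, None)
        else:  # "I" and "U" are both treated as upserts
            state[row.key] = row.value
    return state


# Rows for one key, listed in the order Auto Loader happened to read them
rows = [
    CdcRow("s3://bucket/cdc/20220630-215327000.parquet", "U", 1, "update 3"),
    CdcRow("s3://bucket/cdc/20220630-215325970.parquet", "U", 1, "update 1"),
    CdcRow("s3://bucket/cdc/20220630-215326500.parquet", "U", 1, "update 2"),
]
print(apply_in_file_order(rows))  # {1: 'update 3'}
```

Applying the rows in arrival order would leave "update 2" as the final value; sorting by file name restores "update 3".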

Upvotes: 1
