LightningStack575

Reputation: 31

What is the best way to load many small files into a Delta table in PySpark?

I have many small CSV files for each day, all with the same schema. I would like to merge all of these files into a Delta table. Ideally it should pick up only the new files in the folder and append them to the Delta table.

Unfortunately I'm not sure, firstly, how best to append all the small files to a Delta table using PySpark and, secondly, how to choose only the newest CSV files in the folder. Can someone who understands the bronze, silver and gold layers of Delta Lake help me? Thanks!

Upvotes: 1

Views: 1183

Answers (1)

Anjaneya Tripathi

Reputation: 1459

Although you have not mentioned it, I am assuming, based on your tags, that you will be running this ingestion on Databricks (DBR 9.1+) and that the files are in Azure Data Lake Storage Gen2.

First, about ingesting many small files: there is not much optimization we can do during ingestion itself; only after loading can we combine (compact) the data. Second, to keep track of the files in the folder and load only the new ones, we can use Auto Loader from Databricks, as below:

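# Wrapped in parentheses so the chained calls are valid Python.
(
    spark.readStream
    .format("cloudFiles")                                    # enables Auto Loader
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", some_local_path)    # where the inferred schema is stored
    # Example hints only; replace with columns from your own files.
    .option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
    .load("path_to_your_files")
    .writeStream
    .option("checkpointLocation", some_local_path2)          # tracks which files were already processed
    .trigger(availableNow=True)                              # process everything available, then stop
    .toTable("your_delta_table_name")                        # PySpark uses toTable(), not table()
)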

Here the cloudFiles format is what enables the Auto Loader functionality of Databricks. Because Auto Loader supports schema evolution, we need to give it a schema location. Schema hints help determine the default schema of the files, but the schema is not restricted to them. The checkpoint location is where the stream keeps track of which files have already been processed and which newly added files still need to be processed.

Note that this works based on file names. For example, if you had 10 files on the first day you ran the code and 12 files the next day, it will compare the file names against those recorded in the checkpoint location and load only the 2 files whose names are not already in the list.
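To make the 10-then-12 example concrete, here is a small plain-Python sketch of the idea. It is only a toy model and not how Auto Loader is actually implemented: the function name select_new_files, the in-memory set and the file names are all made up for illustration, whereas the real bookkeeping lives in the checkpoint location.

```python
def select_new_files(listed_files, processed_files):
    """Return the files not seen before (in listing order) and record them.

    `processed_files` is a plain set standing in for the state that the
    real stream keeps in its checkpoint location (hypothetical stand-in).
    """
    new_files = [name for name in listed_files if name not in processed_files]
    processed_files.update(new_files)
    return new_files


processed = set()

# Day 1: ten files are in the folder, so all ten are loaded.
day1 = [f"file_{i:02d}.csv" for i in range(10)]
first_run = select_new_files(day1, processed)

# Day 2: the same ten files plus two new ones; only the two new ones are loaded.
day2 = day1 + ["file_10.csv", "file_11.csv"]
second_run = select_new_files(day2, processed)

print(len(first_run), second_run)
```

One consequence of tracking by name in this toy model is that running again without new files selects nothing, which is the behaviour you want from an incremental load.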

Sample Auto Loader notebook by Databricks here
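On the earlier point about combining the small files after loading, one option is to compact the Delta table once the ingestion run has finished. The sketch below assumes a Databricks (or recent Delta Lake) environment where the OPTIMIZE SQL command is available; compact_table is a hypothetical helper name, and spark is expected to be your existing SparkSession.

```python
def compact_table(spark, table_name):
    """Ask Delta to rewrite many small files into fewer, larger ones.

    Assumes `spark` is an active SparkSession in an environment that
    supports the OPTIMIZE command. `compact_table` is a hypothetical
    helper for illustration, not part of any library.
    """
    statement = f"OPTIMIZE {table_name}"
    spark.sql(statement)
    return statement


# Usage, after the Auto Loader run has completed:
# compact_table(spark, "your_delta_table_name")
```

Running this as a separate step after each load keeps ingestion simple and leaves file sizing to Delta.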

Upvotes: 1
