Input: abc.tar.gz -> untar -> folder: abc
Folder structure of abc:
The root folder abc contains csv files generated from 100 cities, one file every 5 minutes throughout the day.
Number of csv files: 100 cities * 12 files per hour * 24 hours = 28,800 csv files
abc/
  city1_0005.csv
  city1_0010.csv
  ...
  city1_2355.csv
  city2_0005.csv
  city2_0010.csv
  ...
  city2_2355.csv
  ...
  city100_0005.csv
  city100_0010.csv
  ...
Technical requirement: read and process the files in parallel for better performance.
I have developed the code below, which processes the data sequentially, and I am looking for ways to optimize it.
staging_path = "abfss://xyz/abc"
# use Databricks utils to get the list of files in the folder
filesProp = dbutils.fs.ls(staging_path)
# extract the city names from the file names (city1_0005.csv -> city1)
citySet = set()
for file in filesProp:
    citySet.add(file.name.split('_')[0])
# dictionary to store the per-city dataframes
dictionary_df = {}
# read each city's data and insert it into a table
for cityName in citySet:
    # "city1_*" avoids also matching city10/city100 files
    filePath = staging_path + "/" + cityName + "_*"
    print(filePath)
    dictionary_df[cityName] = spark.read.options(header='True', delimiter=',').csv(filePath)
    dictionary_df[cityName].write.saveAsTable(cityName)
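One way to parallelize the loop above is to submit the per-city jobs from multiple driver threads, since Spark schedules jobs coming from separate threads concurrently. A minimal sketch (max_workers=8 is an assumption to tune for your cluster):

from concurrent.futures import ThreadPoolExecutor

def load_city(city):
    # each call runs an independent Spark job for one city's files
    filePath = staging_path + "/" + city + "_*"
    df = spark.read.options(header='True', delimiter=',').csv(filePath)
    df.write.saveAsTable(city)

# jobs submitted from separate threads run concurrently on the cluster
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(load_city, citySet))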
This is how I would solve this scenario:
Use a shell script to move the city-based csvs into city-specific folders. This ensures the files with the same schema sit under the same root folder (a sketch of this reshuffle follows the tree below):
/abc/
  city1/
    20211021/city1_0005
    20211021/city1_0010
    ...
  city2/
    20211021/city2_0005
    20211021/city2_0010
    ...
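A minimal sketch of that reshuffle, using dbutils from a notebook rather than a shell script since the files live in ADLS (the source path and the 20211021 date literal are assumptions):

source_dir = "abfss://xyz/abc"  # assumed flat landing folder
run_date = "20211021"           # hypothetical ingestion-date partition
for f in dbutils.fs.ls(source_dir):
    # skip anything that is not a raw csv (e.g. already-created city folders)
    if not f.name.endswith(".csv"):
        continue
    city = f.name.split("_")[0]
    # city1_0005.csv -> abfss://xyz/abc/city1/20211021/city1_0005.csv
    dbutils.fs.mv(f.path, f"{source_dir}/{city}/{run_date}/{f.name}")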
Since you are already on Azure Databricks, I would recommend the cloudFiles format (Auto Loader), which gives you better performance when scanning raw files in parallel from your data lake compared to the open-source structured streaming file source with the csv option.
Using structured streaming with foreachBatch() and trigger(once=True) processes only the files that arrived since the last execution; the details of already-processed files are maintained under the checkpoint_location path.
The process_multiple_csvs_different_schema function accepts a micro-batch, picks the columns belonging to each respective csv file, and writes them to the corresponding city table:
from pyspark.sql import functions as F

tmp_db = "test_multiple_csv_schema"
spark.sql(f"create database if not exists {tmp_db}")

base_path = "<your_base_mount_path_root_folder_for_csvs>"
checkpoint_location = f"{base_path}/checkpoint/multiplecsvs"
input_path = f"{base_path}/multiplecsvs/"
schema_location = f"{base_path}/schema/multiplecsvs"
staging_checkpoint_path = f"{base_path}/staging/checkpoint/multiplecsvs"
staging_data_path = f"{base_path}/staging/data/multiplecsvs"
input_format = "csv"
def process_multiple_csvs_different_schema(batch_df):
    # derive the table name from the file name:
    # .../city1/20211021/city1_0005.csv -> city1_0005
    df = (
        batch_df
        .withColumn("table", F.split(F.col("input_file_name"), r"\.csv")[0])
        .withColumn("table_path", F.split(F.col("table"), "/"))
        .withColumn("table_name", F.element_at(F.col("table_path"), -1))
        .drop("table", "table_path")
    )
    list_of_cities = [row[0] for row in df.select("table_name").distinct().collect()]
    for city in list_of_cities:
        print(f"processing data for {city}")
        city_df = df.where(f"table_name = '{city}'")
        # read one source file back to discover this city's column set
        input_file_name = city_df.limit(1).select("input_file_name").collect()[0][0]
        df_schema = spark.read.option("header", True).load(input_file_name, format=input_format)
        select_columns = df_schema.columns
        (
            city_df.select(select_columns)
            .withColumn("processed_time", F.current_timestamp())
            .write
            .option("mergeSchema", True)
            .mode("append")
            .format("delta")
            .saveAsTable(f"{tmp_db}.{city}")
        )
raw_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", input_format)
    .option("cloudFiles.schemaLocation", schema_location)
    .load(input_path)
)

(
    raw_df.withColumn("input_file_name", F.input_file_name())
    .writeStream
    .option("checkpointLocation", staging_checkpoint_path)
    .option("mergeSchema", True)
    .format("delta")
    .outputMode("append")
    .trigger(once=True)
    .start(staging_data_path)
    .awaitTermination()
)
staging_df = spark.readStream.format("delta").load(staging_data_path)

(
    staging_df.writeStream
    .option("checkpointLocation", checkpoint_location)
    .trigger(once=True)
    .foreachBatch(lambda batch_df, batch_id: process_multiple_csvs_different_schema(batch_df))
    .start()
    .awaitTermination()
)
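Once the trigger-once run finishes, the result can be spot-checked from a notebook (a quick sketch; city1_0005 is a hypothetical table name following the table_name derivation above):

# list all tables created in the temporary database
spark.sql(f"show tables in {tmp_db}").show(truncate=False)
# spot-check one hypothetical table, including its processed_time column
spark.table(f"{tmp_db}.city1_0005").show(5)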