Reputation: 3
I'm new to Spark, and I'm trying to achieve the below problem.
Scenario
I am trying to read multiple parquet files (and possibly csv files later on) for a specific range of dates and load them into a single Spark dataframe in Python; I'll explain the condition for selecting the dates below.
Reason: schema evolution - new columns were added in the recent/latest partitions, so a plain union is not possible (or I'm unaware of how to do it). If there is an efficient way to do the union, please let me know about that as well.
Files look like this:
s3://dir1/dir2/dir3/files/partition_date=2020-12-25/
# Partitions do not exist for weekend days, i.e., Saturday and Sunday
s3://dir1/dir2/dir3/files/partition_date=2020-12-28/
s3://dir1/dir2/dir3/files/partition_date=2020-12-29/
s3://dir1/dir2/dir3/files/partition_date=2020-12-30/ # Consider this file with new columns in it
s3://dir1/dir2/dir3/files/partition_date=2020-12-31/ # Consider this file with new columns in it
The parquet files (and, in a different folder, the csv files) that reside in each of these partitions look like this:
s3://dir1/dir2/dir3/files/partition_date=2020-12-31/data_2020-12-31.parquet
Before the schema changed, I used to load everything (all of the partitions) in the folder s3://dir1/dir2/dir3/files into a single Spark dataframe like this:
spark_df = spark.read.format('parquet').load('s3://dir1/dir2/dir3/files')
But now I want to pull files only for specific dates, since a contiguous date range won't work because of the missing weekend partitions. So I created a list, using a for loop, to check which partitions exist. This list contains the date strings of all partitions that exist.
dates = ['2020-12-25','2020-12-26','2020-12-27','2020-12-28','2020-12-29','2020-12-30','2020-12-31']
# I'll retrieve these dates by other efficient ways later on
existing_dates = []
# 'for' loop implementation
existing_dates = ['2020-12-25','2020-12-28','2020-12-29','2020-12-30','2020-12-31']
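For reference, one way I could build existing_dates (a minimal sketch, assuming boto3 is available and that the bucket/prefix split below is just inferred from the example paths) is to check each candidate date for at least one object under its partition prefix:
import boto3

s3 = boto3.client('s3')
bucket = 'dir1'             # assumption: bucket name inferred from the example path
prefix = 'dir2/dir3/files'  # assumption: key prefix inferred from the example path

existing_dates = []
for d in dates:
    # list at most one object under the partition prefix; KeyCount > 0 means the partition exists
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=f'{prefix}/partition_date={d}/', MaxKeys=1)
    if resp.get('KeyCount', 0) > 0:
        existing_dates.append(d)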
So here's the task for me: load the data for all dates in existing_dates into a single Spark dataframe, keeping in mind that the latest partitions have new columns.
Upvotes: 0
Views: 1039
Reputation: 9363
base_path = 's3://dir1/dir2/dir3/files'
# Note 1: The extra {{ and }} produce literal { and } in the f-string.
# Note 2: Reading partition directories directly drops the partition column (partition_date)
# from the returned dataframe by default. To keep the partition_date column, set the
# basePath option to the root of your parquet data.
df = (spark.read
      .option('basePath', base_path)
      .parquet(f'{base_path}/partition_date={{{",".join(existing_dates)}}}'))
# The f-string above expands to:
# s3://dir1/dir2/dir3/files/partition_date={2020-12-25,2020-12-28,...}
FYI, another glob syntax, [], can capture ranges:
s3://dir1/dir2/dir3/files/partition_date=2020-12-2[5-8]
will capture partitions for 2020-12-25, 2020-12-26, 2020-12-27, 2020-12-28.
Since the schemas differ across partitions (your new columns), add the mergeSchema option to align all columns:
df = (spark.read
      .option('basePath', base_path)
      .option('mergeSchema', 'true')  # this should handle the missing/new columns
      .parquet(f'{base_path}/partition_date={{{",".join(existing_dates)}}}'))
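To double-check that the new columns came through and which partitions were read, a quick sanity check on df is:
df.printSchema()                               # the new columns from the latest partitions should appear here
df.select('partition_date').distinct().show()  # confirms which partitions were actually loaded
As an aside, since the question also asks about an efficient union: on Spark 3.1+ you can union dataframes with mismatched schemas via unionByName with allowMissingColumns=True (a minimal sketch, assuming df_old and df_new are hypothetical dataframes read separately for the old and new schemas):
combined = df_old.unionByName(df_new, allowMissingColumns=True)  # missing columns are filled with nulls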
Upvotes: 1