Reputation: 4623
I have a folder in S3 which contains Parquet data:
bucket_name/folder_name/YEAR/MONTH/DAY
e.g.:
s3://bucket_name/folder_name/2020/12/10
I am using Apache Spark on AWS EMR to read the Parquet files.
Since the data is not partitioned, is there a way to get predicate pushdown filtering without partitioning the data?
What performance best practices can be applied?
Upvotes: 4
Views: 2112
Reputation: 1126
One option is to define the data as a partitioned table (e.g. in Athena over the Glue catalog) and register each year/month/day folder as a partition:
ALTER TABLE table_name ADD [IF NOT EXISTS]
PARTITION
(partition_col1_name = partition_col1_value
[,partition_col2_name = partition_col2_value]
[,...])
[LOCATION 'location1']
[PARTITION
(partition_colA_name = partition_colA_value
[,partition_colB_name = partition_colB_value]
[,...])]
[LOCATION 'location2']
[,...]
Syntax: https://docs.aws.amazon.com/athena/latest/ug/alter-table-add-partition.html
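For the layout in the question, registering a single day folder could look like the sketch below. The table name my_table and the year/month/day partition columns are assumptions, and the table must already be defined as partitioned by them; the same ADD PARTITION statement can be run in the Athena console, or (for a Glue/Hive catalog table) from Spark SQL:
from pyspark.sql import SparkSession

# Hive/Glue catalog support is needed for ALTER TABLE ... ADD PARTITION.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Register one day folder as a partition of an existing partitioned table.
# 'my_table' and the partition column names are assumptions.
spark.sql("""
    ALTER TABLE my_table ADD IF NOT EXISTS
    PARTITION (year = '2020', month = '12', day = '10')
    LOCATION 's3://bucket_name/folder_name/2020/12/10/'
""")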
Alternatively, you can convert the Parquet data in place to Delta Lake:
-- Convert unpartitioned parquet table at path 'path/to/table'
CONVERT TO DELTA parquet.`path/to/table`
This will add a ./_delta_log folder containing a smallish Delta Lake transaction log. Spark can then use the min/max values stored in the Delta log to determine which files to skip, which lets you skip over dates you're not interested in (and it can be used much more broadly, too). See this thread: https://delta-users.slack.com/archives/CJ70UCSHM/p1602189649142400?thread_ts=1602098197.114400&cid=CJ70UCSHM
You'd need to include the Delta Lake Spark package with your job(s); in return you'd gain ACID properties and much more.
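A rough PySpark sketch of what that could look like (the path is the placeholder from above, the date column name is an assumption, and the Delta Lake package must be included with the job):
from pyspark.sql import SparkSession

# Requires the Delta Lake package to be included with the job.
spark = (SparkSession.builder
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog",
                 "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())

# One-time, in-place conversion: writes the _delta_log folder next to the data.
spark.sql("CONVERT TO DELTA parquet.`path/to/table`")

# Later reads can use the min/max stats in the transaction log to skip files.
# 'date' is an assumed column name.
df = (spark.read.format("delta")
      .load("path/to/table")
      .filter("date = '2020-12-10'"))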
Upvotes: 1
Reputation: 241
Spark can also push filters down to Parquet files even if the data is not partitioned by the predicate column. However, you will mostly benefit from this if your data is organized so that the Parquet metadata helps determine whether the data you are requesting is inside a given file or not.
As an example, assume you have a date column but didn't partition by date. As a result you have many files containing different dates, and when you query for a specific date, Spark and Parquet will filter on that date while scanning/loading the data. It helps if you, for example, sort your data by that date before writing: that way fewer files satisfy the pushed-down filter, so fewer files have to be loaded into memory.
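A minimal sketch of that idea (the event_date column name and the "_sorted" output path are assumptions): sorting by the filter column clusters each date into few files, so the per-file min/max statistics let the pushed-down filter skip most of the data.
import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet("s3://bucket_name/folder_name/*/*/*")

# Rewrite the data sorted by the column you usually filter on.
(df.sort("event_date")
   .write
   .mode("overwrite")
   .parquet("s3://bucket_name/folder_name_sorted/"))

# This filter is pushed down to the Parquet scan; thanks to the sort,
# most files' min/max ranges exclude the date and those files are skipped.
one_day = (spark.read.parquet("s3://bucket_name/folder_name_sorted/")
           .filter(f.col("event_date") == "2020-12-10"))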
Your question is very general and depends on the use case.
Upvotes: 0
Reputation: 4059
I'll describe my solution in the code:
import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Read from the base path; the "/*/*/*" glob picks up every year/month/day folder
df = spark.read.parquet("s3://bucket_name/folder_name/*/*/*")
# Get absolute file path
df = df.withColumn('path', f.split(f.input_file_name(), '/'))
# Slice path and recover year / month / day in an array
df = df.withColumn('year_month_day', f.slice(f.col('path'), -4, 3))
# Transform array values to respective columns
df = df.withColumn('year', f.col('year_month_day').getItem(0))
df = df.withColumn('month', f.col('year_month_day').getItem(1))
df = df.withColumn('day', f.col('year_month_day').getItem(2))
# Drop temporary columns
df = df.drop('path', 'year_month_day')
df.show()
# TODO : Make your transformations
# .
# .
# .
# Save partitioned by year, month and day (if you want)
# df.write.partitionBy('year', 'month', 'day').parquet('...')
My directory (it contains day folders such as 2019/06/10 and 2020/12/31):
Output:
+--------+--------+----+-----+---+
|column_a|column_b|year|month|day|
+--------+--------+----+-----+---+
| hello_1| hello_2|2019| 06| 10|
| world_1| world_2|2020| 12| 31|
+--------+--------+----+-----+---+
Upvotes: 4
Reputation: 300
Can't you rename the folder structure to partition the data?
I believe that if you rename the folders as:
s3://bucket_name/folder_name/year=2020/month=12/day=10
you can do something like:
spark.read.parquet("s3://bucket_name/folder_name/")
and the resulting DataFrame will be partitioned by year/month/day.
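Roughly, reading the renamed layout could then look like this sketch: Spark infers the year/month/day columns from the key=value folder names, and filters on them prune whole folders (the filter values below are just the example date from the question).
import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Partition columns year/month/day are discovered from the key=value folders.
df = spark.read.parquet("s3://bucket_name/folder_name/")
df.printSchema()  # original columns plus year, month, day partition columns

# Partition pruning: only the 2020/12/10 folder is read.
one_day = df.filter((f.col("year") == 2020) &
                    (f.col("month") == 12) &
                    (f.col("day") == 10))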
Upvotes: 0