Pawan B

Reputation: 4623

Predicate pushdown on non-partitioned Parquet data

I have a folder in S3 which contains Parquet data:

bucket_name/folder_name/YEAR/MONTH/DAY

eg:
s3://bucket_name/folder_name/2020/12/10

I am using Apache Spark on AWS EMR to read the Parquet files.

As the data is not partitioned, is there a way to implement predicate pushdown filtering without partitioning the data?

What performance-improvement best practices can be used?

Upvotes: 4

Views: 2112

Answers (4)

Douglas M

Reputation: 1126

  1. You could manually add partitions using the path to each day-level subfolder (see the first sketch after this list). With this, you would not have to re-write your table, though your Metastore would end up with a lot of partition entries, which could slow down query startup.
ALTER TABLE table_name ADD [IF NOT EXISTS]
  PARTITION
  (partition_col1_name = partition_col1_value
  [,partition_col2_name = partition_col2_value]
  [,...])
  [LOCATION 'location1']
  [PARTITION
  (partition_colA_name = partition_colA_value
  [,partition_colB_name = partition_colB_value
  [,...])]
  [LOCATION 'location2']
  [,...]

Syntax: https://docs.aws.amazon.com/athena/latest/ug/alter-table-add-partition.html

  2. You could CONVERT the table to the open Delta Lake format (http://delta.io); see the second sketch after this list.
-- Convert unpartitioned parquet table at path 'path/to/table'
CONVERT TO DELTA parquet.`path/to/table`

This will add a ./_delta_log folder containing a smallish Delta Lake transaction log. Spark will then take advantage of the min-max values stored in the Delta log to determine which files to skip over, letting you skip the dates you're not interested in (the mechanism can be used much more broadly too). See this thread: https://delta-users.slack.com/archives/CJ70UCSHM/p1602189649142400?thread_ts=1602098197.114400&cid=CJ70UCSHM

You'd need to include the Delta Lake Spark package with your job(s); in return you'd gain ACID properties and much more.

  3. You could migrate from EMR to Databricks to access many more performance improvements.
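
For option 1, a minimal sketch of what this could look like from Spark SQL, assuming you first declare an external table partitioned by year/month/day over the existing layout (the table name and the column_a/column_b schema are placeholders, not from the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Declare an external, partitioned table over the existing files
# (replace the placeholder schema with your real columns).
spark.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS my_table (
        column_a STRING,
        column_b STRING
    )
    PARTITIONED BY (year INT, month INT, day INT)
    STORED AS PARQUET
    LOCATION 's3://bucket_name/folder_name/'
""")

# Register one day-level subfolder as a partition; repeat (or script) per day.
spark.sql("""
    ALTER TABLE my_table ADD IF NOT EXISTS
    PARTITION (year = 2020, month = 12, day = 10)
    LOCATION 's3://bucket_name/folder_name/2020/12/10/'
""")

# Filters on year/month/day now prune down to the registered locations.
spark.sql("SELECT * FROM my_table WHERE year = 2020 AND month = 12 AND day = 10").show()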
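
For option 2, a sketch of the same conversion driven from PySpark, with the Delta Lake package on the job's classpath (the package version and the path are illustrative; the conversion itself only writes the _delta_log folder):

# e.g. spark-submit --packages io.delta:delta-core_2.12:1.0.1 job.py
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Convert the unpartitioned Parquet table in place (same as the SQL above).
DeltaTable.convertToDelta(spark, "parquet.`path/to/table`")

# Subsequent reads go through the Delta log and can skip files via min-max stats.
df = spark.read.format("delta").load("path/to/table")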

Upvotes: 1

Zohar Stiro

Reputation: 241

Spark can also push filters down to Parquet even if the data is not partitioned by the column in the predicate. However, you will mostly benefit from this if your data is organized in a way that lets the Parquet metadata show whether the data you are requesting is inside a given file or not.

As an example, assume you have a date column but did not partition by date. As a result you have many files containing different dates, and your query filters on a specific date, so Spark and Parquet will filter on that date while scanning/loading the data. It helps if you, for example, sort your data by this date: fewer files will match the pushed-down filter, so less data has to be loaded into memory.
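
A minimal sketch of that idea, assuming a hypothetical event_date column (the column name and the output path are not from the question):

import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3://bucket_name/folder_name/*/*/*")

# Rewrite the data range-partitioned and sorted by event_date so each output
# file covers a narrow date range and its Parquet min/max statistics are tight.
(df.repartitionByRange(50, "event_date")
   .sortWithinPartitions("event_date")
   .write.mode("overwrite").parquet("s3://bucket_name/folder_name_sorted/"))

# Parquet filter pushdown is on by default (spark.sql.parquet.filterPushdown);
# the predicate shows up under "PushedFilters" in the physical plan, and row
# groups whose min/max range excludes the date can be skipped.
sorted_df = spark.read.parquet("s3://bucket_name/folder_name_sorted/")
sorted_df.filter(f.col("event_date") == "2020-12-10").explain()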

Your question is very general and depends on the use case.

Upvotes: 0

Kafels

Reputation: 4059

I'll describe my solution in the code:

import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read absolute path and put "/*/*/*" to read all partitions
df = spark.read.parquet("s3://bucket_name/folder_name/*/*/*")

# Get absolute file path
df = df.withColumn('path', f.split(f.input_file_name(), '/'))

# Slice path and recover year / month / day in an array
df = df.withColumn('year_month_day', f.slice(f.col('path'), -4, 3))

# Transform array values to respective columns
df = df.withColumn('year', f.col('year_month_day').getItem(0))
df = df.withColumn('month', f.col('year_month_day').getItem(1))
df = df.withColumn('day', f.col('year_month_day').getItem(2))

# Drop temporary columns
df = df.drop('path', 'year_month_day')

df.show()

# TODO : Make your transformations
# .
# .
# .
# Save partitioned by year, month and day (if you want)
# df.write.partitionBy('year', 'month', 'day').parquet('...')

My directory:

[screenshot of the bucket's YEAR/MONTH/DAY folder layout]

Output:

+--------+--------+----+-----+---+
|column_a|column_b|year|month|day|
+--------+--------+----+-----+---+
| hello_1| hello_2|2019|   06| 10|
| world_1| world_2|2020|   12| 31|
+--------+--------+----+-----+---+
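
If you do take the final step of re-writing the data with partitionBy, filters on the new year/month/day partition columns will prune directories instead of scanning everything. A short sketch continuing from the code above (the output path is hypothetical):

# continuing from the snippet above: spark and f are already imported
# df.write.partitionBy('year', 'month', 'day').parquet('s3://bucket_name/partitioned/')

out = spark.read.parquet('s3://bucket_name/partitioned/')

# Only the year=2020/month=12/day=10 directory is listed and read; the rest is pruned.
out.filter((f.col('year') == 2020) & (f.col('month') == 12) & (f.col('day') == 10)).show()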

Upvotes: 4

Pato Navarro

Reputation: 300

Can't you rename the folder structure to partition the data?

I believe that if you rename the folders as:

s3://bucket_name/folder_name/year=2020/month=12/day=10

you can do something like:

spark.read.parquet("s3://bucket_name/folder_name/")

and the resulting DataFrame will be partitioned by year/month/day.
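
A sketch of that, assuming the folders have been renamed to the Hive-style layout above (Spark discovers year, month and day as partition columns and infers them as integers):

import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet("s3://bucket_name/folder_name/")

# Partition pruning: only the year=2020/month=12/day=10 directory is scanned.
df.filter((f.col("year") == 2020) & (f.col("month") == 12) & (f.col("day") == 10)).show()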

Upvotes: 0
