AswinRajaram

Reputation: 1622

Reading Spark Dataframe from Partitioned Parquet data

I have parquet data stored on S3 and an Athena table partitioned by id and date. The parquet files are stored in

 s3://bucket_name/table_name/id=x/date=y/

The parquet files themselves contain the partition columns (id, date), because of which I am not able to read them using AWS Glue.

I would like to read the data in only a few partitions, and hence I am making use of a partition predicate as follows:

from datetime import date, timedelta

today = date.today()
yesterday = today - timedelta(days=1)

predicate = "date = date '" + str(yesterday) + "'"

df = glueContext.create_dynamic_frame_from_catalog(database_name, table_name, push_down_predicate=predicate)

However, since the files already contain the partition columns, I am getting the below error:

AnalysisException: Found duplicate column(s) in the data schema and the partition schema: id, date

Is there a way I can read data from only a few partitions like this? Can I somehow read the data by ignoring id and date columns?

Any sort of help is appreciated :)

Upvotes: 1

Views: 6518

Answers (1)

Cribber

Reputation: 2913

Concerning your first question 'Is there a way I can read data from only a few partitions like this?':

You don't need to use a predicate, in my opinion - the beauty of having partitioned parquet files is that Spark will push any filter applied on those partition columns down to the file scanning phase. That means Spark can skip entire partitions and row groups just by reading the parquet metadata.

Have a look at the physical execution plan once you execute a df = spark.read.parquet(...) followed by df.filter(col("date") == '2022-07-19').
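For example, a minimal sketch (assuming an existing SparkSession named spark and the same placeholder path as the snippets below):

from pyspark.sql.functions import col

# Read the partitioned parquet root, filter on the partition column,
# and print the physical plan to verify the pushed filters.
df = spark.read.parquet("file:///your/path/")
df.filter(col("date") == "2022-07-19").explain()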

You should find something along the lines of

+- FileScan parquet [idxx, ... PushedFilters: [IsNotNull(date), EqualTo(date, 2022-07-19)..

Concerning whether you can read the data by ignoring the id and date columns: you can pass multiple parquet paths to the read function at the leaf (partition) level - which would drop the date/id columns altogether (I don't know why you would do that, though, if you need to filter on them):

df = spark.read.parquet(
       "file:///your/path/date=2022-07-19/id=55/",
       "file:///your/path/date=2022-07-19/id=40/")

# Shorter solution:
df = spark.read.parquet(
       "file:///your/path/date=2022-07-19/id={55,40}/*")

Upvotes: 4
