Igor Uchôa

Reputation: 379

Read physically partitioned data using wildcards

I have a persisted dataframe on AWS S3 with the following structure:

s3://bucket/df/
|__ date=2020-02-19/
    |__ FILENAME01.json
    |__ FILENAME02.json
    |__ FILENAME03
    |__ ...
|__ date=2020-02-20/
    |__ FILENAME04.json
    |__ FILENAME05
    |__ ...
|__ ...

If I read this dataframe using the following syntax:

df = spark.read.json("s3://bucket/df")

The files without an extension then become part of my dataframe, which is not desirable. I would like to consider only the files with the .json extension.

So I decided to read the dataframe while keeping only the files that match *.json. Concretely, I tried the following:

df = spark.read.json("s3://bucket/df/date=*/*.json")

The spark.read.json call worked, but the date column used to partition my dataframe wasn't present. Is there any way to read only the files that match a specific suffix inside the partition folders, without losing the partition column?

Upvotes: 2

Views: 2579

Answers (1)

blackbishop

Reputation: 32640

Spark only discovers partitions below the given input path, and here your path already includes the date partition directory, so date is not picked up as a column. From the docs:

Starting from Spark 1.6.0, partition discovery only finds partitions under the given paths by default. For the above example, if users pass path/to/table/gender=male to either SparkSession.read.parquet or SparkSession.read.load, gender will not be considered as a partitioning column. If users need to specify the base path that partition discovery should start with, they can set basePath in the data source options. For example, when path/to/table/gender=male is the path of the data and users set basePath to path/to/table/, gender will be a partitioning column.

You can specify the basePath option:

df = spark.read.option("basePath", "s3://bucket/df/").json("s3://bucket/df/date=*/*.json")
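To see why basePath makes the difference, here is a rough plain-Python sketch of the idea. It is only an illustration, not Spark's actual implementation: it assumes partition columns come from the key=value directory segments between the base path and the file. The helper name partition_columns is hypothetical.

```python
def partition_columns(base_path: str, file_path: str) -> dict:
    """Collect key=value directory segments between base_path and the file.

    Hypothetical helper for illustration only; Spark's real partition
    discovery also handles type inference, conflicts, and more.
    """
    base = base_path.rstrip("/") + "/"
    if not file_path.startswith(base):
        raise ValueError("file_path is not under base_path")

    # Directory segments below the base path, without the file name.
    segments = file_path[len(base):].split("/")[:-1]

    columns = {}
    for segment in segments:
        if "=" in segment:
            key, value = segment.split("=", 1)
            columns[key] = value
    return columns


file_path = "s3://bucket/df/date=2020-02-19/FILENAME01.json"

# Discovery starts at the table root: the date=... directory is below it.
with_base = partition_columns("s3://bucket/df/", file_path)

# Discovery starts at the file's own directory: nothing is left to discover.
without_base = partition_columns("s3://bucket/df/date=2020-02-19/", file_path)

print(with_base)
print(without_base)
```

The second call mirrors the wildcard read without basePath, where discovery effectively starts at the directory that holds each matched file.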

Alternatively, you can recover the date partition value from input_file_name using the regexp_extract function:

from pyspark.sql import functions as F

df = df.withColumn(
    "date",
    F.regexp_extract(F.input_file_name(), r".*/date=(\d{4}-\d{2}-\d{2})/.*", 1)
)
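If you want to sanity-check the pattern before running it on the cluster, a minimal sketch with Python's re module works on sample paths. This assumes the pattern behaves the same in Python as in Spark's Java regex engine, which should hold for this simple expression. The helper name extract_date is hypothetical.

```python
import re

# Same pattern as passed to regexp_extract above.
PATTERN = r".*/date=(\d{4}-\d{2}-\d{2})/.*"


def extract_date(path: str) -> str:
    """Return capture group 1, or an empty string when nothing matches
    (regexp_extract also returns an empty string on no match)."""
    match = re.search(PATTERN, path)
    return match.group(1) if match else ""


matched = extract_date("s3://bucket/df/date=2020-02-20/FILENAME04.json")
unmatched = extract_date("s3://bucket/df/FILENAME04.json")

print(repr(matched))
print(repr(unmatched))
```

Note that the extracted column is a string, so cast it (for example with F.to_date) if you need a date type.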

Upvotes: 3
