nmr

Reputation: 763

Reading Hive table partitioned files dynamically in Pyspark

I have a variable in Pyspark like below

range = (
        "(load_date='2020-06-10' AND critical in ('2', '3', '4')) OR "
        "(load_date='2020-06-11' AND critical in ('1', '2', '3', '4')) OR "
        "(load_date='2020-06-12' AND critical in ('1', '2', '3', '4')) OR "
        "(load_date='2020-06-13' AND critical in ('1', '2', '3'))"
        )

Now I want to read the Hive table's partitioned data files, which are laid out like below

/user/$USER/load_date=2020-06-10/critical=2/some_files
/user/$USER/load_date=2020-06-10/critical=3/some_files
/user/$USER/load_date=2020-06-10/critical=4/some_files
/user/$USER/load_date=2020-06-11/critical=1/some_files
/user/$USER/load_date=2020-06-11/critical=2/some_files
/user/$USER/load_date=2020-06-11/critical=3/some_files
/user/$USER/load_date=2020-06-11/critical=4/some_files
/user/$USER/load_date=2020-06-12/critical=1/some_files
/user/$USER/load_date=2020-06-12/critical=2/some_files
/user/$USER/load_date=2020-06-12/critical=3/some_files
/user/$USER/load_date=2020-06-12/critical=4/some_files
/user/$USER/load_date=2020-06-13/critical=1/some_files
/user/$USER/load_date=2020-06-13/critical=2/some_files
/user/$USER/load_date=2020-06-13/critical=3/some_files

Since I only have access to the HDFS paths of the Hive table's files, and not the table itself, I cannot create a DataFrame directly from the Hive table.

Now using these files I want to create a data frame like below.

df = (spark.read
        .option("basePath", "/user/$USER")
        .parquet("/user/$USER/load_date=2020-06-10/critical=2/some_files",
                "/user/$USER/load_date=2020-06-10/critical=3/some_files",
                "/user/$USER/load_date=2020-06-10/critical=4/some_files",
                "/user/$USER/load_date=2020-06-11/critical=1/some_files",
                "/user/$USER/load_date=2020-06-11/critical=2/some_files",
                "/user/$USER/load_date=2020-06-11/critical=3/some_files",
                "/user/$USER/load_date=2020-06-11/critical=4/some_files",
                "/user/$USER/load_date=2020-06-12/critical=1/some_files",
                "/user/$USER/load_date=2020-06-12/critical=2/some_files",
                "/user/$USER/load_date=2020-06-12/critical=3/some_files",
                "/user/$USER/load_date=2020-06-12/critical=4/some_files",
                "/user/$USER/load_date=2020-06-13/critical=1/some_files",
                "/user/$USER/load_date=2020-06-13/critical=2/some_files",
                "/user/$USER/load_date=2020-06-13/critical=3/some_files"))

I am able to create the DataFrame this way, but passing all the file paths explicitly is tedious and not optimal. I want to do this in a simpler, dynamic way.

I have tried like below

df = (spark.read
        .option("basePath", "/user/$USER")
        .parquet("/user/$USER/load_date=*/critical=*"))

The above also works and creates the data frame.

The problem is that if I use this approach, I may read more data than I require.

For example, if I have a file path like

"/user/$USER/load_date=2020-06-13/critical=4/some_files"

I don't want to read the files present in that path for now.

Requirement

Using the range variable, extract the load_date and critical values and read only the corresponding file paths.

How can I achieve that?

Upvotes: 0

Views: 1419

Answers (1)

Som

Reputation: 6338

You can read the root directory using spark.read.parquet(<root-dir>) and then apply a where clause. Spark will push the predicate in the where clause down to the source path, so only the matching partition directories are read.
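
Applied to your layout, a minimal PySpark sketch of this idea could look like the following (assuming the range string is a valid SQL boolean expression, as written in the question above, and that /user/$USER is the partition root):

    # Sketch only: read the partition root once and let Spark prune partitions
    # based on the predicate held in the `range` variable from the question.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    df = (spark.read
          .option("basePath", "/user/$USER")   # keep load_date and critical as columns
          .parquet("/user/$USER"))             # read the root, not every leaf directory

    # `range` holds the SQL predicate string, e.g.
    # "(load_date='2020-06-10' AND critical in ('2', '3', '4')) OR ..."
    filtered = df.where(range)
    filtered.show()

Note that Spark may type-infer partition values (for example, critical as an integer); if you prefer to keep them as plain strings so that the string literals in the predicate compare as-is, spark.sql.sources.partitionColumnTypeInference.enabled can be set to false before reading.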

Let's understand this with an example. Try this:

1. Create the files partitioned by batch_id

This is how it looks in my case: /Users/sokale/models/run_1/batch_id=73/part-00001-5fa5aebb-a836-43d2-97d2-7cf9bb722c26.c000.snappy.parquet

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.{col, lit}

    val df = spark.range(1, 5)
      .withColumn("batch_id", lit(70) + col("id"))

    df.show(false)
    df.printSchema()

    /**
      * +---+--------+
      * |id |batch_id|
      * +---+--------+
      * |1  |71      |
      * |2  |72      |
      * |3  |73      |
      * |4  |74      |
      * +---+--------+
      *
      * root
      * |-- id: long (nullable = false)
      * |-- batch_id: long (nullable = false)
      */

    df.write.partitionBy("batch_id")
      .mode(SaveMode.Overwrite)
      .parquet("/Users/sokale/models/run_1")
    /**
      * $ cd run_1/
      * $ ls -l
      * total 0
      * ............ _SUCCESS
      * ............ batch_id=71
      * ............ batch_id=72
      * ............ batch_id=73
      * ............ batch_id=74
      */

2. Read the parquet files for partition batch_id=73

From the Spark docs:

spark.sql.parquet.filterPushdown (default: true): Enables Parquet filter push-down optimization when set to true.

That means spark.read.parquet(dir).where(partitionCondition) reads only the specified partition directories, because the filter on the partition column is pushed down to the source (partition pruning). A PySpark equivalent is sketched after the Scala example below.

    // read only the partition with batch_id=73
    spark.read.parquet("/Users/sokale/models/run_1").where(col("batch_id").equalTo(73))
      .show(false)

    /**
      * +---+--------+
      * |id |batch_id|
      * +---+--------+
      * |3  |73      |
      * +---+--------+
      */
    // read all partitions
    val readDF = spark.read.parquet("/Users/sokale/models/run_1")
    readDF.show(false)
    readDF.printSchema()

    /**
      * +---+--------+
      * |id |batch_id|
      * +---+--------+
      * |3  |73      |
      * |2  |72      |
      * |1  |71      |
      * |4  |74      |
      * +---+--------+
      *
      * root
      * |-- id: long (nullable = true)
      * |-- batch_id: integer (nullable = true)
      */
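
For completeness, the same read-and-filter in PySpark (the language used in the question) could look like this; a sketch assuming the /Users/sokale/models/run_1 directory written above:

    # Sketch: PySpark equivalent of the Scala snippet above.
    from pyspark.sql import functions as F

    # Read the partition root and keep only batch_id=73; Spark prunes the
    # other batch_id=* directories instead of scanning them.
    (spark.read.parquet("/Users/sokale/models/run_1")
        .where(F.col("batch_id") == 73)
        .show(truncate=False))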

Upvotes: 1
