Reputation: 763
I have a variable in PySpark like below:
range = (
    "(load_date='2020-06-10' AND critical IN ('2', '3', '4')) OR "
    "(load_date='2020-06-11' AND critical IN ('1', '2', '3', '4')) OR "
    "(load_date='2020-06-12' AND critical IN ('1', '2', '3', '4')) OR "
    "(load_date='2020-06-13' AND critical IN ('1', '2', '3'))"
)
Now I want to read the partitioned data files of a Hive table, which are laid out like below:
/user/$USER/load_date=2020-06-10/critical=2/some_files
/user/$USER/load_date=2020-06-10/critical=3/some_files
/user/$USER/load_date=2020-06-10/critical=4/some_files
/user/$USER/load_date=2020-06-11/critical=1/some_files
/user/$USER/load_date=2020-06-11/critical=2/some_files
/user/$USER/load_date=2020-06-11/critical=3/some_files
/user/$USER/load_date=2020-06-11/critical=4/some_files
/user/$USER/load_date=2020-06-12/critical=1/some_files
/user/$USER/load_date=2020-06-12/critical=2/some_files
/user/$USER/load_date=2020-06-12/critical=3/some_files
/user/$USER/load_date=2020-06-12/critical=4/some_files
/user/$USER/load_date=2020-06-13/critical=1/some_files
/user/$USER/load_date=2020-06-13/critical=2/some_files
/user/$USER/load_date=2020-06-13/critical=3/some_files
As I have access only to the HDFS paths of the Hive table but not to the table itself, I cannot create a DataFrame directly from the Hive table.
Now, using these files, I want to create a DataFrame like below:
df = (spark.read
      .option("basePath", "/user/$USER")
      .parquet("/user/$USER/load_date=2020-06-10/critical=2/some_files",
"/user/$USER/load_date=2020-06-10/critical=3/some_files",
"/user/$USER/load_date=2020-06-10/critical=4/some_files",
"/user/$USER/load_date=2020-06-11/critical=1/some_files",
"/user/$USER/load_date=2020-06-11/critical=2/some_files",
"/user/$USER/load_date=2020-06-11/critical=3/some_files",
"/user/$USER/load_date=2020-06-11/critical=4/some_files",
"/user/$USER/load_date=2020-06-12/critical=1/some_files",
"/user/$USER/load_date=2020-06-12/critical=2/some_files",
"/user/$USER/load_date=2020-06-12/critical=3/some_files",
"/user/$USER/load_date=2020-06-12/critical=4/some_files",
"/user/$USER/load_date=2020-06-13/critical=1/some_files",
"/user/$USER/load_date=2020-06-13/critical=2/some_files",
"/user/$USER/load_date=2020-06-13/critical=3/some_files")
I am able to create the DataFrame, but passing all the file paths this way is tedious and not optimal. I want to do this in a simpler, dynamic way.
I have tried like below
df = (spark.read
      .option("basePath", "/user/$USER")
      .parquet("/user/$USER/load_date=*/critical=*"))
The above also works and creates the DataFrame, but the problem is that it may pull in data I don't need. For example, if there is a path like
"/user/$USER/load_date=2020-06-13/critical=4/some_files"
I don't want to read the files present in that path for now.
Requirement: using the range variable, extract the load_date and critical values and read only the file paths that match them.
How can I achieve that?
Upvotes: 0
Views: 1419
Reputation: 6338
You can read the root directory using spark.read.parquet(<root-dir>) and then apply a where clause. Spark pushes the predicate in the where clause down to the source path, so only the matching partition directories are read.
Let's understand this with an example. Try this:
The example table below is partitioned by batch_id. This is how one partition file looks in my case:
/Users/sokale/models/run_1/batch_id=73/part-00001-5fa5aebb-a836-43d2-97d2-7cf9bb722c26.c000.snappy.parquet
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, lit}

// build a small example DataFrame: ids 1..4 with batch_id 71..74
val df = spark.range(1, 5)
  .withColumn("batch_id", lit(70) + col("id"))
df.show(false)
df.printSchema()
/**
* +---+--------+
* |id |batch_id|
* +---+--------+
* |1 |71 |
* |2 |72 |
* |3 |73 |
* |4 |74 |
* +---+--------+
*
* root
* |-- id: long (nullable = false)
* |-- batch_id: long (nullable = false)
*/
// write partitioned by batch_id, creating one directory per value
df.write.partitionBy("batch_id")
  .mode(SaveMode.Overwrite)
  .parquet("/Users/sokale/models/run_1")
/**
* $ cd run_1/
* $ ls -l
* total 0
* ............ _SUCCESS
* ............ batch_id=71
* ............ batch_id=72
* ............ batch_id=73
* ............ batch_id=74
*/
Say you now want to read only the partition batch_id=73.
From the Spark docs:
spark.sql.parquet.filterPushdown (default: true) - Enables Parquet filter push-down optimization when set to true.
That means spark.read.parquet(dir).where(partitionCondition) reads only the specified partition directories, thanks to filter push-down.
// read only the files with batch_id=73
spark.read.parquet("/Users/sokale/models/run_1")
  .where(col("batch_id").equalTo(73))
  .show(false)
/**
* +---+--------+
* |id |batch_id|
* +---+--------+
* |3 |73 |
* +---+--------+
*/
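If you want to confirm that only the matching partition directory is scanned, you can inspect the physical plan. A minimal PySpark sketch of that check, reusing the example directory from above (the exact plan text varies by Spark version):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# the physical plan's PartitionFilters entry lists the partition
# predicates Spark applies before reading any data files
df = (spark.read
      .parquet("/Users/sokale/models/run_1")
      .where(col("batch_id") == 73))
df.explain()  # look for "PartitionFilters: [... (batch_id = 73)]"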
// read all partitions
val readDF = spark.read.parquet("/Users/sokale/models/run_1")
readDF.show(false)
readDF.printSchema()
/**
* +---+--------+
* |id |batch_id|
* +---+--------+
* |3 |73 |
* |2 |72 |
* |1 |71 |
* |4 |74 |
* +---+--------+
*
* root
* |-- id: long (nullable = true)
* |-- batch_id: integer (nullable = true)
*/
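Applying the same approach to the question: a minimal PySpark sketch, assuming range is kept as a SQL condition string over the partition columns load_date and critical, and that /user/$USER stands in for the actual HDFS root path:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# the question's range variable, kept as a SQL filter over the
# partition columns (note: the name shadows Python's built-in range)
range = (
    "(load_date='2020-06-10' AND critical IN ('2', '3', '4')) OR "
    "(load_date='2020-06-11' AND critical IN ('1', '2', '3', '4')) OR "
    "(load_date='2020-06-12' AND critical IN ('1', '2', '3', '4')) OR "
    "(load_date='2020-06-13' AND critical IN ('1', '2', '3'))"
)

# read the partitioned root once; load_date and critical are discovered
# as partition columns, and the where clause prunes every directory
# (e.g. load_date=2020-06-13/critical=4) that falls outside the range
df = spark.read.parquet("/user/$USER").where(range)
Since only partition columns appear in the filter, Spark can prune at the directory level and never touches the excluded paths.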
Upvotes: 1