lolcat
lolcat

Reputation: 75

Reading Hive table from Spark as a Dataset

I am trying to read a hive table in spark as a strongly typed Dataset, and I am noticing that the partitions are not being pruned as opposed to doing a Spark SQL on a dataframe from the same hive table.

case class States(state: String, country: String)
val hiveDS = spark.table("db1.states").as[States]
//no partition pruning
hiveDS.groupByKey(x=>x.country).count().filter(x=>x._1 == "US")

states is partitioned by country, so when I do a count on the above Dataset, the query scans all the partitions. However if I read it as such -

val hiveDF = spark.table("db1.states")
//correct partition pruning
hiveDF.groupByKey("country").count().filter(x=>x._1 == "US")

The partitions are pruned correctly. Can anyone explain why partition information is lost when you map a table to a case class?

Upvotes: 3

Views: 3179

Answers (1)

zero323
zero323

Reputation: 330063

TL;DR Lack of partition pruning in the first case is the expected behavior.

It happens because any operation on an object, unlike operations used with DataFrame DSL / SQL, is a black box, from the the optimizer perspective. To be able to optimize function like x=> x._1 == "US" or x => x.country Spark would have to apply complex and unreliable static analysis, and functionality like this is neither present nor (as far as I know) planned for the future.

The second case shouldn't compile (there is no groupByKey variant which takes strings), so it is not possible to tell, but in general it shouldn't prune either, unless you meant:

hiveDF.groupBy($"country").count().filter($"country" =!= "US")

See also my answer to to Spark 2.0 Dataset vs DataFrame.

Upvotes: 4

Related Questions