Reputation: 259
I want to execute SQL with Spark like this:
sparkSession.sql("select * from table")
But before execution I want to run a partition check on the table to avoid a full scan.
If the table is partitioned, my program will force users to add a partition filter; if not, the query can run as-is.
So my question is: how do I know whether a table is partitioned? My idea is to read the information from the metastore, but how to access the metastore is another problem I've run into. Could someone help?
Upvotes: 2
Views: 12287
Reputation: 94
I know this is late, but this might help someone
spark.sql("describe detail database.table").select("partitionColumns").show(false)
This gives a row with the partition columns in an array.
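If you need a boolean out of that, a minimal sketch (assuming a table that supports DESCRIBE DETAIL, e.g. a Delta table, since that is what exposes partitionColumns; the table name is a placeholder):
// Sketch: treat the table as partitioned when DESCRIBE DETAIL reports a
// non-empty partitionColumns array. Assumes DESCRIBE DETAIL works for the
// table; "database.table" is a placeholder.
val isPartitioned = spark.sql("describe detail database.table")
  .select("partitionColumns")
  .collect()
  .headOption
  .exists(_.getSeq[String](0).nonEmpty)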
Upvotes: 0
Reputation: 1214
import org.apache.spark.sql.catalyst.TableIdentifier

val listPartitions = spark.sessionState.catalog.listPartitionNames(TableIdentifier("table_name", Some("db_name")))

listPartitions: Seq[String] = ArrayBuffer(partition1=value1, ... ) // partitioned table
listPartitions: Seq[String] = ArrayBuffer() // non-partitioned table
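One caveat: a partitioned table that doesn't have any partitions yet would also come back as an empty list. A small sketch that checks the table's declared partition columns instead, through the same session catalog (table and database names are placeholders):
import org.apache.spark.sql.catalyst.TableIdentifier

// Sketch: look at the partition columns declared on the table rather than the
// partitions it currently holds, so empty partitioned tables are still detected.
val tableMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("table_name", Some("db_name")))
val isPartitioned = tableMeta.partitionColumnNames.nonEmpty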
Upvotes: 0
Reputation: 9417
Assuming that your real goal is to restrict execution of unbounded queries, I think it would be easier to get the query's execution plan and look under its FileScan / HiveTableScan leaf nodes to see if any partition filters are being applied. For partitioned tables, the number of partitions the query is actually going to scan will also be shown, by the way. So, something like this should do:
scala> val df_unbound = spark.sql("select * from hottab")
df_unbound: org.apache.spark.sql.DataFrame = [id: int, descr: string ... 1 more field]
scala> val plan1 = df_unbound.queryExecution.executedPlan.toString
plan1: String =
"*(1) FileScan parquet default.hottab[id#0,descr#1,loaddate#2] Batched: true, Format: Parquet,
Location: CatalogFileIndex[hdfs://ns1/user/hive/warehouse/hottab],
PartitionCount: 365, PartitionFilters: [],
PushedFilters: [], ReadSchema: struct<id:int,descr:string>
"
scala> val df_filtered = spark.sql("select * from hottab where loaddate='2019-07-31'")
df_filtered: org.apache.spark.sql.DataFrame = [id: int, descr: string ... 1 more field]
scala> val plan2 = df_filtered.queryExecution.executedPlan.toString
plan2: String =
"*(1) FileScan parquet default.hottab[id#17,descr#18,loaddate#19] Batched: true, Format: Parquet,
Location: PrunedInMemoryFileIndex[hdfs://ns1/user/hive/warehouse/hottab/loaddate=2019-07-31],
PartitionCount: 1, PartitionFilters: [isnotnull(loaddate#19), (loaddate#19 = 2019-07-31)],
PushedFilters: [], ReadSchema: struct<id:int,descr:string>
"
This way, you also don't have to deal with SQL parsing to find the table name(s) in queries, or with interrogating the metastore yourself.
As a bonus, you'll also be able to see whether "regular" filter pushdown occurs (for storage formats that support it) in addition to partition pruning.
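If you want to turn that into an automatic guard, here is a minimal sketch built on the same idea; the string checks against the plan text are a heuristic of mine, not an official API:
import org.apache.spark.sql.SparkSession

// Sketch: refuse to run a query whose executed plan scans a partitioned table
// without any partition filter. "PartitionCount" shows up for partitioned
// scans, and an empty "PartitionFilters: []" means no pruning will happen.
def runWithPartitionCheck(spark: SparkSession, sql: String) = {
  val df = spark.sql(sql)
  val plan = df.queryExecution.executedPlan.toString
  val fullScanOfPartitionedTable =
    plan.contains("PartitionCount:") && plan.contains("PartitionFilters: []")
  require(!fullScanOfPartitionedTable, s"Add a partition filter to: $sql")
  df
}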
Upvotes: 2
Reputation: 10082
You can use Scala's Try class and execute SHOW PARTITIONS on the required table.
import scala.util.{Failure, Success, Try}

val numPartitions = Try(spark.sql("show partitions database.table").count) match {
  case Success(v) => v
  case Failure(e) => -1L // SHOW PARTITIONS throws an AnalysisException on non-partitioned tables
}
Later you can check numPartitions. If the value is -1, the table is not partitioned.
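If you only care about the boolean, the same idea can be condensed a bit (a sketch; the table name is a placeholder):
import scala.util.Try

// Sketch: SHOW PARTITIONS fails with an AnalysisException on a table that is
// not partitioned, so a successful call means the table has partition columns.
val isPartitioned = Try(spark.sql("show partitions database.table").count()).isSuccess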
Upvotes: 0