Reputation: 121
I'm using Delta Lake ("io.delta" %% "delta-core" % "0.4.0") and running a merge inside foreachBatch like this:
foreachBatch { (s, batchid) =>
  deltaTable.alias("t")
    .merge(
      s.as("s"),
      "s.eventid = t.eventid and t.categories in ('a1', 'a2')")
    .whenMatched("s.eventtime < t.eventtime").updateAll()
    .whenNotMatched().insertAll()
    .execute()
}
The Delta table is partitioned on categories. When I add a literal partition filter such as and t.categories in ('a1', 'a2'), the Spark graph shows that the input is not the whole table, so partition pruning appears to kick in. However, if I use "s.eventid = t.eventid and t.categories = s.categories" instead, it still loads all the data from the Delta table. I expected it to work out automatically which partitions the join needs, a kind of pushdown. Is partition pruning possible without spelling out the specific partition values? I also tried setting ("spark.databricks.optimizer.dynamicPartitionPruning", "true"), but it did not work.
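For reference, a minimal sketch of how I set that flag (assuming spark is the active SparkSession; spark.databricks.* options are Databricks runtime settings and may be a no-op on open-source Spark):

// Sketch: enabling the dynamic partition pruning flag via session config.
// Databricks runtime option; may have no effect on open-source Spark.
spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning", "true")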
Thanks
Upvotes: 6
Views: 7053
Reputation: 3008
You can pass the partition values in two ways: statically, by hard-coding them, or dynamically, by computing the list of partitions before building the merge condition. Statically:
val categoriesList = List("a1", "a2")
val categoryPartitionList = categoriesList.mkString("','")
foreachBatch { (s, batchid) =>
  deltaTable.alias("t")
    .merge(
      s.as("s"),
      // s-interpolator is required, otherwise $categoryPartitionList is taken literally
      s"s.eventid = t.eventid and t.categories in ('$categoryPartitionList')")
    .whenMatched("s.eventtime < t.eventtime").updateAll()
    .whenNotMatched().insertAll()
    .execute()
}
Or dynamically, reading the distinct partition values from the table first:

val selectedCategories = deltaTable.toDF.select("categories").dropDuplicates()
val categoriesList = selectedCategories.collect().map(_.getString(0))
val categoryPartitionList = categoriesList.mkString("','")
foreachBatch { (s, batchid) =>
  deltaTable.alias("t")
    .merge(
      s.as("s"),
      s"s.eventid = t.eventid and t.categories in ('$categoryPartitionList')")
    .whenMatched("s.eventtime < t.eventtime").updateAll()
    .whenNotMatched().insertAll()
    .execute()
}
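A further variant you could try (my own sketch, not tested here): derive the list from each micro-batch itself, so the IN-list only names the partitions the incoming data actually touches. This assumes s carries a categories column matching the table's partition column, as your "t.categories = s.categories" condition suggests:

foreachBatch { (s, batchid) =>
  // Collect the partition values present in this micro-batch only,
  // so the merge condition prunes to exactly the partitions it touches.
  val batchCategories = s.select("categories").dropDuplicates()
    .collect().map(_.getString(0)).mkString("','")
  deltaTable.alias("t")
    .merge(
      s.as("s"),
      s"s.eventid = t.eventid and t.categories in ('$batchCategories')")
    .whenMatched("s.eventtime < t.eventtime").updateAll()
    .whenNotMatched().insertAll()
    .execute()
}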
Upvotes: 3