Reputation: 8465
I'm currently trying to implement some algorithms in both Apache Spark and Apache Flink. When executing the algorithms, I have to do some kind of set difference/subtraction operation. While there is a built-in subtract operation for Apache Spark, I couldn't find anything similar in Apache Flink (1.0.3 and 1.1.0-SNAPSHOT).
So my question is: given two DataSet objects d1 and d2, both containing the same type T, what is the most efficient way to apply the set difference, i.e. d1 \ d2?
val d1: DataSet[T] = ...
val d2: DataSet[T] = ...
val d_diff: DataSet[T] = ???
Probably there is some way to do it via coGroup:
val d_diff = d1.coGroup(d2).where(0).equalTo(0) {
  (l, r, out: Collector[T]) => {
    val rightElements = r.toSet
    for (el <- l)
      if (!rightElements.contains(el)) out.collect(el)
  }
}
but I'm wondering whether that's the correct way, whether it's considered best practice, or whether anybody knows a more efficient way?
Upvotes: 2
Views: 584
Reputation: 2664
The DataSet API does not provide a method for this, as it only contains the very basic set of operations. The Table API in 1.1 will have a set minus operator; you can see how it is implemented here:
leftDataSet
  .coGroup(rightDataSet)
  .where("*")
  .equalTo("*")
  .`with`(coGroupFunction)
It uses this CoGroupFunction. So yes, you are on the right track.
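For the plain DataSet API, a self-contained coGroup-based difference might look like the sketch below. It follows the same pattern as the snippet above, keying on the whole element with where("*")/equalTo("*"); the helper name minus and the exact type-class bounds are my own assumptions, not something Flink itself provides.

import scala.reflect.ClassTag

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

// Hypothetical helper (not part of Flink's API): returns left \ right.
def minus[T: TypeInformation: ClassTag](left: DataSet[T], right: DataSet[T]): DataSet[T] =
  left.coGroup(right)
    .where("*")    // key on the whole element, as the Table API implementation does
    .equalTo("*")
    .apply { (l: Iterator[T], r: Iterator[T], out: Collector[T]) =>
      // A key group whose right side is empty exists only in `left`,
      // so all of its left-side elements belong to the difference.
      if (r.isEmpty) l.foreach(out.collect)
    }

Note that this keeps duplicates from d1 that have no match in d2; if you need strict set semantics, you could append a distinct() to the result.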
Upvotes: 4