Reputation: 3270
I'm trying to figure out how to eliminate duplicates in a cross join of two datasets.
For example, with a client dataset (clientDs):
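val clientDs = sc.parallelize(List("c1", "c2", "c3"))
val clientMatrixDs = clientDs.cartesian(clientDs) // cross join of the RDD with itself
clientMatrixDs.collect().foreach { case (a, b) => println(s"$a, $b") }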
-- output
c1, c1
c1, c2
c1, c3
c2, c1
c2, c2
c2, c3
c3, c1
c3, c2
c3, c3
In this case, (c1, c2) and (c2, c1) are duplicates of each other, and I need to de-dupe them.
I'm not sure how to do this and am looking for some ideas.
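To make the problem concrete without a running SparkContext, here is a minimal sketch that reproduces the same cross join on a plain Scala List (the local List is an assumption for illustration; on an RDD the equivalent call is cartesian). For n items it yields n × n ordered pairs, but only n(n + 1) / 2 unordered ones, so 6 of the 9 pairs should survive de-duplication.

```scala
// Plain-Scala stand-in for the RDD cross join (no Spark needed).
val clients = List("c1", "c2", "c3")

// Every ordered pair, including (c1, c1), like a cartesian product.
val crossJoined: List[(String, String)] =
  for { a <- clients; b <- clients } yield (a, b)

// Number of unordered pairs with repetition allowed: n * (n + 1) / 2.
val n = clients.size
val expectedUnique = n * (n + 1) / 2

println(crossJoined.size) // 9
println(expectedUnique)   // 6
```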
Upvotes: 1
Views: 1379
Reputation: 5315
RDDs have a distinct() method, which should work like the equivalent on, say, a List. I don't know about its performance, however.
EDIT
However, this won't work on its own, since (a, b) != (b, a) in Scala. So you have to swap some of your elements to make sure you don't get any duplicates.
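A quick local check of why distinct alone is not enough: tuple equality is positional, so mirrored pairs are different values. This sketch uses a plain List in place of the RDD (an assumption so it runs without Spark); distinct removes nothing here.

```scala
val clients = List("c1", "c2", "c3")
val crossJoined = for { a <- clients; b <- clients } yield (a, b)

// Tuple2 compares element by element, in order.
val mirroredEqual = ("c1", "c2") == ("c2", "c1") // false

// So distinct keeps all 9 pairs, mirrors included.
val afterDistinct = crossJoined.distinct
println(afterDistinct.size) // 9
```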
If you have some ordering on your type, you can map all your pairs to their ordered equivalent. For instance, map (2, 1) to (1, 2) and (3, 4) to (3, 4). Then you can use distinct, which will remove all duplicates.
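import org.apache.spark.rdd.RDD
import scala.math.Ordering.Implicits._ // enables a <= b for any T with an Ordering

def distinctPairs[T: Ordering](rdd: RDD[(T, T)]): RDD[(T, T)] = rdd.map {
  case (a, b) if a <= b => (a, b) // already in ascending order
  case (a, b) => (b, a)           // swap so the smaller element comes first
}.distinct()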
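The same canonical-order idea can be tried locally. In this sketch the helper name canonical is hypothetical and a plain List stands in for the RDD; it requires an implicit Ordering for the element type, just like the RDD version.

```scala
import scala.math.Ordering.Implicits._ // provides <= for any T with an Ordering

// Put each pair in ascending order; works for any T with an Ordering.
def canonical[T: Ordering](pair: (T, T)): (T, T) = pair match {
  case (a, b) if a <= b => (a, b)
  case (a, b)           => (b, a)
}

val clients = List("c1", "c2", "c3")
val crossJoined = for { a <- clients; b <- clients } yield (a, b)

// Map every pair to its canonical form, then distinct drops the mirrors.
val deduped = crossJoined.map(p => canonical(p)).distinct
println(deduped) // List((c1,c1), (c1,c2), (c1,c3), (c2,c2), (c2,c3), (c3,c3))
```

canonical((2, 1)) gives (1, 2) and canonical((3, 4)) gives (3, 4), matching the examples in the text.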
If you do not have such an ordering, you can replace your pairs with Sets, which are unordered. So you can map your RDD as follows:
val distinctRdd: RDD[Set[T]] = rdd.map {
  case (a, b) => Set(a, b) // Set(a, b) == Set(b, a)
}.distinct()
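A local sketch of the Set-based variant, again with a plain List standing in for the RDD (an assumption so it runs without Spark). It also shows the caveat described next: a pair like (c1, c1) collapses to a one-element set.

```scala
val clients = List("c1", "c2", "c3")
val crossJoined = for { a <- clients; b <- clients } yield (a, b)

// Sets ignore element order, so Set(c1, c2) == Set(c2, c1).
val asSets: List[Set[String]] = crossJoined.map { case (a, b) => Set(a, b) }.distinct

// Pairs of identical elements become one-element sets.
val singletons = asSets.count(_.size == 1)
println(asSets.size) // 6
println(singletons)  // 3
```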
However, this loses the pair type you had, so you may need to go back to pairs afterwards. To do so, remember that Sets themselves do not contain duplicates, so a pair with the same element twice is mapped to a set with only one element.
So you must do the following:
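val distinctPairs: RDD[(T, T)] = distinctRdd
  .map(_.toList) // Set has no extractor, so pattern-match on a List instead
  .collect {
    case List(a) => (a, a)    // a pair (x, x) became a one-element set
    case List(a, b) => (a, b)
  }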
The collect could be replaced by a map, but that may throw a MatchError if you change your code later on. With collect, all other cases (an empty set, or a set with more than two elements) are silently discarded instead, so decide which you would rather have after future changes: a runtime error, or discarded elements.
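The difference between collect and map can be seen on a local list of edge-case sets. This is a sketch on plain Scala collections; the input sets are hypothetical, chosen only to cover sizes 0 to 3.

```scala
import scala.util.Try

// Hypothetical edge-case input: sets of size 0, 1, 2 and 3.
val sets: List[Set[String]] =
  List(Set.empty[String], Set("a"), Set("a", "b"), Set("a", "b", "c"))

// The same two cases as above, matched on a List.
val toPair: PartialFunction[List[String], (String, String)] = {
  case List(a)    => (a, a)
  case List(a, b) => (a, b)
}

// collect silently drops the inputs the partial function does not cover.
val viaCollect = sets.map(_.toList).collect(toPair)

// map applies it to every element and fails on the first unmatched one.
val viaMap = Try(sets.map(_.toList).map(toPair))

println(viaCollect)       // List((a,a), (a,b))
println(viaMap.isFailure) // true
```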
TL;DR
Try to order the elements in your pairs to make them unique. If that does not work, use Sets, but it is more complicated.
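Since both sides of this particular cross join are the same dataset, one further option (not part of the answer above) is to filter the cross product down to the already-ordered pairs, which removes the mirrors without a distinct step. On an RDD this would presumably be clientDs.cartesian(clientDs).filter { case (a, b) => a <= b }, assuming the element type has an ordering and clientDs itself has no repeated values. A local sketch:

```scala
val clients = List("c1", "c2", "c3")
val crossJoined = for { a <- clients; b <- clients } yield (a, b)

// Keep each unordered pair once: the copy whose elements are already ascending.
// Assumes `clients` has no repeated values; otherwise add a distinct as well.
val ordered = crossJoined.filter { case (a, b) => a <= b }
println(ordered) // List((c1,c1), (c1,c2), (c1,c3), (c2,c2), (c2,c3), (c3,c3))
```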
Upvotes: 1
Reputation: 30300
In your example, you have a bunch of Tuple2[String, String]s, and the problem is that you want an equality that Tuple2 just doesn't have, where (a, b) == (b, a). This is why distinct doesn't work for you. You therefore have to provide your own custom equality.
The thing is, you don't want to override Tuple2's version of equals because that can be dangerous, so you can provide a custom equals somewhere instead:
def customEquals(tuple1: (String, String), tuple2: (String, String)) = {
  tuple1 == tuple2 || (tuple1._1 == tuple2._2 && tuple1._2 == tuple2._1)
}
Then you can use this function in a filter to get rid of duplicates under your custom definition:
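// Collect to the driver first: inside an RDD's filter, each task would get its own
// copy of `seq`, so duplicates in different partitions would survive.
val deduped = clientMatrixDs.collect().filter {
  var seq = Seq.empty[(String, String)] // tuples kept so far
  tuple =>
    if (seq.exists(customEquals(tuple, _))) {
      false // an equivalent tuple was already kept
    } else {
      seq :+= tuple
      true
    }
}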
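The custom equality can also be checked on a local collection without a captured var. This sketch swaps the stateful filter for a foldLeft (a substitution, not the answer's exact code) and uses a plain List in place of the collected RDD. Each tuple is compared with everything kept so far, so it is quadratic and only suited to small inputs.

```scala
def customEquals(tuple1: (String, String), tuple2: (String, String)): Boolean =
  tuple1 == tuple2 || (tuple1._1 == tuple2._2 && tuple1._2 == tuple2._1)

val clients = List("c1", "c2", "c3")
val crossJoined = for { a <- clients; b <- clients } yield (a, b)

// Keep a tuple only if no equivalent tuple has been kept yet (first one wins).
val deduped = crossJoined.foldLeft(Vector.empty[(String, String)]) { (kept, tuple) =>
  if (kept.exists(customEquals(tuple, _))) kept else kept :+ tuple
}
println(deduped) // Vector((c1,c1), (c1,c2), (c1,c3), (c2,c2), (c2,c3), (c3,c3))
```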
Upvotes: 2