MGM

Reputation: 27

Compare values of two pair RDDs by key in Scala

Is there a way to compare the values of two pair RDDs by key?
For example:

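import org.apache.spark.rdd.RDD
// sc is the SparkContext that spark-shell predefines
val rdd1: RDD[(Int, String)] = sc.parallelize(Seq((1, "ABC"), (2, "XYZ"), (3, "PQR")))
val rdd2: RDD[(Int, String)] = sc.parallelize(Seq((5, "AAA"), (2, "XYZ"), (3, "LMN")))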

My task is to count the keys that are present in both RDDs but whose values differ.
The join gives an RDD with keys 2 and 3, but I want only key 3 in the final output, because it is the only shared key whose values differ.
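As a rough, Spark-free way to pin down the expected result, the sketch below models each pair RDD as a local Seq of (key, value) tuples and models the join with a for-comprehension that, like RDD.join, pairs every record sharing a key. This is an illustration on plain Scala collections, not Spark code; the names pairs1, pairs2 and differingKeys are made up for the example.

```scala
// Local stand-ins for rdd1 and rdd2 (plain Scala collections, not Spark RDDs).
val pairs1 = Seq((1, "ABC"), (2, "XYZ"), (3, "PQR"))
val pairs2 = Seq((5, "AAA"), (2, "XYZ"), (3, "LMN"))

// Inner join by key: one (key, (left, right)) tuple per pair of matching records.
val joined = for ((k, s1) <- pairs1; (k2, s2) <- pairs2 if k == k2) yield (k, (s1, s2))

// Shared keys whose values differ, and how many there are.
val differingKeys = joined.filter { case (_, (s1, s2)) => s1 != s2 }.map(_._1)
val answer = differingKeys.size
```

Here joined holds keys 2 and 3, differingKeys is Seq(3), and answer is 1.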

I've tried the below approach:

var diff = rdd1.join(rdd2).map { case (k, (s1, s2)) => if (s1 != s2) (k, s1) }

I planned to call count on diff, but diff still has two records, one for each of keys 2 and 3. I guess I need an else branch to skip the matching values.
Can anyone suggest how to achieve the desired result (or a different approach)?
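The reason the attempt keeps both keys: map always emits exactly one output element per input element, and an if without an else yields () (Unit) when its condition is false, so the key-2 record becomes () instead of disappearing. A small local sketch, using plain Scala collections as stand-ins for the RDDs (an assumption so it runs without Spark; RDD.map is likewise one-to-one):

```scala
val pairs1 = Seq((1, "ABC"), (2, "XYZ"), (3, "PQR"))
val pairs2 = Seq((5, "AAA"), (2, "XYZ"), (3, "LMN"))

// Local stand-in for rdd1.join(rdd2).
val joined = for ((k, s1) <- pairs1; (k2, s2) <- pairs2 if k == k2) yield (k, (s1, s2))

// Same body as the attempt: with no else branch, a record whose values match
// is mapped to () rather than dropped, so the output is as long as joined.
val attempted = joined.map { case (k, (s1, s2)) => if (s1 != s2) (k, s1) }
```

Counting attempted therefore gives 2, not 1.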

Upvotes: 1

Views: 1010

Answers (2)

vrk

Reputation: 56

Use leftOuterJoin, which keeps the keys present in the first RDD, and count the result:

val diff = rdd1.leftOuterJoin(rdd2).count()
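For reference, leftOuterJoin keeps every record of the first RDD, pairing its value with Some(v) for each matching value in the second RDD or None when the key is absent there; it does not compare values. The sketch below models that on local Scala collections (the leftOuterJoin helper here is a hypothetical stand-in, not Spark's method) to show what the count is for the question's data.

```scala
// Hypothetical local stand-in for RDD.leftOuterJoin on (key, value) sequences.
def leftOuterJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, Option[B]))] =
  left.flatMap { case (k, a) =>
    val matches = right.filter(_._1 == k).map(_._2)
    if (matches.isEmpty) Seq((k, (a, Option.empty[B])))
    else matches.map(b => (k, (a, Some(b): Option[B])))
  }

val pairs1 = Seq((1, "ABC"), (2, "XYZ"), (3, "PQR"))
val pairs2 = Seq((5, "AAA"), (2, "XYZ"), (3, "LMN"))

// Every key of pairs1 survives, whether or not its value matches.
val leftJoined = leftOuterJoin(pairs1, pairs2)
val leftCount = leftJoined.size
```

On this data leftCount is 3, the number of records in the first collection, while only one shared key (3) has differing values.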

Upvotes: -1

Tzach Zohar

Reputation: 37832

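Use collect with a partial function. Unlike the no-argument collect(), which returns an Array to the driver, this overload returns a new RDD that keeps only the elements for which the partial function is defined. Then put the "if" in the case statement as a guard, instead of making it part of the case result expression: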

val diff = rdd1.join(rdd2).collect {
  case (k, (s1, s2)) if s1 != s2 => (k, s1)
}

This way, the partial function isn't defined where s1 == s2, so collect filters those records out.
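Scala's own collections provide collect with the same partial-function semantics, so the behavior can be checked locally without a cluster. In this sketch the RDDs are replaced by plain Seqs (an assumption for illustration), and keepDiffering is a hypothetical name for the answer's partial function:

```scala
val pairs1 = Seq((1, "ABC"), (2, "XYZ"), (3, "PQR"))
val pairs2 = Seq((5, "AAA"), (2, "XYZ"), (3, "LMN"))

// Local stand-in for rdd1.join(rdd2).
val joined = for ((k, s1) <- pairs1; (k2, s2) <- pairs2 if k == k2) yield (k, (s1, s2))

// The answer's partial function; the guard leaves it undefined when s1 == s2.
val keepDiffering: PartialFunction[(Int, (String, String)), (Int, String)] = {
  case (k, (s1, s2)) if s1 != s2 => (k, s1)
}

// collect applies keepDiffering only where it is defined and drops the rest.
val diff = joined.collect(keepDiffering)
```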

An equivalent (but slightly less elegant) solution uses filter followed by map. map alone always returns the same number of records as its input, so the filtering has to happen in a separate step:

val diff = rdd1.join(rdd2).filter {
  case (k, (s1, s2)) => s1 != s2
}.map {
  case (k, (s1, s2)) => (k, s1)
}
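Since the question asks for a number rather than the records themselves, the map step can be skipped when only the count is needed. On an RDD that would be filter(...) followed by count(), which takes no arguments. The sketch below mirrors it on local Seqs, with .size standing in for count() (an assumption made so the example runs without Spark):

```scala
val pairs1 = Seq((1, "ABC"), (2, "XYZ"), (3, "PQR"))
val pairs2 = Seq((5, "AAA"), (2, "XYZ"), (3, "LMN"))

// Local stand-in for rdd1.join(rdd2).
val joined = for ((k, s1) <- pairs1; (k2, s2) <- pairs2 if k == k2) yield (k, (s1, s2))

// The answer's filter-then-map pipeline: filter drops records, map reshapes the rest.
val diff = joined.filter { case (_, (s1, s2)) => s1 != s2 }.map { case (k, (s1, _)) => (k, s1) }

// Count only: map never changes the number of records, so it can be skipped.
val numDiffering = joined.filter { case (_, (s1, s2)) => s1 != s2 }.size
```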

Upvotes: 2
