chrisbtk

Reputation: 69

reduceByKey using Scala object as key

I'm using Spark with Scala, and I have an RDD full of Tuple2 containing a complex object as key and a Double as value. The aim is to add the Doubles (the frequencies) when the objects are identical.

For that, I've defined my object as follows:

    case class SimpleCoocurrence(word: String, word_pos: String, cooc: String, cooc_pos: String, distance: Double) extends Ordered[SimpleCoocurrence] {
      def compare(that: SimpleCoocurrence) = {
        if (this.word.equals(that.word) && this.word_pos.equals(that.word_pos)
            && this.cooc.equals(that.cooc) && this.cooc_pos.equals(that.cooc_pos))
          0
        else
          this.toString.compareTo(that.toString)
      }
    }

Now I'm trying to use reduceByKey like this:

    val coocRDD = sc.parallelize(coocList)
    println(coocRDD.count)
    coocRDD.map(tup => tup).reduceByKey(_ + _)
    println(coocRDD.count)

But the result shows that the RDD contains exactly the same number of elements before and after the reduceByKey.

How can I perform a reduceByKey using Tuple2[SimpleCoocurrence, Double]? Is implementing the Ordered trait the right way to tell Spark how to compare my objects? Should I just use Tuple2[String, Double] instead?

Thanks,

Upvotes: 6

Views: 3974

Answers (3)

Akshat Chaturvedi

Reputation: 698

You are not storing the result of reduceByKey. RDD transformations return a new RDD rather than modifying the original, so here you count the unchanged coocRDD twice. Try this instead:

    val coocRDD = sc.parallelize(coocList)
    println(coocRDD.count)
    val result = coocRDD.map(tup => tup).reduceByKey(_ + _)
    println(result.count)

Upvotes: 0

chrisbtk

Reputation: 69

First, I'm dumb...

Next, in case anyone has the same problem and wants to use complex Scala objects as keys for a reduceByKey in Spark:

Spark knows how to compare two objects even if they do not implement Ordered, so the code above is actually functional.

The only problem was that I was printing the same RDD before and after. Written like this, it actually works well:

    val coocRDD = sc.parallelize(coocList)
    println(coocRDD.count)
    val newRDD = coocRDD.map(tup => tup).reduceByKey(_ + _)
    println(newRDD.count)

Upvotes: 0

maasg

Reputation: 37435

reduceByKey does not use Ordering; it uses hashCode and equals to determine which keys are the same. In particular, the HashPartitioner groups keys by hash, so that keys with the same hashCode fall on the same partition and further reduction can happen on a per-partition basis.
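
To illustrate, here is a minimal sketch (assuming a SparkContext named sc is in scope, as in the question; K is a made-up key type): two case-class instances with equal fields are treated as the same key.

    case class K(a: String)  // case classes get structural equals/hashCode for free
    val rdd = sc.parallelize(Seq(K("x") -> 1.0, K("x") -> 2.0, K("y") -> 1.0))
    rdd.reduceByKey(_ + _).collect()
    // => Array((K(x),3.0), (K(y),1.0)): two keys, since K("x") == K("x")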

Case classes have a default implementation of equals and hashCode that covers all their fields. The test data probably has different values for the field distance: Double, making each instance a unique object. Using it as a key will result in only fully identical objects being reduced as one.
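
For example, with hypothetical field values in the question's case class, the two instances below differ only in distance, yet they are different keys:

    SimpleCoocurrence("walk", "V", "dog", "N", 1.0) ==
      SimpleCoocurrence("walk", "V", "dog", "N", 2.0)
    // => false: distance participates in the generated equals/hashCode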

One way to address this would be to define a key on your case class and an addition method on its companion object, something like this:

    case class SimpleCoocurrence(word: String, word_pos: String, cooc: String, cooc_pos: String, distance: Double) extends Serializable {
      // the key deliberately leaves distance out of the identifying fields
      val key = word + word_pos + cooc + cooc_pos
    }

    object SimpleCoocurrence {
      val add: (SimpleCoocurrence, SimpleCoocurrence) => SimpleCoocurrence = ???
    }

    val coocList: List[SimpleCoocurrence] = ???
    val coocRDD = sc.parallelize(coocList)
    val coocByKey = coocRDD.keyBy(_.key)
    val addedCooc = coocByKey.reduceByKey(SimpleCoocurrence.add)

(*) Code provided as a guiding example - not compiled or tested.
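
For completeness, one plausible way to fill in the ??? combiner, assuming distance is the frequency you want to accumulate (an assumption; adapt it to your actual aggregation):

    object SimpleCoocurrence {
      // hypothetical combiner: keep the key fields from the first occurrence
      // and sum the accumulated distances
      val add: (SimpleCoocurrence, SimpleCoocurrence) => SimpleCoocurrence =
        (a, b) => a.copy(distance = a.distance + b.distance)
    }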

Upvotes: 6
