Reputation: 69
I'm using Spark with Scala and I have an RDD full of Tuple2 containing a complex object as the key and a Double as the value. The aim is to sum the Doubles (the frequencies) when the objects are identical.
For that I've defined my object as follows:
case class SimpleCoocurrence(word: String, word_pos: String, cooc: String, cooc_pos: String, distance: Double) extends Ordered[SimpleCoocurrence] {
  def compare(that: SimpleCoocurrence) = {
    if (this.word.equals(that.word) && this.word_pos.equals(that.word_pos)
        && this.cooc.equals(that.cooc) && this.cooc_pos.equals(that.cooc_pos))
      0
    else
      this.toString.compareTo(that.toString)
  }
}
Now I'm trying to use reduceByKey like this:
val coocRDD = sc.parallelize(coocList)
println(coocRDD.count)
coocRDD.map(tup=>tup).reduceByKey(_+_)
println(coocRDD.count)
But the result shows that the RDD contains exactly the same number of elements before and after the reduceByKey.
How can I perform a reduceByKey using Tuple2[SimpleCoocurrence, Double]? Is implementing the Ordered trait the right way to tell Spark how to compare my objects? Should I just use Tuple2[String, Double]?
Thanks,
Upvotes: 6
Views: 3974
Reputation: 698
You are not storing the results of reduceByKey. Try this instead:
val coocRDD = sc.parallelize(coocList)
println(coocRDD.count)
val result = coocRDD.map(tup=>tup).reduceByKey(_+_)
println(result.count)
Upvotes: 0
Reputation: 69
First, I'm dumb...
Next, in case anyone has the same problem and wants to use complex Scala objects as keys for a reduceByKey on Spark:
Spark knows how to compare two objects even if they do not implement Ordered, so the code above is actually functional.
The only problem was that I was printing the same RDD before and after. Written like this, it actually works well:
val coocRDD = sc.parallelize(coocList)
println(coocRDD.count)
val newRDD = coocRDD.map(tup=>tup).reduceByKey(_+_)
println(newRDD.count)
Upvotes: 0
Reputation: 37435
reduceByKey does not use Ordering but hashCode and equals to determine which keys are the same. In particular, the HashPartitioner will group keys by hash, so that keys with the same hashCode fall on the same partition and further reduction can happen on a per-partition basis.
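For intuition, this is roughly the partitioning rule (a minimal sketch; the actual implementation lives in org.apache.spark.HashPartitioner):

// Sketch of hash partitioning: equal keys have equal hashCodes,
// so they land on the same partition and can be reduced together.
def getPartition(key: Any, numPartitions: Int): Int = {
  val mod = key.hashCode % numPartitions
  if (mod < 0) mod + numPartitions else mod // keep the index non-negative
}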
Case classes have a default implementation of equals and hashCode. The test data probably has different values of the field distance: Double, making each instance a unique object. Using it as a key will result in only identical objects being reduced as one.
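To see the effect, compare two instances of the question's case class that differ only in distance:

val a = SimpleCoocurrence("word", "NN", "cooc", "NN", 1.0)
val b = SimpleCoocurrence("word", "NN", "cooc", "NN", 2.0)
a == b                    // false: distance differs, so these are distinct keys
a.hashCode == b.hashCode  // false in general: hashCode covers all fields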
One way to address this would be to define a key for your case class and an addition method for the object, something like this:
case class SimpleCoocurrence(word: String, word_pos: String, cooc: String, cooc_pos: String, distance: Double) extends Serializable {
  // the key deliberately excludes distance, so co-occurrences that differ
  // only in distance reduce together
  val key = word + word_pos + cooc + cooc_pos
}

object SimpleCoocurrence {
  val add: (SimpleCoocurrence, SimpleCoocurrence) => SimpleCoocurrence = ???
}
val coocList:List[SimpleCoocurrence] = ???
val coocRDD = sc.parallelize(coocList)
val coocByKey = coocRDD.keyBy(_.key)
val addedCooc = coocByKey.reduceByKey(SimpleCoocurrence.add)
(*) code provided as guiding example - not compiled or tested.
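If the goal from the question is to sum the frequencies, one plausible way to fill in add (an assumption on my part, not part of the answer above) would be:

object SimpleCoocurrence {
  // Assumption: distance carries the frequency and should be summed
  // when two co-occurrences share the same textual key.
  val add: (SimpleCoocurrence, SimpleCoocurrence) => SimpleCoocurrence =
    (a, b) => a.copy(distance = a.distance + b.distance)
}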
Upvotes: 6