Reputation: 363
I tested the failing code separately in the Scala shell:
scala> val p6 = sc.parallelize(List( ("a","b"),("b","c")))
p6: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> val p7 = p6.map(a => ((a._1+a._2), (a._1, a._2, 1)))
p7: org.apache.spark.rdd.RDD[(String, (String, String, Int))] = MapPartitionsRDD[11] at map at <console>:26
scala> val p8 = p7.reduceByKey( (a,b) => (a._1,(a._2, a._3+b._3)))
<console>:28: error: type mismatch;
found : (String, (String, Int))
required: (String, String, Int)
val p8 = p7.reduceByKey( (a,b) => (a._1,(a._2, a._3+b._3)))
I want to use a._1 as the key so that I can later use the join operator, which requires (key, value) pairs. My question is: why is a particular type required when I am writing the reducing function myself? I thought the output format was up to us rather than something imposed. Am I wrong?
Also, if I am wrong, why is (String, String, Int) the required type, and not something else?
PS: I know (String, String, Int) is the value type produced by the map function, ((a._1+a._2), (a._1, a._2, 1)), but the official example shows that the reduce function (a, b) => (a._1 + b._1, a._2 + b._2) is valid, so I think all of these, including my code above, should be valid.
Upvotes: 1
Views: 924
Reputation: 35249
Take a look at the types. reduceByKey is a method on RDD[(K, V)] with the signature:
def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
In other words, both input arguments and the return value have to be of the same type, V.
In your case p7 is RDD[(String, (String, String, Int))], where K is String and V is (String, String, Int), so the function used with reduceByKey must have the type
((String, String, Int), (String, String, Int)) => (String, String, Int)
A valid function would be:
p7.reduceByKey( (a,b) => (a._1, a._2, a._3 + b._3))
which would give you
(bc,(b,c,1))
(ab,(a,b,1))
as a result.
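To see why the compiler insists on (String, String, Int), here is a minimal sketch in plain Scala that can be pasted into the Scala shell without Spark. The helper reduceByKeyLocal is hypothetical: it only mirrors the (V, V) => V shape of the signature above on an ordinary Seq and is not Spark's implementation. A duplicate ("a","b") pair is added as an assumption so that the reducing function is actually invoked.

```scala
// Hypothetical stand-in for reduceByKey on a local collection.
// It only mirrors the (V, V) => V shape; it is not Spark's implementation.
def reduceByKeyLocal[K, V](pairs: Seq[(K, V)])(func: (V, V) => V): Map[K, V] =
  pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(func) }

// Same shape as p7, plus a duplicate ("a","b") so the function gets called.
val raw = List(("a", "b"), ("b", "c"), ("a", "b"))
val keyed = raw.map(a => (a._1 + a._2, (a._1, a._2, 1)))

// V is (String, String, Int), so the function must return (String, String, Int).
val counted = reduceByKeyLocal(keyed)((a, b) => (a._1, a._2, a._3 + b._3))

// Returning (a._1, (a._2, a._3 + b._3)) here would be rejected with the same
// kind of "type mismatch" error, because that value is a (String, (String, Int)).
```

Because V is fixed by the collection being reduced, the compiler has no freedom to accept a differently shaped result; only the contents of the tuple are up to you.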
If you want the result value type to differ from the input value type in a byKey method, you have to use aggregateByKey or combineByKey.
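As a rough sketch of that point, aggregateByKey lets the result value type U differ from the input value type V. The snippet assumes it is pasted into spark-shell, where sc is the predefined SparkContext, and it rebuilds p6 and p7 as in the question. The names counts and rekeyed are illustrative only. The second part is one possible way to end up with a._1 as the key for a later join: keep reduceByKey type-correct and reshape the pairs afterwards with an ordinary map.

```scala
// Assumes spark-shell, where `sc` is the predefined SparkContext.
val p6 = sc.parallelize(List(("a", "b"), ("b", "c")))
val p7 = p6.map(a => (a._1 + a._2, (a._1, a._2, 1)))

// aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)
// Here V is (String, String, Int) and U is Int, so the value type changes.
val counts = p7.aggregateByKey(0)(
  (acc, v) => acc + v._3, // fold one value into the per-partition count
  (x, y) => x + y         // merge counts coming from different partitions
)
// counts has type RDD[(String, Int)]

// Keep reduceByKey type-correct, then re-key with a plain map.
val rekeyed = p7
  .reduceByKey((a, b) => (a._1, a._2, a._3 + b._3))
  .map { case (_, (first, second, n)) => (first, (second, n)) }
// rekeyed has type RDD[(String, (String, Int))]
```

Note that after re-keying, several pairs may share the same first element, so a further reduceByKey on rekeyed may be needed before the join, depending on the data.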
Upvotes: 3
Reputation: 41987
Your p7 is of type org.apache.spark.rdd.RDD[(String, (String, String, Int))], but in your reduceByKey you have used (a._1,(a._2, a._3+b._3)), which is of type (String, (String, Int)).
The output type of p8 should also be org.apache.spark.rdd.RDD[(String, (String, String, Int))], so defining it like the following should work for you:
val p8 = p7.reduceByKey( (a,b) => (a._1, a._2, a._3+b._3))
You can read my PySpark answer for more detail on how reduceByKey works.
Upvotes: 2