Reputation: 1902
I am trying to consume a Kafka DirectStream, process the RDDs for each partition, and write the processed values to a DB. When I try to perform reduceByKey per partition (that is, without a shuffle), I get the following error. On the driver node we could usually use sc.parallelize(Iterator) to work around this, but I would like to solve it within Spark Streaming.
value reduceByKey is not a member of Iterator[((String, String), (Int, Int))]
Is there a way to perform transformations on the Iterator within the partition?
myKafkaDS
  .foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    val commonIter = rdd.mapPartitionsWithIndex((i, iter) => {
      val offset = offsetRanges(i)
      val records = iter.filter(item => {
        (some_filter_condition)
      }).map(r1 => {
        // Some processing
        ((field2, field2), (field3, field4))
      })
      val records.reduceByKey((a,b) => (a._1+b._1, a._2+b._2)) // Getting reduceByKey() is not a member of Iterator
      // Code to write to DB
      Iterator.empty // I only want to store the processed records in the DB, so I return an empty iterator
    })
  }
Is there a more elegant way to do this (process Kafka RDDs for each partition and store them in a DB)?
Upvotes: 1
Views: 711
Reputation: 1902
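So, we cannot use Spark transformations inside mapPartitionsWithIndex, because the function receives a plain Scala Iterator for each partition rather than an RDD. However, using Scala collection methods such as groupBy together with a reduce helped me solve this issue.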
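A minimal sketch of what that could look like, assuming the record shape from the question, Iterator[((String, String), (Int, Int))]. The names PartitionReduce and reduceByKeyLocal are hypothetical, introduced only for illustration, and the code uses only the Scala standard library:

```scala
object PartitionReduce {
  type Key = (String, String)
  type Value = (Int, Int)

  // Combine the values of records that share a key, using only Scala
  // collection methods (no Spark transformation, hence no shuffle).
  // Assumption: the whole partition fits in memory, because groupBy
  // materializes the iterator.
  def reduceByKeyLocal(records: Iterator[(Key, Value)]): Map[Key, Value] =
    records.toList
      .groupBy { case (key, _) => key }
      .map { case (key, pairs) =>
        // groupBy never produces an empty group, so reduce is safe here.
        key -> pairs.map(_._2).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
      }
}
```

Inside mapPartitionsWithIndex this would be called as PartitionReduce.reduceByKeyLocal(records), and the resulting Map can then be written to the DB. If a partition is too large to materialize, folding the iterator into a mutable map is an alternative that keeps only one entry per key in memory.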
Upvotes: 1
Reputation: 1323
Syntax issues:
1) The reduceByKey logic looks OK. Remove the val before the statement (if it is not a typo) and attach reduceByKey() directly after map:
.map(r1 => {
  // Some processing
  ((field2, field2), (field3, field4))
}).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
2) Add iter.next after the end of each iteration.
3) Iterator.empty is wrongly placed. Put it after leaving mapPartitionsWithIndex().
4) Add an iterator condition for safety:
val commonIter = rdd.mapPartitionsWithIndex((i, iter) => if (i == 0 && iter.hasNext) {
  ....
} else iter, true)
Upvotes: 0
Reputation: 41
Your records value is an Iterator, not an RDD. Hence you cannot invoke reduceByKey on records.
Upvotes: 0