santhosh

Reputation: 1902

Spark Streaming - how to use reduceByKey within a partition on the Iterator

I am trying to consume a Kafka DirectStream, process the RDDs for each partition, and write the processed values to a DB. When I try to perform reduceByKey per partition (that is, without the shuffle), I get the following error. On the driver node we can usually use sc.parallelize(...) to turn the data into an RDD and work around this, but I would like to solve it within Spark Streaming.

value reduceByKey is not a member of Iterator[((String, String), (Int, Int))]

Is there a way to perform transformations on Iterator within the partition?

myKafkaDS
  .foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    val commonIter = rdd.mapPartitionsWithIndex ( (i,iter) => {

      val offset = offsetRanges(i)

      val records = iter.filter(item => {
        (some_filter_condition)
      }).map(r1 => {
        // Some processing
        ((field2, field2), (field3, field4))
      })

      val records.reduceByKey((a,b) => (a._1+b._1, a._2+b._2)) // Getting reduceByKey() is not a member of Iterator
      // Code to write to DB    
      Iterator.empty // I just want to store the processed records in DB. So returning empty iterator
    })
}

Is there a more elegant way to do this (process the Kafka RDDs for each partition and store them in a DB)?

Upvotes: 1

Views: 711

Answers (3)

santhosh

Reputation: 1902

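So, we cannot use Spark transformations such as reduceByKey within mapPartitionsWithIndex, because inside it we only have a plain Scala Iterator rather than an RDD. However, using Scala's own collection methods, such as groupBy followed by a reduce, helped me solve this issue.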
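For anyone landing here, a minimal sketch of what that can look like, assuming the same record shape as in the question. The object and method names (PartitionLocalReduce, reduceWithinPartition) are made up for illustration. Note that Iterator has no groupBy of its own, so the sketch calls toSeq first, which holds the whole partition in memory.

```scala
object PartitionLocalReduce {
  type Key = (String, String)
  type Counts = (Int, Int)

  // Hypothetical helper: a per-partition stand-in for reduceByKey.
  // It only uses plain Scala collection methods, so it works on the
  // Iterator that mapPartitionsWithIndex hands to your function.
  def reduceWithinPartition(iter: Iterator[(Key, Counts)]): Map[Key, Counts] =
    iter.toSeq                              // materialize the partition
      .groupBy { case (key, _) => key }     // group the records by key
      .map { case (key, pairs) =>
        // same combine function as in the question's reduceByKey call
        key -> pairs.map(_._2).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
      }
}
```

Inside mapPartitionsWithIndex you would call this on records, write the resulting Map to the DB, and still return Iterator.empty as before.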

Upvotes: 1

KiranM

Reputation: 1323

Syntax issues:

1) The reduceByKey logic looks OK. Please remove val before the statement (if it is not a typo) and attach reduceByKey() after map:

.map(r1 => {
    // Some processing
    ((field2, field2), (field3, field4))
  }).reduceByKey((a,b) => (a._1+b._1, a._2+b._2))

2) Add iter.next after the end of each iteration.

3) iter.empty is wrongly placed. Put it after coming out of mapPartitionsWithIndex().

4) Add an iterator condition for safety:

val commonIter = rdd.mapPartitionsWithIndex((i, iter) => if (i == 0 && iter.hasNext) {
....
} else iter, true)

Upvotes: 0

dragon5

Reputation: 41

Your records value is an Iterator, not an RDD. Hence you cannot invoke reduceByKey on records.
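Since records is an Iterator, only Iterator methods such as foreach or foldLeft are available on it. A rough sketch, assuming the record shape from the question, that sums the values per key in a single pass without materializing the partition first. The names IteratorFold and sumByKey are hypothetical, not part of Spark.

```scala
import scala.collection.mutable

object IteratorFold {
  // Hypothetical helper: walks the partition's Iterator once and keeps
  // one running (Int, Int) sum per key in a mutable map.
  def sumByKey(
      records: Iterator[((String, String), (Int, Int))]
  ): Map[(String, String), (Int, Int)] = {
    val acc = mutable.Map.empty[(String, String), (Int, Int)]
    records.foreach { case (key, (x, y)) =>
      val (sx, sy) = acc.getOrElse(key, (0, 0))
      acc.update(key, (sx + x, sy + y))
    }
    acc.toMap // immutable snapshot, ready to be written to the DB
  }
}
```

Only the distinct keys of the partition are kept in memory here, which may matter if a partition has many records but few keys.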

Upvotes: 0
