manuel mourato

Reputation: 799

How to countByKey in JavaPairDStream without foreachRDD?

This might be a very trivial question, but I just don't know what is wrong.

I have a JavaPairDStream, and for each batch interval I want to get the number of distinct keys in that batch's RDD, so that I can use this number later in the application.

I can get the number of keys by doing:

streamGiveKey.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> stringStringJavaPairRDD) throws Exception {
        // countByKey() returns a map of key -> count, so size() is the number of distinct keys
        int a = stringStringJavaPairRDD.countByKey().size();
        countPartitions = a;

        System.out.print(a + "\r\n");
        return null;
    }
});

JavaPairDStream<String, Iterable<String>> groupingEachBoilerValues =
        streamGiveKey.groupByKey(countPartitions);

Here countPartitions is a global variable that stores the number of keys for one batch interval.

The problem is that the application never seems to reach groupingEachBoilerValues; it just keeps printing from inside foreachRDD in an endless loop.

Is there another way for me to do this? Thank you so much.

Upvotes: 1

Views: 616

Answers (1)

WestCoastProjects

Reputation: 63231

You can keep a global count in the driver. In outline:

long globalCount = 0L;

 .. foreachRDD( 
      ..  globalCount += rdd.count();

This globalCount variable will reside in the driver and will keep being updated after every batch.

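Update: Skeptics ahoy! The above code is specific to streaming: the function passed to foreachRDD runs on the driver once per batch, so it can update a driver-side variable. I am well aware it would not work in standard non-streaming RDD code, where a closure such as the one passed to rdd.foreach runs on the executors.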

I have created test code using the above approach, and the counter works fine:

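import org.apache.spark._
import org.apache.spark.streaming._

// Driver-side counter; sc is the SparkContext provided by spark-shell
var globalCount = 0L
val ssc = new StreamingContext(sc, Seconds(4))
val lines = ssc.socketTextStream("localhost", 19999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
// lines.count() yields a one-element RDD per batch, so rdd.count is 1 here and
// globalCount advances by one per batch; call lines.foreachRDD to add the record count instead
lines.count().foreachRDD(rdd => { globalCount += rdd.count; println(globalCount) })
ssc.start()
ssc.awaitTermination()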

Here it is running:

scala> ssc.awaitTermination
-------------------------------------------
Time: 1466366660000 ms
-------------------------------------------

1
-------------------------------------------
Time: 1466366664000 ms
-------------------------------------------

2
-------------------------------------------
Time: 1466366668000 ms
-------------------------------------------

3

Here is a tiny data generator program to test with:

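import java.net._
import java.io._

// Writes one timestamped line to the connected client every 3 seconds
case class ClientThread(sock: Socket) {
  new Thread() {
    override def run(): Unit = {
      val bos = new BufferedOutputStream(sock.getOutputStream)
      while (true) {
        bos.write(s"Hello there it is ${new java.util.Date().toString}\n".getBytes)
        bos.flush() // send the line now instead of waiting for the buffer to fill
        Thread.sleep(3000)
      }
    }
  }.start()
}

// Accept connections on the port the streaming job reads from
val ssock = new ServerSocket(19999)
while (true) {
  val sock = ssock.accept()
  ClientThread(sock)
}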
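
For the code in the question, what matters is where and when each piece runs. The sketch below is a Spark-free model in plain Java, not Spark's API: MiniStream, foreachBatch and observe are hypothetical names standing in for a DStream, foreachRDD and the driver program. It assumes only what this answer relies on, namely that the callback runs on the driver once per batch. It illustrates that a value read while the stream is being set up, like the argument in groupByKey(countPartitions), is read once, before any batch has updated the counter.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Spark-free model; every name here is hypothetical and only mimics the control flow.
class DriverCounterSketch {

    // Stand-in for a DStream of (key, value) pairs.
    static final class MiniStream {
        private final List<Consumer<List<Map.Entry<String, String>>>> outputOps = new ArrayList<>();

        // Like foreachRDD: registers the callback, runs nothing yet.
        void foreachBatch(Consumer<List<Map.Entry<String, String>>> op) {
            outputOps.add(op);
        }

        // Like a started streaming context: calls each registered callback once per batch.
        void run(List<List<Map.Entry<String, String>>> batches) {
            for (List<Map.Entry<String, String>> batch : batches) {
                for (Consumer<List<Map.Entry<String, String>>> op : outputOps) {
                    op.accept(batch);
                }
            }
        }
    }

    // Returns [value read at setup time, value after batch 1, value after batch 2, ...].
    static List<Integer> observe(List<List<Map.Entry<String, String>>> batches) {
        AtomicInteger countPartitions = new AtomicInteger(0); // driver-side counter
        List<Integer> observed = new ArrayList<>();
        MiniStream stream = new MiniStream();

        stream.foreachBatch(batch -> {
            Set<String> keys = new HashSet<>();
            for (Map.Entry<String, String> pair : batch) {
                keys.add(pair.getKey());
            }
            countPartitions.set(keys.size()); // distinct keys in this batch
            observed.add(countPartitions.get());
        });

        // Setup code placed after the registration runs once, before any batch,
        // so it still sees the initial value.
        observed.add(countPartitions.get());

        stream.run(batches);
        return observed;
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, String>>> batches = List.of(
                List.of(Map.entry("a", "1"), Map.entry("b", "2"), Map.entry("a", "3")),
                List.of(Map.entry("c", "1")));
        System.out.println(observe(batches)); // prints [0, 2, 1]
    }
}
```

In this model the setup-time read sees 0, while the per-batch callback sees 2 and then 1. If the real job behaves the same way, any logic that needs the per-batch key count has to live inside the per-batch function rather than in the setup code that follows it.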

Upvotes: 1
