Reputation: 799
This might be a very trivial question, but I just don't know what is wrong.
I have a JavaPairDStream, and for each batch interval I want to get the number of keys in the RDD within the stream, so that I can use this number later in the application.
I can get the number of keys by doing:
streamGiveKey.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> stringStringJavaPairRDD) throws Exception {
        int a = stringStringJavaPairRDD.countByKey().size();
        countPartitions = a;
        System.out.print(a + "\r\n");
        return null;
    }
});
JavaPairDStream<String, Iterable<String>> groupingEachBoilerValues = streamGiveKey.groupByKey(countPartitions);
Here countPartitions is a global variable that stores the number of keys for one batch interval.
The problem is that the application never reaches groupingEachBoilerValues; it just keeps printing inside foreachRDD in an endless cycle.
Is there another way for me to do this? Thank you so much.
Upvotes: 1
Views: 616
Reputation: 63231
You can keep a global count in the driver, along these lines:
long globalCount = 0L;
.. foreachRDD(
.. globalCount += rdd.count();
This globalCount variable will reside in the driver and will keep being updated after every batch.
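Since the question uses the Java API, here is a minimal Spark-free sketch of the same idea in plain Java. It only simulates the pattern: each "batch" is an ordinary list standing in for one RDD, and the class name BatchKeyCounter, the helper pair, and the "boiler" sample keys are hypothetical names made up for illustration. It assumes the value you want is the number of distinct keys per batch (what countByKey().size() gives) and uses an AtomicLong so that a lambda or anonymous class can update the total.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

// Spark-free simulation: a List of key/value pairs stands in for one RDD of the stream.
final class BatchKeyCounter {
    // Running total kept on the "driver" side, updated once per batch.
    private final AtomicLong globalCount = new AtomicLong();

    // Hypothetical helper that builds one key/value pair.
    static Map.Entry<String, String> pair(String key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Number of distinct keys in one batch, the same quantity as countByKey().size().
    static int distinctKeys(List<Map.Entry<String, String>> batch) {
        Set<String> keys = new HashSet<>();
        for (Map.Entry<String, String> entry : batch) {
            keys.add(entry.getKey());
        }
        return keys.size();
    }

    // Plays the role of the foreachRDD body; returns the updated running total.
    long onBatch(List<Map.Entry<String, String>> batch) {
        return globalCount.addAndGet(distinctKeys(batch));
    }

    public static void main(String[] args) {
        BatchKeyCounter counter = new BatchKeyCounter();
        List<Map.Entry<String, String>> first = Arrays.asList(
                pair("boiler1", "20"), pair("boiler2", "21"), pair("boiler1", "22"));
        List<Map.Entry<String, String>> second = Arrays.asList(pair("boiler3", "19"));
        System.out.println(counter.onBatch(first));  // prints 2 (boiler1, boiler2)
        System.out.println(counter.onBatch(second)); // prints 3 (2 + 1)
    }
}
```

In a real streaming job the body of onBatch would sit inside foreachRDD, and the per-batch number would come from the RDD rather than from a list.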
Update: Skeptics ahoy! The above code is specific to streaming. I am well aware it would not work in standard non-streaming RDD code.
I have created test code using the above approach, and the counter works fine:
import org.apache.spark._
import org.apache.spark.streaming._
var globalCount = 0L
val ssc = new StreamingContext(sc, Seconds(4))
val lines = ssc.socketTextStream("localhost", 19999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
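// Note: lines.count() yields a one-element RDD per batch (holding that batch's line count),
// so rdd.count adds 1 per batch here; use lines.foreachRDD(...) to add up the lines themselves.
lines.count().foreachRDD(rdd => { globalCount += rdd.count; println(globalCount) } )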
ssc.start
ssc.awaitTermination
Here it is running:
scala> ssc.awaitTermination
-------------------------------------------
Time: 1466366660000 ms
-------------------------------------------
1
-------------------------------------------
Time: 1466366664000 ms
-------------------------------------------
2
-------------------------------------------
Time: 1466366668000 ms
-------------------------------------------
3
Here is a tiny data generator program to test with:
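import java.net._
import java.io._
// Spawns a thread that writes one line to the connected client every 3 seconds
case class ClientThread(sock: Socket) {
  new Thread() {
    override def run(): Unit = {
      val bos = new BufferedOutputStream(sock.getOutputStream)
      while (true) {
        bos.write(s"Hello there it is ${new java.util.Date().toString}\n".getBytes)
        bos.flush() // push each line out now instead of waiting for the buffer to fill
        Thread.sleep(3000)
      }
    }
  }.start
}
// Accept connections on the port the streaming job reads from
val ssock = new ServerSocket(19999)
while (true) {
  val sock = ssock.accept()
  ClientThread(sock)
}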
Upvotes: 1