Reputation: 506
I am running a Spark Streaming job packaged as a Java jar. An accumulator adds up values from the stream. The problem is that I want to display the accumulator's value in my UI every time it increments, or at a fixed periodic interval.
However, since the accumulator's value can only be read from the driver program, I am not able to access it until the process finishes executing. Any idea how I can access this value periodically?
My code is given below:
package com.spark;

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaSpark {

    /**
     * @param args
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        conf.setMaster("local");
        JavaStreamingContext jssc = new JavaStreamingContext(conf,
                new Duration(5000));
        final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put("test", 1);
        JavaPairDStream<String, String> lines = KafkaUtils.createStream(jssc,
                "localhost:2181", "group1", topicMap);
        JavaDStream<Integer> map = lines
                .map(new Function<Tuple2<String, String>, Integer>() {
                    public Integer call(Tuple2<String, String> v1)
                            throws Exception {
                        if (v1._2.contains("the")) {
                            accum.add(1);
                            return 1;
                        }
                        return 0;
                    }
                });
        map.print();
        jssc.start();
        jssc.awaitTermination();
        System.out.println("*************" + accum.value());
        System.out.println("done");
    }
}
I am streaming data using Kafka.
Upvotes: 2
Views: 4547
Reputation: 11
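jssc.start();
// Poll the accumulator from the driver while the streaming job runs.
// Thread.sleep requires main to declare "throws InterruptedException".
while (true) {
    System.out.println("current:" + accum.value());
    Thread.sleep(1000);
}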
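If you would rather not block the main thread with an endless loop, the same polling idea can be sketched with a scheduled task. This is only a minimal, Spark-free sketch: the AtomicLong is a stand-in for the accumulator, and AccumulatorPoller and poll are made-up names for illustration. In the real job the supplier would presumably read accum.value() on the driver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical helper: samples a counter at a fixed interval on a background thread.
class AccumulatorPoller {

    // Reads `source` every `periodMillis` ms until `samples` readings are collected.
    static List<Long> poll(LongSupplier source, long periodMillis, int samples)
            throws InterruptedException {
        List<Long> readings = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(samples);
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            synchronized (readings) {
                if (readings.size() < samples) {
                    readings.add(source.getAsLong());
                    done.countDown();
                }
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        done.await();            // wait until enough readings were taken
        scheduler.shutdownNow(); // stop the periodic task
        synchronized (readings) {
            return new ArrayList<>(readings);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong counter = new AtomicLong(); // assumption: stands in for accum
        Thread worker = new Thread(() -> {
            for (int i = 0; i < 50; i++) {
                counter.incrementAndGet();
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        worker.start();
        for (long value : poll(counter::get, 50, 5)) {
            System.out.println("current: " + value);
        }
        worker.join();
    }
}
```

Instead of printing, the scheduled task could push each reading to the UI. Note that the readings only change as fast as the underlying counter is updated.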
Upvotes: 0
Reputation: 1141
In Spark, the actual processing starts to execute only when jssc.start() is called. From that point on Spark is in control and runs the stream operations for every batch, but your System.out.println statements in main are called only once. They are not executed again for each batch.
For output operations, check the documentation.
You can use any of:
print(), foreachRDD(), or saving as object, text, or Hadoop files (saveAsObjectFiles(), saveAsTextFiles(), saveAsHadoopFiles())
Hope this helps.
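To illustrate the difference between code that runs once in main and a handler that runs for every batch, here is a small Spark-free simulation. Everything in it is hypothetical: runBatches stands in for the streaming engine, and the handler plays the role of what you would pass to an output operation such as foreachRDD(). It is only a sketch of the idea, not Spark code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class BatchCallbackDemo {

    // Hypothetical stand-in for the streaming engine: calls the handler once per batch.
    static void runBatches(List<List<String>> batches,
                           Consumer<List<String>> handler) {
        for (List<String> batch : batches) {
            handler.accept(batch);
        }
    }

    // Returns the running count of lines containing `word`, reported after each batch.
    static List<Integer> runningCounts(List<List<String>> batches, String word) {
        List<Integer> reported = new ArrayList<>();
        int[] total = {0}; // plays the role of the accumulator
        runBatches(batches, batch -> {
            for (String line : batch) {
                if (line.contains(word)) {
                    total[0]++;
                }
            }
            // This runs once per batch, so it is the place to update a UI.
            reported.add(total[0]);
        });
        return reported;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("the cat", "a dog"),
                Arrays.asList("over the hill"),
                Arrays.asList("a bird"));
        // A println here in main runs once; the handler above runs three times.
        System.out.println(runningCounts(batches, "the")); // prints [1, 2, 2]
    }
}
```

The point is that reporting placed inside the per-batch handler sees the count after every batch, whereas a print statement after the stream is started is reached only once.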
Upvotes: 1