Tanveer Dayan

Reputation: 506

How to display the current accumulator value as it is updated in a DStream?

I am running a Spark Streaming job written in Java. The accumulator adds up the stream values. The problem is that I want to display the value in my UI every time it increments, or at a specific periodic interval.

But since the accumulator's value can only be read from the driver program, I am not able to access it until the process finishes execution. Any idea how I can access this value periodically?

My code is given below:

package com.spark;

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaSpark {

    /**
     * @param args
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        // Receiver-based Kafka streams need at least two local threads:
        // one for the receiver itself, one for processing batches.
        // With plain "local" the receiver takes the only core and no
        // batches are ever processed.
        conf.setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf,
                new Duration(5000));
        // The accumulator lives on the driver; tasks on executors can
        // only add to it, and only the driver can read its value.
        final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put("test", 1);
        JavaPairDStream<String, String> lines = KafkaUtils.createStream(jssc,
                "localhost:2181", "group1", topicMap);

        JavaDStream<Integer> map = lines
                .map(new Function<Tuple2<String, String>, Integer>() {

                    public Integer call(Tuple2<String, String> v1)
                            throws Exception {
                        if (v1._2.contains("the")) {
                            accum.add(1);
                            return 1;
                        }
                        return 0;
                    }
                });

        map.print();
        jssc.start();
        jssc.awaitTermination();
        // Everything below runs only after the streaming context stops,
        // which is why the accumulator is never printed while the job runs.
        System.out.println("*************" + accum.value());
        System.out.println("done");
    }
}

I am streaming data using Kafka.

Upvotes: 2

Views: 4547

Answers (2)

user6269973

Reputation: 11

jssc.start();
// Poll the accumulator from the driver while the job keeps running.
while (true) {
    System.out.println("current: " + accum.value());
    Thread.sleep(1000); // requires 'throws InterruptedException' on main()
}
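A variant of the same polling idea (assuming Spark 1.3+, where awaitTerminationOrTimeout() is available on JavaStreamingContext) avoids the bare infinite loop and exits cleanly when the context stops:

jssc.start();
// Block for up to one second at a time; awaitTerminationOrTimeout
// returns true once the streaming context has terminated.
while (!jssc.awaitTerminationOrTimeout(1000)) {
    System.out.println("current: " + accum.value());
}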

Upvotes: 0

mithra

Reputation: 1141

In Spark, the actual code starts to execute only when jssc.start() is called. From that point control is with Spark, which runs the streaming loop; your System.out.println after awaitTermination() will be called only once, when the context stops, not on every batch.

For output operations, check the documentation.

You can use print(), foreachRDD(), or one of the save operations: saveAsObjectFiles(), saveAsTextFiles(), or saveAsHadoopFiles().
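In particular, foreachRDD() runs its function on the driver once per batch interval, so it is a natural place to read the accumulator periodically. A minimal sketch against the asker's code (Spark 1.x Java API; register it before jssc.start() and add an import for org.apache.spark.api.java.JavaRDD):

map.foreachRDD(new Function<JavaRDD<Integer>, Void>() {
    public Void call(JavaRDD<Integer> rdd) throws Exception {
        // Runs on the driver at each batch interval, so accum.value()
        // reflects all batches completed so far.
        System.out.println("accumulator so far: " + accum.value());
        return null;
    }
});

Replace the println with whatever pushes the value to your UI.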

Hope this helps

Upvotes: 1
