Priyanka.Patil

Reputation: 1197

No output when using Spark Streaming

HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");

String topics = "test4";
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(" ")));


JavaDStream<String> stream1 = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class,
    StringDecoder.class, kafkaParams, topicsSet)
    .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
      @Override
      public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
        rdd.saveAsTextFile("output");
        return rdd;
      }
    }).map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> kv) {
        return kv._2();
      }
    });
stream1.print();
jssc.start();
jssc.awaitTermination();

I cross-checked that the topic "test4" contains valid data.

I expect the strings streamed from the Kafka cluster to be printed to the console. There are no exceptions in the console, but also no output. Am I missing anything here?

Upvotes: 1

Views: 1294

Answers (2)

Erica

Reputation: 1728

Have you tried producing data to your topic after the streaming application has started?

By default, the direct stream uses the configuration auto.offset.reset = largest. This means that when there is no initial offset, it automatically resets to the largest offset, so you will only read new messages that enter the topic after the streaming application has started.
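
If you also want to read the messages that were already in the topic, one option is to override that default. The following is only a minimal sketch, assuming the old Kafka 0.8 consumer configuration that the question's code uses (where the accepted values are "smallest" and "largest"). The class name KafkaParamsSketch and the helper buildKafkaParams are hypothetical names made up for illustration; only the map keys come from Kafka.

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaParamsSketch {

    // Hypothetical helper: builds the parameter map that would be passed
    // to KafkaUtils.createDirectStream as kafkaParams.
    static Map<String, String> buildKafkaParams(String brokers, boolean readFromBeginning) {
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);
        if (readFromBeginning) {
            // Assumption: Kafka 0.8-style consumer config. "smallest" starts
            // from the earliest available offset when no initial offset exists.
            kafkaParams.put("auto.offset.reset", "smallest");
        }
        return kafkaParams;
    }

    public static void main(String[] args) {
        Map<String, String> params = buildKafkaParams("localhost:9092", true);
        System.out.println(params);
    }
}
```

With the default left in place, the map contains only metadata.broker.list, and the stream starts at the end of the topic.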

Upvotes: 5

Patrick

Reputation: 55

As ccheneson says, it could be because you're missing .start() and .awaitTermination().

Or it could be because transformations in Spark are lazy, which means that you need to add an output action to get any results, e.g.:

stream1.print();
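
To illustrate what "lazy" means here, this is a small analogy using plain java.util.stream rather than Spark itself (an assumption made so the snippet runs without a cluster; LazySketch and run are hypothetical names). The map step does nothing until a terminal operation is attached, in roughly the same way that a DStream transformation does nothing until an output operation such as print() is registered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class LazySketch {

    // Records the order in which the pipeline steps actually execute.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Intermediate operation: declared here, but not executed yet.
        Stream<String> mapped = Arrays.asList("a", "b").stream()
            .map(s -> {
                log.add("map " + s);
                return s.toUpperCase();
            });

        log.add("before terminal op");

        // Terminal operation: only now does the map function run.
        mapped.forEach(s -> log.add("out " + s));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The first entry in the log is "before terminal op", which shows that the map function had not run at the point where it was declared.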

Or it could be because the map is performed on the executor(s), so any output produced inside it would appear in the executors' logs rather than in the driver's log.

Upvotes: 1
