Reputation: 1197
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");

String topics = "test4";
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(" ")));

// Direct stream against the Kafka 0.8 API: save each batch as text files,
// then map each (key, value) pair to its value.
JavaDStream<String> stream1 = KafkaUtils
        .createDirectStream(jssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet)
        .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
            @Override
            public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
                rdd.saveAsTextFile("output");
                return rdd;
            }
        })
        .map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> kv) {
                return kv._2();
            }
        });

stream1.print();

jssc.start();
jssc.awaitTermination();
I have cross-checked that there is valid data in the topic "test4". I expect the strings streamed from the Kafka cluster to be printed to the console. There are no exceptions in the console, but also no output. Is there anything I'm missing here?
Upvotes: 1
Views: 1294
Reputation: 1728
Have you tried producing data into your topic after the streaming application has started?
By default the direct stream uses the configuration auto.offset.reset = largest. This means that when there is no initial offset, it automatically resets to the largest offset, so you will only be able to read new messages that enter the topic after the streaming application has started.
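For example, to read from the beginning of the topic instead of only new messages, you can set that key before creating the stream (a minimal sketch against the 0.8 direct-stream API used in the question; "smallest" is the 0.8-era consumer value, the 0.10+ equivalent is "earliest"):

// Reset to the earliest available offset when no initial offset exists,
// so messages already sitting in the topic are also read.
kafkaParams.put("auto.offset.reset", "smallest");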
Upvotes: 5
Reputation: 55
As ccheneson says, it could be because you're missing .start() and .awaitTermination().
Or it could be because transformations in Spark are lazy, which means that you need to add an action to get the results, e.g.:
stream1.print();
Or it could be because the map is being performed on the executor(s), so the output would be in the executors' logs rather than in the driver's log.
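If you want to confirm on the driver side that records are flowing, one option is to pull a small sample of each batch back to the driver inside foreachRDD (a sketch, assuming a Spark version whose foreachRDD accepts a VoidFunction; the take(10) sample size is arbitrary):

// Collect a few records per batch back to the driver and print them there,
// so the output lands in the driver's console rather than the executors' logs.
stream1.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    @Override
    public void call(JavaRDD<String> rdd) {
        for (String record : rdd.take(10)) {
            System.out.println(record);
        }
    }
});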
Upvotes: 1