Reputation: 267
I am streaming from a Kafka topic like this:
JavaPairInputDStream<String, String> directKafkaStream =
        KafkaUtils.createDirectStream(jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicSet);
directKafkaStream.print();
For one topic, the output looks like this:
(null,"04/15/2015","18:44:14")
(null,"04/15/2015","18:44:15")
(null,"04/15/2015","18:44:16")
(null,"04/15/2015","18:44:17")
How do I map the topic name to each record?
For example, if the topic is "callData", the output should look like this (and so on):
(callData,"04/15/2015","18:44:14")
(callData,"04/15/2015","18:44:15")
(callData,"04/15/2015","18:44:16")
(callData,"04/15/2015","18:44:17")
Upvotes: 3
Views: 742
Reputation: 149538
How do I map topic name and records?
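To extract the topic (or partition) information, you'll need to use the overload that accepts a Function receiving a MessageAndMetadata<K, V> and returning the type you want each record transformed into. This overload takes explicit starting offsets per topic and partition (a Map<TopicAndPartition, Long>) instead of a set of topic names.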
It looks like this:
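import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import kafka.common.TopicAndPartition;
import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// Starting offsets: read partition 0 of "topicname" beginning at offset 1.
Map<TopicAndPartition, Long> map = new HashMap<>();
map.put(new TopicAndPartition("topicname", 0), 1L);

JavaInputDStream<Map.Entry> stream = KafkaUtils.createDirectStream(
        javaContext,
        String.class,         // key type
        String.class,         // value type
        StringDecoder.class,  // key decoder
        StringDecoder.class,  // value decoder
        Map.Entry.class,      // <--- This is the record return type from the transformation.
        kafkaParams,
        map,                  // fromOffsets
        // Message handler: turn each MessageAndMetadata into a (topic, message) entry.
        messageAndMetadata ->
                new AbstractMap.SimpleEntry<>(messageAndMetadata.topic(),
                        messageAndMetadata.message()));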
Note that I used Map.Entry as Java's replacement for Scala's Tuple2. You can also provide your own class with Topic (or Partition) and Message properties and use that for the transformation. Also note that the type of the Kafka input stream is now JavaInputDStream<Map.Entry>, since that is what the transformation returns.
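As a sketch of the "your own class" option, here is a minimal value class pairing a record's topic with its message. The name TopicMessage and its getters are hypothetical, not part of Spark or Kafka. It implements Serializable because Spark ships records between executors, and its toString mirrors the "(topic,message)" layout the question asks for, assuming the message payload is the quoted date/time string shown there.

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical record type: holds the Kafka topic name and the message payload.
// Serializable so Spark can move instances between the driver and executors.
class TopicMessage implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String topic;
    private final String message;

    public TopicMessage(String topic, String message) {
        this.topic = Objects.requireNonNull(topic, "topic");
        this.message = message;
    }

    public String getTopic() {
        return topic;
    }

    public String getMessage() {
        return message;
    }

    // Formats as "(topic,message)", matching the desired print() output in the question.
    @Override
    public String toString() {
        return "(" + topic + "," + message + ")";
    }
}
```

To use it, you would pass TopicMessage.class as the record class and a handler such as messageAndMetadata -> new TopicMessage(messageAndMetadata.topic(), messageAndMetadata.message()), which makes the stream a JavaInputDStream<TopicMessage>. Unlike the raw Map.Entry version, this keeps the element type fully generic-safe and gives the fields descriptive names.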
Upvotes: 3