Alka

Reputation: 267

How to map Kafka topic names to their respective records in Spark Streaming

I am streaming from a Kafka topic like this:

JavaPairInputDStream<String, String> directKafkaStream = 
    KafkaUtils.createDirectStream(jssc,
                                  String.class, 
                                  String.class,
                                  StringDecoder.class,
                                  StringDecoder.class,
                                  kafkaParams, 
                                  topicSet);

directKafkaStream.print();   

For one topic, the output looks like this:

(null,"04/15/2015","18:44:14")
(null,"04/15/2015","18:44:15")
(null,"04/15/2015","18:44:16")
(null,"04/15/2015","18:44:17")  

How do I map the topic name to its records?
For example, if the topic is "callData", the output should look something like this, and so on for other topics:

(callData,"04/15/2015","18:44:14")
(callData,"04/15/2015","18:44:15")
(callData,"04/15/2015","18:44:16")
(callData,"04/15/2015","18:44:17")  

Upvotes: 3

Views: 742

Answers (1)

Yuval Itzchakov

Reputation: 149538

How do I map topic name and records?

To extract the topic (and partition) information for each record, you'll need to use the createDirectStream overload that accepts a Function receiving a MessageAndMetadata<K, V> and returning the type you want to transform each record into.

It looks like this:

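// This overload takes starting offsets per topic partition instead of a topic set.
// Here: partition 0 of "topicname", starting from offset 1.
Map<TopicAndPartition, Long> map = new HashMap<>();
map.put(new TopicAndPartition("topicname", 0), 1L);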

JavaInputDStream<Map.Entry> stream = KafkaUtils.createDirectStream(
        javaContext,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        Map.Entry.class, // <--- This is the record return type from the transformation.
        kafkaParams,
        map,
        messageAndMetadata -> 
            new AbstractMap.SimpleEntry<>(messageAndMetadata.topic(),
                                          messageAndMetadata.message()));

Note that I used Map.Entry as Java's replacement for Scala's Tuple2. You can also provide your own class with topic (or partition) and message properties and use that for the transformation. The type of the Kafka input stream is now JavaInputDStream<Map.Entry>, since that is what the transformation returns.
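
As a rough sketch of the "own class" option, assuming a hypothetical holder named TopicRecord (the name, its fields, and its toString format are illustrative and not part of Spark or Kafka), it could look something like this:

```java
import java.io.Serializable;

// Hypothetical holder class for illustration; not part of Spark or Kafka.
// It is Serializable because Spark may need to move records between JVMs.
final class TopicRecord implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String topic;
    private final String message;

    TopicRecord(String topic, String message) {
        this.topic = topic;
        this.message = message;
    }

    String getTopic() {
        return topic;
    }

    String getMessage() {
        return message;
    }

    // Renders the record as (topic,message), the shape asked for in the question.
    @Override
    public String toString() {
        return "(" + topic + "," + message + ")";
    }
}
```

With a class like this, the record type argument would be TopicRecord.class instead of Map.Entry.class, and the handler would be along the lines of messageAndMetadata -> new TopicRecord(messageAndMetadata.topic(), messageAndMetadata.message()), so the stream type becomes JavaInputDStream<TopicRecord>.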

Upvotes: 3
