Veha Suwatphisankij

Reputation: 53

How to get current Kafka topic inside Kafka stream?

My scenario: I create a lot of Kafka topics that share a prefix (e.g. house.door, house.room) and consume all of them with the Kafka Streams regex topic-pattern API. Everything looks good; I get the key and value of each record.

In order to process the data I need the topic name, so I can do a join based on it, but I do not know how to get the topic name inside the Kafka Streams DSL.

One possible way to solve my problem is to save the topic name inside my message, but it would be better if I could get the topic name directly.

So, how do I get the current Kafka topic inside a Kafka stream?
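For reference, subscribing by pattern looks roughly like this (the house.* prefix is just the example from above, and the key/value types are assumed to be strings):

    // Illustrative sketch only: consume every topic matching the "house." prefix.
    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, String> allHouseTopics =
            builder.stream(Pattern.compile("house\\..*"));   // java.util.regex.Pattern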

Upvotes: 5

Views: 5252

Answers (3)

dmkaratza

Reputation: 1

FAQ: https://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information

I can also confirm that the Processor API gives you access to the Kafka record's metadata. It is worth highlighting that the record's metadata should be extracted from the ProcessorContext in the org.apache.kafka.streams.processor package and not the org.apache.kafka.streams.processor.api package; otherwise the metadata is not correct. More specifically, I noticed that the record's offset is always 0 when using the Processor from the latter package.
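A minimal sketch of that in practice (the class name and string types are placeholders); the important part is which package ProcessorContext is imported from:

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;   // classic package, not ...processor.api

    public class MetadataLoggingTransformer implements Transformer<String, String, KeyValue<String, String>> {

        private ProcessorContext context;

        @Override
        public void init(final ProcessorContext context) {
            this.context = context;
        }

        @Override
        public KeyValue<String, String> transform(final String key, final String value) {
            // topic(), partition() and offset() reflect the record currently being processed
            System.out.printf("topic=%s partition=%d offset=%d%n",
                    context.topic(), context.partition(), context.offset());
            return KeyValue.pair(key, value);
        }

        @Override
        public void close() {
        }
    }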

Upvotes: 0

Raghavendra Acharya

Reputation: 171

To add to Matthias J. Sax's point, I have attached sample code to show how it can be done.

public static void main(final String[] args) {
    try {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streamProcessor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.STATE_DIR_CONFIG, "state-store");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        // Example input/output topics (adjust to your own topic names).
        final List<String> inputTopicList = Arrays.asList("house.door", "house.room");
        final String outputTopic = "house.output";

        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KStream<String, String> textLines = streamsBuilder.stream(inputTopicList);

        // Re-key each record with its source topic name via the Transformer below.
        final KStream<String, String> keyedByTopic = textLines.transform(TopicDetailsTransformer::new);
        keyedByTopic.foreach(new ForeachAction<String, String>() {
            public void apply(String key, String value) {
                System.out.println(key + ": " + value);
            }
        });
        keyedByTopic.to(outputTopic);

        final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props);
        streams.start();
    } catch (Exception e) {
        System.out.println(e);
    }
}
private static class TopicDetailsTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, String> transform(final String recordKey, final String recordValue) {
        // Here I am returning the source topic name as the record key.
        return KeyValue.pair(context.topic(), recordValue);
    }

    @Override
    public void close() {
        // Not needed.
    }
}

Upvotes: 2

Matthias J. Sax

Reputation: 62330

FAQ: https://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information

Record metadata is accessible through the Processor API. It is also accessible indirectly through the DSL thanks to its Processor API integration.

With the Processor API, you can access record metadata through a ProcessorContext. You can store a reference to the context in an instance field of your processor during Processor#init(), and then query the processor context within Processor#process(), for example (the same applies to Transformer). The context is updated automatically to match the record that is currently being processed, which means that methods such as ProcessorContext#partition() always return the current record's metadata. Some caveats apply when calling the processor context within a scheduled punctuate() function; see the Javadocs for details.
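A minimal sketch of that pattern with the classic Processor API (the class name and topology wiring are hypothetical); AbstractProcessor already stores the ProcessorContext passed to init() and exposes it via context():

    import org.apache.kafka.streams.processor.AbstractProcessor;

    public class TopicAwareProcessor extends AbstractProcessor<String, String> {

        @Override
        public void process(final String key, final String value) {
            // context() always reflects the record currently being processed
            System.out.printf("record from topic=%s partition=%d offset=%d%n",
                    context().topic(), context().partition(), context().offset());
            context().forward(key, value);
        }
    }

    // Wiring it into a Topology (names are illustrative):
    // topology.addSource("source", "house.door", "house.room")
    //         .addProcessor("topic-aware", TopicAwareProcessor::new, "source");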

If you use the DSL combined with a custom Transformer, for example, you could transform an input record’s value to also include partition and offset metadata, and subsequent DSL operations such as map or filter could then leverage this information.
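For example, a sketch using transformValues() to tag each value with its source metadata (the enriched-value format is just an assumption, and textLines is assumed to be a KStream<String, String> as in the other answer):

    // Imports assumed: ValueTransformer from org.apache.kafka.streams.kstream,
    // ProcessorContext from org.apache.kafka.streams.processor.
    final KStream<String, String> enriched = textLines.transformValues(
            () -> new ValueTransformer<String, String>() {
                private ProcessorContext context;

                @Override
                public void init(final ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public String transform(final String value) {
                    // Prepend topic/partition/offset so downstream map/filter/join can use them.
                    return context.topic() + "-" + context.partition()
                            + "@" + context.offset() + ": " + value;
                }

                @Override
                public void close() {
                }
            });

    // Downstream DSL operations can now branch on the source topic, e.g.:
    enriched.filter((key, value) -> value.startsWith("house.door"));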

Upvotes: 1
