Reputation: 1055
This is my application that consumes data from a Kafka topic and then computed results are sent to a topic.
@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class WordCountProcessorApplication {
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<?, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-multi"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
How can I print a log befor every consume data from a Kafka topic?
Upvotes: 0
Views: 49
Reputation: 174484
Add a Transformer
at the beginning and end of your topology.
See this discussion where there was a request to automatically add custom transformers to the topology by the framework.
It was decided that the work around to add your own is sufficient.
Upvotes: 1