Ali Yeganeh

Reputation: 1055

Spring Cloud Stream’s Apache Kafka

This is my application. It consumes data from a Kafka topic and sends the computed results to another topic.

@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class WordCountProcessorApplication {

    @StreamListener("input")
    @SendTo("output")
    public KStream<?, WordCount> process(KStream<?, String> input) {
        return input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("WordCounts-multi"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }

    public static void main(String[] args) {
        SpringApplication.run(WordCountProcessorApplication.class, args);
    }
}

How can I log a message before each record is consumed from the Kafka topic?

Upvotes: 0

Views: 49

Answers (1)

Gary Russell

Reputation: 174484

Add a Transformer at the beginning and end of your topology.

See this discussion, which requested that the framework automatically add custom transformers to the topology.

It was decided that the workaround of adding your own is sufficient.
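
As a rough illustration of that workaround, here is a minimal sketch of a pass-through transformer that logs each record. It assumes Kafka Streams 2.x, where ValueTransformer declares only init, transform and close and KStream#transformValues accepts a supplier. The class name LoggingTransformer and the "stage" label are hypothetical, and java.util.logging is used only to keep the example dependency-free; swap in whatever logger your application uses.

```java
import java.util.logging.Logger;

import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical helper, not part of Kafka Streams or Spring Cloud Stream:
// logs every record it sees and returns the value unchanged.
class LoggingTransformer<V> implements ValueTransformer<V, V> {

    private static final Logger LOG =
            Logger.getLogger(LoggingTransformer.class.getName());

    // Free-form label so the same class can mark the start and end of a topology.
    private final String stage;

    // Set by Kafka Streams through init() before the first record arrives.
    private ProcessorContext context;

    LoggingTransformer(String stage) {
        this.stage = stage;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public V transform(V value) {
        if (context != null) {
            // Inside a running topology the context exposes record metadata.
            LOG.info(stage + ": topic=" + context.topic()
                    + " partition=" + context.partition()
                    + " offset=" + context.offset()
                    + " value=" + value);
        } else {
            // Fallback when the transformer is called outside a topology.
            LOG.info(stage + ": value=" + value);
        }
        return value;
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}
```

In the process method from the question, this could be wired in as the first step with input.transformValues(() -> new LoggingTransformer<String>("before")), and again on the resulting stream just before it is returned. Note that the supplier must create a new transformer instance on every call.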

Upvotes: 1
