Eric Kolotyluk

Reputation: 2243

What is the simplest way to transform messages in Spring Cloud Kafka?

I have the following code

@SpringBootApplication
public class EnrichmentProcessPOC {

    public static void main(String[] args) {
        SpringApplication.run(EnrichmentProcessPOC.class, args);
    }

    public static class EnrichmentProcessorApplication {

        public static final String INPUT_TOPIC = "PAYMENT_MSG";
        public static final String OUTPUT_TOPIC = "PAYMENT_MSG_CIF";

        @Bean
        public Function<KStream<Bytes, String>, KStream<Bytes, String>> process() {

            return input -> input.mapValues(value -> "foo");
        }
    }
}

My expectation is that this should replace every message consumed with "foo", but all it does is replicate the original message on the output topic. What am I missing here, and why is it doing this?

I also tried using a Transformer, but it has the same behavior. What is the simplest way to actually transform a message into "foo"?

Upvotes: 2

Views: 697

Answers (3)

Eric Kolotyluk

Reputation: 2243

Okay, I am an idiot: the original code works fine. When I started up the Kafka console consumer and producer, I started them both on the same topic (copy/paste error), so the consumer was simply showing me what I had just produced.

Now that I have them on different topics, my code runs as expected.
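For anyone hitting the same thing, here is a minimal sketch of bindings that keep the two topics apart. It assumes the function bean is named process, as in the question, and that the binding names follow the process-in-0 / process-out-0 pattern shown in the other answer; the actual configuration used for the question was not posted, so treat this as an illustration only.

```properties
# Assumed bindings for the question's topics (not the asker's actual config)
spring.cloud.stream.bindings.process-in-0.destination=PAYMENT_MSG
spring.cloud.stream.bindings.process-out-0.destination=PAYMENT_MSG_CIF
```

With this layout the console producer would write to PAYMENT_MSG and the console consumer would read from PAYMENT_MSG_CIF.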

Thanks everyone for your input.

Upvotes: 3

Gary Russell

Reputation: 174484

Can you show your configuration? This works as expected for me:

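import java.util.function.Function;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

@SpringBootApplication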
public class So64158395Application {

    public static void main(String[] args) {
        SpringApplication.run(So64158395Application.class, args);
    }

    @Bean
    public Function<KStream<byte[], byte[]>, KStream<byte[], byte[]>> process() {
        return input -> input.map((key, value) -> KeyValue.pair(key, "foo".getBytes()));
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("in", "bar", "baz");
        };
    }

    @KafkaListener(id = "outListener", topics = "out")
    void listen(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println(key + ":" + value);
    }

}

spring.cloud.stream.bindings.process-in-0.destination=in
spring.cloud.stream.bindings.process-in-0.group=group
spring.cloud.stream.bindings.process-out-0.destination=out

Result:

bar:foo

Upvotes: 1

Have you tried using the function "mapValues" instead of map?

Here is the difference between the two according to the documentation:

map

Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily).

mapValues

Transform the value of each input record into a new value (with possible new type) of the output record.
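To make the distinction concrete, here is a small plain-Java sketch. It does not use Kafka Streams at all: the class MapVsMapValues and its two static helpers are hypothetical stand-ins that only mimic the semantics quoted above, with a List of Map.Entry playing the role of a stream of records.

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for the two operations; this is NOT the Kafka Streams API.
public class MapVsMapValues {

    // Analogue of mapValues: the key is carried over untouched, only the value changes.
    static <K, V, VR> List<Map.Entry<K, VR>> mapValues(
            List<Map.Entry<K, V>> records, Function<V, VR> mapper) {
        return records.stream()
                .map(r -> Map.entry(r.getKey(), mapper.apply(r.getValue())))
                .collect(Collectors.toList());
    }

    // Analogue of map: the mapper sees key and value and returns a whole new record,
    // so both the key and the value may change.
    static <K, V, KR, VR> List<Map.Entry<KR, VR>> map(
            List<Map.Entry<K, V>> records, BiFunction<K, V, Map.Entry<KR, VR>> mapper) {
        return records.stream()
                .map(r -> mapper.apply(r.getKey(), r.getValue()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> in = List.of(Map.entry("bar", "baz"));

        // Value replaced, key preserved.
        List<Map.Entry<String, String>> valuesOnly = mapValues(in, v -> "foo");
        System.out.println(valuesOnly); // prints [bar=foo]

        // Key and value both replaced.
        List<Map.Entry<String, String>> both =
                map(in, (k, v) -> Map.entry(k.toUpperCase(), "foo"));
        System.out.println(both); // prints [BAR=foo]
    }
}
```

Either operation can replace the value with "foo"; the practical difference is only whether the key is allowed to change.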

Upvotes: 0
