Reputation: 2243
I have the following code:
@SpringBootApplication
public class EnrichmentProcessPOC {

    public static void main(String[] args) {
        SpringApplication.run(EnrichmentProcessPOC.class, args);
    }

    public static class EnrichmentProcessorApplication {

        public static final String INPUT_TOPIC = "PAYMENT_MSG";
        public static final String OUTPUT_TOPIC = "PAYMENT_MSG_CIF";

        @Bean
        public Function<KStream<Bytes, String>, KStream<Bytes, String>> process() {
            return input -> input.mapValues(value -> "foo");
        }
    }
}
My expectation is that this should replace every message consumed with "foo", but all it does is replicate the message on the producer topic. What am I missing here, and why is it doing this?
I also tried using a Transformer, but it has the same behavior. What is the simplest way to actually transform a message into "foo"?
Upvotes: 2
Views: 697
Reputation: 2243
Okay, I am an idiot, and the original code works fine, but when I started up the Kafka console consumer and producer, I started them both on the same topic (copy/paste error).
Now that I have them on different topics, my code runs as expected.
Thanks everyone for your input.
Upvotes: 3
Reputation: 174484
Can you show your configuration? This works as expected for me:
@SpringBootApplication
public class So64158395Application {

    public static void main(String[] args) {
        SpringApplication.run(So64158395Application.class, args);
    }

    @Bean
    public Function<KStream<byte[], byte[]>, KStream<byte[], byte[]>> process() {
        return input -> input.map((key, value) -> KeyValue.pair(key, "foo".getBytes()));
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("in", "bar", "baz");
        };
    }

    @KafkaListener(id = "outListener", topics = "out")
    void listen(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println(key + ":" + value);
    }
}
spring.cloud.stream.bindings.process-in-0.destination=in
spring.cloud.stream.bindings.process-in-0.group=group
spring.cloud.stream.bindings.process-out-0.destination=out
Result:
bar:foo
Upvotes: 1
Reputation: 1
Have you tried using the function "mapValues" instead of map?
Here is the difference between the two according to the documentation:
map
Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily).
mapValues
Transform the value of each input record into a new value (with possible new type) of the output record.
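To make the distinction concrete, here is a minimal plain-Java sketch of the two behaviours. It is only an analogy: the `MapVsMapValues` class and its `map` and `mapValues` helpers are hypothetical stand-ins written for this illustration, not the Kafka Streams API, and records are modelled as `Map.Entry` pairs in a list rather than a `KStream`.

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java analogy only: these helpers are hypothetical and are NOT the Kafka Streams API.
public class MapVsMapValues {

    // Analogous to mapValues: the key is carried over untouched, only the value changes.
    static <K, V, VR> List<Map.Entry<K, VR>> mapValues(
            List<Map.Entry<K, V>> records, Function<V, VR> valueMapper) {
        return records.stream()
                .map(r -> Map.entry(r.getKey(), valueMapper.apply(r.getValue())))
                .collect(Collectors.toList());
    }

    // Analogous to map: the mapper sees key and value and returns a whole new record.
    static <K, V, KR, VR> List<Map.Entry<KR, VR>> map(
            List<Map.Entry<K, V>> records, BiFunction<K, V, Map.Entry<KR, VR>> mapper) {
        return records.stream()
                .map(r -> mapper.apply(r.getKey(), r.getValue()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = List.of(Map.entry("bar", "baz"));

        // Only the value is replaced; the key "bar" is preserved.
        List<Map.Entry<String, String>> valuesOnly = mapValues(input, v -> "foo");
        System.out.println(valuesOnly);   // prints [bar=foo]

        // Both key and value may be replaced.
        List<Map.Entry<String, String>> keyAndValue =
                map(input, (k, v) -> Map.entry(k.toUpperCase(), "foo"));
        System.out.println(keyAndValue);  // prints [BAR=foo]
    }
}
```

When only the value needs to change, either form gives the same value; the difference is whether the mapper is allowed to touch the key.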
Upvotes: 0