Reputation: 86
I'm working on a Spring Cloud Stream application where I need to write from a existing topic into a global store. So I want that each instance of my app / pod is consuming and writing the whole existing topic into a KeyValueStore. The important thing is I just want to use the Store and no addtional function.
So I know if I stream from an input to an output topic I can do it like this (https://github.com/spring-cloud/spring-cloud-stream/issues/2933)
But I don't have this function in the first place, so I tried to use a Consumer with the GlobalKTable like so:
@Configuration
@Log4j2
public class KafkaStreamsConfig {
@Bean
public Consumer<GlobalKTable<UUID, String>> globalResultConsumer() {
return input -> log.info(
"GlobalResultConsumer is consuming in: {}",
input.queryableStoreName()
);
}
}
And:
cloud:
function:
definition: globalResultConsumer
stream:
bindings:
globalResultConsumer-in-0:
destination: result-topic
kafka:
binder:
brokers: localhost:9092
streams:
bindings:
globalResultConsumer-in-0:
consumer:
materializedAs: global-result-store
keySerde: org.apache.kafka.common.serialization.Serdes$UUIDSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
configuration:
specific.avro.reader: true
It seems to be working fine, but is that 100% correct configuration? And how I could log the consumed messages which are written into the store?
Upvotes: 0
Views: 190