Perry45

Reputation: 86

How to Use Spring Cloud Stream Kafka Streams Binder with GlobalKTable

I'm working on a Spring Cloud Stream application where I need to write an existing topic into a global store. I want each instance of my app / pod to consume the whole existing topic and write it into a KeyValueStore. The important thing is that I just want to use the store, with no additional processing function.

I know that if I stream from an input topic to an output topic, I can do it like this (https://github.com/spring-cloud/spring-cloud-stream/issues/2933).
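
For reference, the function-based variant from that issue looks roughly like this (a minimal sketch on my part; the process bean name and the UUID/String types are assumptions, not taken from the issue):

import java.util.UUID;
import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StreamProcessorConfig {

    // Reads from the process-in-0 binding and writes to process-out-0.
    @Bean
    public Function<KStream<UUID, String>, KStream<UUID, String>> process() {
        return input -> input.peek((key, value) ->
                System.out.println("Processing: " + key + " -> " + value));
    }
}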

But I don't have such a function in the first place, so I tried to use a Consumer with a GlobalKTable instead, like so:

import java.util.UUID;
import java.util.function.Consumer;

import org.apache.kafka.streams.kstream.GlobalKTable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import lombok.extern.log4j.Log4j2;

@Configuration
@Log4j2
public class KafkaStreamsConfig {

    // Binds the GlobalKTable to the globalResultConsumer-in-0 binding;
    // the binder materializes the topic into the configured store.
    @Bean
    public Consumer<GlobalKTable<UUID, String>> globalResultConsumer() {
        return input -> log.info(
                "GlobalResultConsumer is consuming in: {}",
                input.queryableStoreName()
        );
    }
}

And in my application.yml:

spring:
  cloud:
    function:
      definition: globalResultConsumer
    stream:
      bindings:
        globalResultConsumer-in-0:
          destination: result-topic
      kafka:
        binder:
          brokers: localhost:9092
        streams:
          bindings:
            globalResultConsumer-in-0:
              consumer:
                materializedAs: global-result-store
                keySerde: org.apache.kafka.common.serialization.Serdes$UUIDSerde
                valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                configuration:
                  specific.avro.reader: true

It seems to work fine, but is this configuration 100% correct? And how could I log the messages that are consumed and written into the store?
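
One idea I had for at least inspecting what ends up in the store is to query it through the binder's InteractiveQueryService (a minimal sketch; the StoreLogger component and when to call it are my own assumptions, only the store name comes from the config above):

import java.util.UUID;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.stereotype.Component;

import lombok.extern.log4j.Log4j2;

@Component
@Log4j2
public class StoreLogger {

    private final InteractiveQueryService interactiveQueryService;

    public StoreLogger(InteractiveQueryService interactiveQueryService) {
        this.interactiveQueryService = interactiveQueryService;
    }

    // Dumps the current contents of the materialized global store.
    public void logStoreContents() {
        ReadOnlyKeyValueStore<UUID, String> store =
                interactiveQueryService.getQueryableStore(
                        "global-result-store", QueryableStoreTypes.keyValueStore());
        try (KeyValueIterator<UUID, String> all = store.all()) {
            while (all.hasNext()) {
                KeyValue<UUID, String> entry = all.next();
                log.info("Store entry: {} -> {}", entry.key, entry.value);
            }
        }
    }
}

But that only shows the state after the fact, not each record as it is consumed, so I'm not sure it's the right approach.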

Upvotes: 0

Views: 190

Answers (0)
