codependent

Reputation: 24452

How to add a StateStore using StateStoreBuilder in a Spring Cloud Stream Kafka Streams application

The native Kafka API allows us to create and add a state store using the StreamsBuilder:

    final StreamsBuilder builder = new StreamsBuilder();
    ...
    final StoreBuilder<WindowStore<String, Long>> dedupStoreBuilder = Stores.windowStoreBuilder(
            Stores.persistentWindowStore(storeName,
                                         retentionPeriod,
                                         windowSize,
                                         false
            ),
            Serdes.String(),
            Serdes.Long());

    builder.addStateStore(dedupStoreBuilder);

I would like to do the same with Spring Cloud Stream, but I can't figure out how to access the StreamsBuilder in order to add the store.

I've tried to retrieve the StreamsBuilderFactoryBean as described in the documentation, hoping that I could get the StreamsBuilder object from it, but the bean doesn't seem to be available:

    @EnableBinding(KafkaStreamsProcessor::class)
    class FraudKafkaStreamsConfiguration(private val context: ApplicationContext) {

        @StreamListener
        @SendTo("output")
        fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {
            val streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean::class.java)
            ...
            return xxx
        }
    }

Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'stream-builder-process' available

In any case, I'm not even sure that's the right way to do it. So, how can we programmatically create a StateStore?

Upvotes: 0

Views: 1998

Answers (1)

codependent

Reputation: 24452

I couldn't find the documented procedure because of my Spring Cloud Stream version (Fishtown SR3), but the good news is that since Germantown the state store can be created declaratively with the @KafkaStreamsStateStore annotation:

    const val DEDUP_STORE = "dedup-store"

    @EnableBinding(KafkaStreamsProcessor::class)
    class FraudKafkaStreamsConfiguration {

        @KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
        @StreamListener
        @SendTo("output")
        fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {
            return input.transform(TransformerSupplier { DeduplicationTransformer() }, DEDUP_STORE)
        }
    }
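
The DeduplicationTransformer itself isn't shown above. For completeness, here is a minimal sketch of what such a transformer could look like; it is not the actual class from my project. It assumes that duplicates are identified by the record key, that keys are never null, and that the store holds String keys and String values (which, as far as I know, are the default serdes of the annotation). The generic parameter `V`, the `storeName` constructor argument and the `"seen"` marker value are made up for this illustration.

```kotlin
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

// Hypothetical sketch: forwards a record only the first time its key is seen.
// Assumes non-null String keys and a key-value store with String/String serdes.
class DeduplicationTransformer<V>(
    private val storeName: String = "dedup-store"
) : Transformer<String, V, KeyValue<String, V>> {

    private lateinit var seenKeys: KeyValueStore<String, String>

    @Suppress("UNCHECKED_CAST")
    override fun init(context: ProcessorContext) {
        // The store must be connected to this transformer by passing its name
        // to KStream.transform(...), as in the process() method above.
        seenKeys = context.getStateStore(storeName) as KeyValueStore<String, String>
    }

    override fun transform(key: String, value: V): KeyValue<String, V>? {
        // Returning null tells Kafka Streams not to forward anything downstream.
        if (seenKeys.get(key) != null) return null
        seenKeys.put(key, "seen") // marker value; only the key's presence matters
        return KeyValue.pair(key, value)
    }

    override fun close() {
        // Nothing to release; the store's lifecycle is managed by Kafka Streams.
    }
}
```

With this generic variant the supplier would probably need an explicit type argument, e.g. `TransformerSupplier { DeduplicationTransformer<TransferEmitted>(DEDUP_STORE) }`. Note that a plain key-value store never expires entries, so a store with retention (like the window store in the question) may be a better fit if the key space is unbounded.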

Upvotes: 1
