Reputation: 24452
The native Kafka API allows us to create and add a state store using the StreamsBuilder:
final StreamsBuilder builder = new StreamsBuilder();
...
final StoreBuilder<WindowStore<String, Long>> dedupStoreBuilder = Stores.windowStoreBuilder(
Stores.persistentWindowStore(storeName,
retentionPeriod,
windowSize,
false
),
Serdes.String(),
Serdes.Long());
builder.addStateStore(dedupStoreBuilder);
I would like to do the same using Spring Cloud Streams but can't figure out the way to access the StreamsBuilder
to add the store.
I've tried to retrieve the StreamsBuilderFactoryBean
as stated in the doc, hoping the I could get the StreamsBuilder
object from it, but the bean doesn't seem to be available:
@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration(private val context: ApplicationContext) {
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {
val streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean::class.java)
...
return xxx
}
}
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'stream-builder-process' available
In any case I'm not even sure that's the right way to do it. So, how can we programatically create a StateStore
?
Upvotes: 0
Views: 1998
Reputation: 24452
I didn't see the documented procedure because of my Scs version (Fishtown SR3), but the good news is that it's possible to create the State Store declaratively since Germantown:
const val DEDUP_STORE = "dedup-store"
@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration {
@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {
return input.transform(TransformerSupplier { DeduplicationTransformer() }, DEDUP_STORE)
}
}
Upvotes: 1