I cannot get a custom state store connected to my Transformer in Spring Cloud Stream Binder Kafka 3.x (functional style), following the examples from here.
I am defining a KeyValueStore as a @Bean with type StoreBuilder<KeyValueStore<String,Long>>:
@Bean
public StoreBuilder<KeyValueStore<String,Long>> myStore() {
    return Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-store"), Serdes.String(),
        Serdes.Long());
}

@Bean
@DependsOn({"myStore"})
public MyTransformer myTransformer() {
    return new MyTransformer("my-store");
}
In the debugger I can see that both beans get initialised. My stream processor function then looks like this:
return myStream -> {
    return myStream
        .peek(..)
        .transform(() -> myTransformer())
        ...
MyTransformer is declared as:
public class MyTransformer implements Transformer<String, MyEvent, KeyValue<KeyValue<String,Long>, MyEvent>> {
    ...
    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
        this.myStore = context.getStateStore(storeName);
    }
I get the following error when the application context starts up in my unit test:
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor KSTREAM-TRANSFORM-0000000002 has no access to StateStore my-store as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.
In the application startup logs when running my unit test, I can see that the store seems to get created:
2021-04-06 00:44:43.806 INFO [ main] .k.s.AbstractKafkaStreamsBinderProcessor : state store my-store added to topology
I'm already using pretty much every feature of the Spring Cloud Stream Binder Kafka in my app, and everything works very well from my unit test. Unexpectedly, I got stuck on connecting the custom KeyValueStore to my Transformer. It would be great if you could spot an error in my setup.
The versions I'm using right now:
org.springframework.boot:spring-boot:jar:2.4.4
org.springframework.kafka:spring-kafka:jar:2.6.7
org.springframework.kafka:spring-kafka-test:jar:2.6.7
org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:jar:3.0.4.RELEASE
org.apache.kafka:kafka-streams:jar:2.7.0
I've just tried with
org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:jar:3.1.3-SNAPSHOT
and the issue seems to persist.
Upvotes: 0
Views: 1276
Reputation: 5924
In your processor function, when you call .transform(() -> myTransformer()), you also need to pass the state store name so that the store gets connected to that transformer. The KStream API has overloaded transform methods that take the state store names as a vararg, and that looks like the issue you are running into. Change the call to .transform(() -> myTransformer(), "my-store"). Note that the argument is the store name given to Stores.persistentKeyValueStore(...), i.e. "my-store", not the bean name myStore.
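To make the connection step concrete, here is a minimal sketch in plain Kafka Streams, without the Spring binder. It assumes the Kafka Streams 2.7 API from the question (transform() is deprecated in later releases). The class name StoreConnectionSketch, the CountingTransformer stand-in for MyTransformer, and the topic names input-topic and output-topic are hypothetical. In the binder setup the StoreBuilder bean is already added to the topology for you, so only the extra argument to transform() should be needed there.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class StoreConnectionSketch {

    // Must match the name passed to Stores.persistentKeyValueStore(...).
    static final String STORE_NAME = "my-store";

    // Hypothetical stand-in for MyTransformer: counts records per key.
    static class CountingTransformer
            implements Transformer<String, String, KeyValue<String, Long>> {

        private KeyValueStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            // Throws StreamsException if the store was not connected
            // to this processor when the topology was built.
            store = (KeyValueStore<String, Long>) context.getStateStore(STORE_NAME);
        }

        @Override
        public KeyValue<String, Long> transform(final String key, final String value) {
            final Long previous = store.get(key);
            final long count = (previous == null) ? 1L : previous + 1L;
            store.put(key, count);
            return KeyValue.pair(key, count);
        }

        @Override
        public void close() {
            // Nothing to release; the store is managed by Kafka Streams.
        }
    }

    static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();

        // Step 1: add the store to the topology (the binder does this for a StoreBuilder bean).
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(STORE_NAME), Serdes.String(), Serdes.Long()));

        // Step 2: connect the store to the transformer by passing its name.
        // The supplier creates a new Transformer instance on every call.
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .transform(CountingTransformer::new, STORE_NAME)
               .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```

Printing buildTopology().describe() lists the stores connected to each processor, which is a quick way to check the wiring without starting the application. The sketch uses a constructor reference because the TransformerSupplier contract expects a fresh Transformer from each get() call; a supplier that returns a singleton Spring bean may be worth revisiting for the same reason.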
Upvotes: 4