Reputation: 198
How can I add a state store in Spring Cloud Stream?
I always receive this error: "nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore myStore is not added yet."
Here is the bean definition; however, it never works:
    @Bean
    public StoreBuilder<KeyValueStore<String, MyData>> storeBuilder() {
        KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("mystore");
        // MyDataSerde is a nested class, so it has to be instantiated with `new`.
        return Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), new StreamsSerde.MyDataSerde());
    }
Here is the Serde:
    public static final class MyDataSerde extends Serdes.WrapperSerde<MyData> {
        public MyDataSerde() {
            super(new JsonSerializer<>(), new JsonDeserializer<>(MyData.class));
        }
    }
Here is the data class:
    public class MyData {
        private String name;
        private String course;
    }
Here are the Spring Cloud dependencies:
    springBootVersion = "2.2.5.RELEASE"
    set('springCloudVersion', "Hoxton.SR3")
    implementation group: "org.springframework.cloud", name: "spring-cloud-stream"
    implementation group: "org.springframework.cloud", name: "spring-cloud-stream-binder-kafka-streams"
    implementation group: "org.springframework.cloud", name: "spring-cloud-starter-stream-kafka"
Upvotes: 0
Views: 802
Reputation: 198
I found a solution in this article: add the store programmatically.
    public void initializeStateStores() throws Exception {
        StreamsBuilderFactoryBean streamsBuilderFactoryBean =
            applicationContext.getBean("&stream-builder-requestListener", StreamsBuilderFactoryBean.class);
        StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject();
        StoreBuilder<KeyValueStore<String, Long>> keyValueStoreBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(stateStoreName), Serdes.String(), Serdes.Long());
        streamsBuilder.addStateStore(keyValueStoreBuilder);
    }
Upvotes: 1
Reputation: 5914
You only need to add state stores like this when you use the lower-level Processor or Transformer API. Did you try passing the state store to your process or transform method call? Here is a test that works. Take a look at the process call and the way the state store names are passed along.
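As a rough mental model of why the exception appears, here is a toy sketch in plain Java. It is not the Kafka Streams API: ToyTopology, tryWire, and the class name are hypothetical, and the real library performs its check inside its own topology builder. The sketch only illustrates the rule that a store name handed to process or transform must already be registered, and that names are matched exactly, including case. Note that the question registers "mystore" while the error mentions "myStore"; that may or may not be the cause here, but it is worth checking.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: mimics the store-name check, not the real Kafka Streams classes.
public class StoreNameCheckSketch {

    /** Hypothetical stand-in for a topology builder. */
    static class ToyTopology {
        private final Set<String> registeredStores = new HashSet<>();

        // Plays the role of StreamsBuilder.addStateStore(storeBuilder).
        void addStateStore(String storeName) {
            registeredStores.add(storeName);
        }

        // Plays the role of passing store names to process(...) or transform(...).
        void process(String... stateStoreNames) {
            for (String name : stateStoreNames) {
                if (!registeredStores.contains(name)) {
                    throw new IllegalStateException(
                        "Invalid topology: StateStore " + name + " is not added yet.");
                }
            }
        }
    }

    /** Returns null when the wiring is valid, otherwise the error message. */
    static String tryWire(String registeredName, String requestedName) {
        ToyTopology topology = new ToyTopology();
        topology.addStateStore(registeredName);
        try {
            topology.process(requestedName);
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Same name on both sides: the wiring is accepted.
        System.out.println(tryWire("mystore", "mystore"));
        // Different casing: the store counts as "not added yet".
        System.out.println(tryWire("mystore", "myStore"));
    }
}
```

In a real application the same idea applies: the name given to Stores.persistentKeyValueStore(...) and the name passed to process or transform should be the same string, ideally shared through one constant.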
Upvotes: 1