Farag Zakaria

Reputation: 198

StateStore is never added on Spring Cloud

Can anyone help me add a state store on Spring Cloud?

I always receive this error: "nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore myStore is not added yet."

Here is the bean definition; however, the store is never added.

@Bean
public StoreBuilder storeBuilder() {
  KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("mystore");
  StoreBuilder<KeyValueStore<String, MyData>> storeBuilder =
      Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), StreamsSerde.MyDataSerde());
  return storeBuilder;
}

Here is the Serde

public static final class MyDataSerde extends Serdes.WrapperSerde<MyData> {
  public MyDataSerde() {
    super(new JsonSerializer<>(), new JsonDeserializer<>(MyData.class));
  }
}

Here is the data class

public class MyData {
  private String name;
  private String course;
}

Here are the Spring Cloud dependencies

springBootVersion = "2.2.5.RELEASE"
set('springCloudVersion', "Hoxton.SR3")

implementation group: "org.springframework.cloud", name: "spring-cloud-stream"
implementation group: "org.springframework.cloud", name: "spring-cloud-stream-binder-kafka-streams"
implementation group: "org.springframework.cloud", name: "spring-cloud-starter-stream-kafka"

Upvotes: 0

Views: 802

Answers (2)

Farag Zakaria

Reputation: 198

I found a solution in the article linked below: add the store to the StreamsBuilder programmatically.

public void initializeStateStores() throws Exception {
   // The "&" prefix asks Spring for the FactoryBean itself rather than the object it creates.
   StreamsBuilderFactoryBean streamsBuilderFactoryBean =
         applicationContext.getBean("&stream-builder-requestListener", StreamsBuilderFactoryBean.class);
   StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject();
   // Build a persistent key-value store and register it on the topology under stateStoreName.
   StoreBuilder<KeyValueStore<String, Long>> keyValueStoreBuilder = Stores.keyValueStoreBuilder(
         Stores.persistentKeyValueStore(stateStoreName), Serdes.String(), Serdes.Long());
   streamsBuilder.addStateStore(keyValueStoreBuilder);
}

https://medium.com/@daniyaryeralin/utilizing-kafka-streams-processor-api-and-implementing-custom-aggregator-6cb23d00eaa7

Upvotes: 1

sobychacko

Reputation: 5914

You need to add state stores like this when you have to use the lower-level processor or transformer API. Did you try passing the state store name to your process or transform method call? Here is a test that works. Take a look at the process call and the way the state store names are passed along.
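To illustrate what passing the store along can look like, here is a minimal sketch in plain Kafka Streams, not the linked test. It assumes the older `org.apache.kafka.streams.processor` API that ships with the Kafka 2.x clients pulled in by Hoxton (that API was later deprecated and removed in Kafka 4.0). The class name, the topic `input-topic`, and the counting logic are hypothetical. In a Spring Cloud Stream application the `StreamsBuilder` is managed by the binder, so the `addStateStore` call stands in for whatever registers the store there. Note that the error in the question mentions `myStore` while the bean registers `mystore`; store names are compared as exact strings, so the name passed to `process` has to match the name given to the store builder.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class StateStoreTopologySketch {

    // Must match, character for character, the name used when the store is built.
    static final String STORE_NAME = "mystore";

    // Hypothetical processor: counts how many records were seen per key.
    static class CountingProcessor extends AbstractProcessor<String, String> {
        private KeyValueStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            super.init(context);
            // Lookup only succeeds if the store was connected to this processor by name.
            store = (KeyValueStore<String, Long>) context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(String key, String value) {
            Long seen = store.get(key);
            store.put(key, seen == null ? 1L : seen + 1L);
        }
    }

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(STORE_NAME), Serdes.String(), Serdes.Long());

        // Step 1: register the store on the topology.
        builder.addStateStore(storeBuilder);

        // Step 2: connect the store to the processor by passing its name to process().
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .process(CountingProcessor::new, STORE_NAME);

        return builder.build();
    }

    public static void main(String[] args) {
        // Prints the topology description, which lists the stores attached to each processor.
        System.out.println(build().describe());
    }
}
```

If the name passed to `process` does not correspond to a store that has been added to the builder, building the topology fails with the same kind of "StateStore ... is not added yet" TopologyException shown in the question.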

Upvotes: 1
