Zunair Syed

Reputation: 93

Deleting record in Kafka StateStore does not work (NullPointerException thrown on .delete(key))

I have a materialized in-memory state store in my code. A separate stream is supposed to look up and delete records in it based on some criteria.

I need that stream to be able to access and delete records in the previously built state store. This is my code:

@Bean
public StreamsBuilder myStreamCodeBean(StreamsBuilder streamsBuilder) {
    // create store supplier
    KeyValueBytesStoreSupplier myStoreSupplier = Stores.inMemoryKeyValueStore("MyStateStore");

    // materialize state store and enable caching
    Materialized<String, MyObject, KeyValueStore<Bytes, byte[]>> materializedStore =
            Materialized.<String, MyObject>as(myStoreSupplier)
            .withKeySerde(Serdes.String())
            .withValueSerde(myObjectSerde)
            .withCachingEnabled();

    // other code here that creates KTable, and another stream to consume records into ktable from another topic
    // ........

    // another stream that consumes another topic and deletes records in store with some logic
    streamsBuilder
        .stream("someTopicName", someConsumerObject)
        .filter((key, value) -> {
            KeyValueStore<Bytes, byte[]> kvStore = myStoreSupplier.get();
            kvStore.delete(key);  // store is never "open" and this throws NullPointerException (even though key is NOT null)
            return true;
        })
        .to("some topic name here", producerObject);
    return streamsBuilder;
}

The error that is thrown is really generic: it says that Kafka Streams is not running.

Doing some debugging, I found that my statestore isn't "open" when doing delete.

What am I doing wrong here? I can read records using ReadOnlyKeyValueStore but I need to delete so I can't use that.

Any help appreciated.

Upvotes: 0

Views: 999

Answers (1)

JavaTechnical

Reputation: 9357

A state store must be accessed through the processor's context, not through the supplier object.

After creating the store, you need to make sure it is accessible to the processor that is trying to use it.
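To see why the supplier is the wrong handle, here is a minimal check. It assumes kafka-streams is on the classpath; the class and method names (SupplierCheck, sameInstance, openFromSupplier) are made up for illustration. As far as I can tell, every call to get() builds a brand new store that Kafka Streams has never initialized, so I would expect both checks to report false, which matches the "not open" state you saw while debugging.

```java
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

// Hypothetical helper class, only for illustration.
public class SupplierCheck {

    /** True if two calls to get() hand back the same store object. */
    static boolean sameInstance(KeyValueBytesStoreSupplier supplier) {
        return supplier.get() == supplier.get();
    }

    /** True if a store taken straight from the supplier reports itself as open. */
    static boolean openFromSupplier(KeyValueBytesStoreSupplier supplier) {
        KeyValueStore<Bytes, byte[]> store = supplier.get();
        return store.isOpen();
    }

    public static void main(String[] args) {
        KeyValueBytesStoreSupplier supplier = Stores.inMemoryKeyValueStore("MyStateStore");
        System.out.println("same instance on each get(): " + sameInstance(supplier));
        System.out.println("open when taken from supplier: " + openFromSupplier(supplier));
    }
}
```

The store that actually holds your records is the one Kafka Streams created and opened from that supplier when the topology started, and the processor context is what hands it to you.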


If your store is a local store, then you need to specify which processors are going to access the store.

If your store is a global store, then it is accessible by all the processors in the topology.


You are creating a stream using streamsBuilder.stream() and, at least from the code you have posted, you don't seem to give your processor access to the state store.

  1. Ensure that you have called addStateStore() on the StreamsBuilder.
  2. To get the state store in the processor, use context.getStateStore(storeName).
  3. I don't think the state store can be accessed in filter(), because it is a stateless operation. Use a Processor or Transformer instead and pass in the state store names (MyStateStore in your case).
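Put together, the three steps could look roughly like the sketch below. It is not your exact setup. It assumes Kafka Streams 3.3 or newer (where KStream.process() returns a stream), String keys and values instead of MyObject, and a store that is added directly instead of being materialized by a KTable. DeleteFromStoreSketch, DeletingProcessor and "someOutputTopic" are placeholder names.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Hypothetical class, only for illustration.
public class DeleteFromStoreSketch {
    static final String STORE = "MyStateStore";

    /** Deletes the incoming key from the store, then forwards the record unchanged. */
    static class DeletingProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;
        private KeyValueStore<String, String> store;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
            // Step 2: the instance that Kafka Streams itself created and opened.
            this.store = context.getStateStore(STORE);
        }

        @Override
        public void process(Record<String, String> record) {
            store.delete(record.key()); // put your own deletion criteria here
            context.forward(record);
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Step 1: register the store with the builder.
        StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore(STORE), Serdes.String(), Serdes.String());
        builder.addStateStore(storeBuilder);

        // Step 3: use a processor instead of filter() and connect it to the store by name.
        ProcessorSupplier<String, String, String, String> supplier = DeletingProcessor::new;
        builder.stream("someTopicName", Consumed.with(Serdes.String(), Serdes.String()))
               .process(supplier, STORE)
               .to("someOutputTopic", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```

On older versions the same idea should work with transform() and a Transformer. Also note that step 1 is for a store you add yourself; a store that is already materialized by your KTable is, as far as I know, already registered under its name, so there you would only pass the name to the processor.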

Upvotes: 2
