MaatDeamon
MaatDeamon

Reputation: 9761

Kafka Stream custom State Store

I have been readying the doc about state store but it is still not clear to me if it can fit my purpose. I would like to use some Distributed Graph Database as as a state store that other external application can consume from. Is that possible, what effort does that involve and can anyone point me to the class/code that will need to be extended for that functionality to happen.

Upvotes: 1

Views: 2063

Answers (1)

Nishu Tayal
Nishu Tayal

Reputation: 20810

You can implement custom state store using Processor API as described here :
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores

  • Your custom state store must implement StateStore.
  • You must have an interface to represent the operations available on the store.
  • You must provide an implementation of StoreBuilder for creating instances of your store.
  • It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.

Implementation will look something like this :

public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V> {
  // implementation of the actual store
}

// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V> {
  void write(K Key, V value);
}

// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V> {
  V read(K key);
}

public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<K,V>> {
  // implementation of the supplier for MyCustomStore
}

In order to make it queryable;

  • Provide an implementation of QueryableStoreType.
  • Provide a wrapper class that has access to all of the underlying instances of the store and is used for querying.

Example :

public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>> {

  // Only accept StateStores that are of type MyCustomStore
  public boolean accepts(final StateStore stateStore) {
    return stateStore instanceOf MyCustomStore;
  }

  public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName) {
      return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
  }

}

Upvotes: 5

Related Questions