Reputation: 320
I'm trying to build a Kafka streams application using the new version of the DSL (v1.0) but I don't see how to configure a stateful stream transformation. A basic but complete example of how to achieve this would be very helpful.
I didn't find any (stateful) transform examples in the source code. According to the documentation the following strategy should be followed:
StateStoreSupplier myStore = Stores.create("myTransformState")
.withKeys(...)
.withValues(...)
.persistent() // optional
.build();
builder.addStore(myStore);
KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");
However, it's not clear what the type of builder
should be in the example, none of Topology
or StreamsBuilder
has a method addStore
. If I try addStateStore
instead it only accepts an argument of type StoreBuilder
which is not the type of myStore
defined.
Upvotes: 3
Views: 1173
Reputation: 62350
As JavaDocs explain, Stores#create
is deprecated in 1.0.0:
@deprecated use persistentKeyValueStore(String), persistentWindowStore(String, long, int, long, boolean), persistentSessionStore(String, long), lruMap(String, int). or inMemoryKeyValueStore(String)
Thus, in your case you would create a persistent key-value store supplier via Stores.persistentKeyValueStore("myTransformState")
In a second step, you need to create a StoreBuilder via Stores.keyValueStoreBuilder(...)
that takes you previously created store supplier as argument.
Afterwards, you can add the StoreBuilder
to your builder
StreamsBuilder#addStateStore(final StoreBuilder builder)
To connect the store to your transformer you just provide the store name as additional argument as before.
Upvotes: 4