veryltdbeard
veryltdbeard

Reputation: 320

How to transform with state?

I'm trying to build a Kafka streams application using the new version of the DSL (v1.0) but I don't see how to configure a stateful stream transformation. A basic but complete example of how to achieve this would be very helpful.

I didn't find any (stateful) transform examples in the source code. According to the documentation the following strategy should be followed:

 StateStoreSupplier myStore = Stores.create("myTransformState")
 .withKeys(...)
 .withValues(...)
 .persistent() // optional
 .build();

 builder.addStore(myStore);

 KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");

However, it's not clear what the type of builder should be in the example, none of Topology or StreamsBuilder has a method addStore. If I try addStateStore instead it only accepts an argument of type StoreBuilder which is not the type of myStore defined.

Upvotes: 3

Views: 1173

Answers (1)

Matthias J. Sax
Matthias J. Sax

Reputation: 62350

As JavaDocs explain, Stores#create is deprecated in 1.0.0:

@deprecated use persistentKeyValueStore(String), persistentWindowStore(String, long, int, long, boolean), persistentSessionStore(String, long), lruMap(String, int). or inMemoryKeyValueStore(String)

Thus, in your case you would create a persistent key-value store supplier via Stores.persistentKeyValueStore("myTransformState")

In a second step, you need to create a StoreBuilder via Stores.keyValueStoreBuilder(...) that takes you previously created store supplier as argument.

Afterwards, you can add the StoreBuilder to your builder

StreamsBuilder#addStateStore(final StoreBuilder builder)

To connect the store to your transformer you just provide the store name as additional argument as before.

Upvotes: 4

Related Questions