Reputation: 5745
Here is a sample KTable that I've built, which is a straightforward aggregation:
    String name = stream
        .groupByKey()
        .aggregate(
            () -> new Aggregate(config),
            (key, value, aggregate) -> aggregate.addAndReturn(value),
            Materialized
                .<String, Aggregate>as(Stores.inMemoryKeyValueStore(config.OutputStore()))
                .withCachingEnabled()
                .withKeySerde(Serdes.String())
                .withValueSerde(CustomSerdes.ObjectSerde()))
        .filter((key, value) -> value.isStateChanged())
        .filter((key, value) -> !value.getRecentlyViewed().isEmpty())
        .queryableStoreName();
What I need to do is store the final KTable (after the filters are applied) in the state store, rather than the initial KTable. Currently KTable.queryableStoreName() returns null.
My current solution is to apply filter(), then convert to a stream using KTable.toStream(), and finally store the result as a KTable again, which I think is inefficient. Is there any other solution?
Upvotes: 1
Views: 1137
Reputation: 62285
You can enforce the materialization of a KTable by providing a queryable store name:
.aggregate()
.filter(..., Materialized.as("your-custom-store-name"));
Depending on the version you are using, you might need to specify some generics to make it compile:
.filter(..., Materialized.<KEY_TYPE, VALUE_TYPE, KeyValueStore<Bytes, byte[]>>as("your-custom-store-name"));
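For reference, here is a minimal sketch of what the materialized filter could look like end to end. It is not the asker's topology: the topic name "input-topic", the store name "filtered-store", the class name, and the use of count() in place of the custom aggregate are all assumptions made to keep the example short. It also assumes a reasonably recent kafka-streams dependency (2.1 or later, for Grouped) on the classpath.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical class name; only builds and describes the topology, it does not start a Kafka Streams app.
public class FilteredStoreSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Stand-in for the custom aggregate in the question: count records per key.
        // "input-topic" is an assumed topic name.
        KTable<String, Long> counts = builder
            .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .count();

        // Passing a Materialized with a store name to filter() asks Kafka Streams
        // to materialize the filtered table itself under that name.
        KTable<String, Long> filtered = counts.filter(
            (key, count) -> count > 1L,
            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("filtered-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));

        // With a named store on the filter, this should be the store name rather than null.
        System.out.println(filtered.queryableStoreName());

        // Print the topology so the state store attached to the filter node can be inspected.
        System.out.println(builder.build().describe());
    }
}
```

If there are several chained filters, as in the question, only the last one needs the Materialized argument, since that is the table you want to query.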
Upvotes: 1