Majid Azimi
Majid Azimi

Reputation: 5745

Queryble filtered KTable

Here is a sample KTable that I've built which is a straightforward aggregation:

String name = stream
    .groupByKey()
    .aggregate(
        () -> new Aggregate(config),
        (key, value, aggregate) -> aggregate.addAndReturn(value),
        Materialized
            .<String, Aggregate>as(Stores.inMemoryKeyValueStore(config.OutputStore()))
            .withCachingEnabled()
            .withKeySerde(Serdes.String())
            .withValueSerde(CustomSerdes.ObjectSerde()))
    .filter(((key, value) -> value.isStateChanged()))
    .filter((key, value) -> !value.getRecentlyViewed().isEmpty())
    .queryableStoreName();

What I need to do is to store the final KTable(after filtering applied) in the state store rather than the initial KTable. Currently KTable.queryableStoreName() returns null.

My current solution is to apply filter(), then convert to stream using KTable.toStream() and finally store as KTable again which I think it's inefficient. Is there any other solution

Upvotes: 1

Views: 1137

Answers (1)

Matthias J. Sax
Matthias J. Sax

Reputation: 62285

You can enforce the materialization of a KTable by providing a queryable store name:

.aggregate()
.filter(..., Materialized.as("your-custom-store-name"));

Depending on the version you are using, you might need to specify some generics to make it compile:

Materialized<KEY_TYPE, VALUE_TYPE, KeyValueStore<Bytes, byte[]>>.as("your-custom-store-name"))

Upvotes: 1

Related Questions