guru107

Reputation: 1143

Keeping the local state store updated in a Kafka Streams app

I have a log-compacted topic in Kafka that I read into a KTable, which is materialized as a state store. When I publish an updated message for an existing key, I don't see the latest state while the app is running; I only see the updated state after I restart the streams application. How can I get the updated state while the streams application is running, without a restart?

Here's the code (in the main function) that I use to iterate over the values in the state store.

    final String queryableStore = metaTable.queryableStoreName();

    ReadOnlyKeyValueStore<String, String> metaTableView = null;
    try {
        metaTableView = waitUntilStoreIsQueryable(queryableStore, QueryableStoreTypes.keyValueStore(), streams);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    metaTableView.all().forEachRemaining(row -> logger.info("Key: " + row.key + " Value: " + row.value));

Upvotes: 0

Views: 1920

Answers (1)

Matthias J. Sax

Reputation: 62285

I'm not sure how you run the code, but if I understand it correctly, you expect logger.info to print each update? If that is the case, then this will not work, because .all() iterates over the current contents of the KTable's store once and terminates afterwards. Note that your code example uses "Interactive Queries", which are designed for one-time lookups. You would need to re-run the query in your own code; of course, you would get all records each time.
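As a rough illustration of "re-run the query in your own code", here is a minimal sketch that runs a query task on a fixed schedule. It uses only the Java standard library; `StorePoller` and `pollEvery` are hypothetical names made up for this example, not part of Kafka Streams, and the polling period is whatever you choose.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not a Kafka Streams API): re-runs a query periodically.
final class StorePoller {
    // Runs `query` immediately and then every `periodMs` milliseconds on a
    // single background thread. The caller must shut the returned executor down.
    static ScheduledExecutorService pollEvery(long periodMs, Runnable query) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                query.run();
            } catch (RuntimeException e) {
                // An uncaught exception would silently cancel all later runs,
                // so report it and keep the schedule alive.
                System.err.println("Query failed: " + e);
            }
        }, 0, periodMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

In the application from the question, the Runnable passed to pollEvery would presumably wrap the metaTableView.all() loop. The KeyValueIterator returned by all() is Closeable, so it is best opened in a try-with-resources block on each run. Every run still prints the full contents of the store, not just the changes.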

If you want to get the KTable updates continuously (and only the updates), you need to work with the KTable directly. For example, you could do

KTable<String, String> table = builder.table(...);
table.toStream().foreach(...);

Each update to the KTable produces an output record that is processed by foreach(). Pay attention to record caching: with caching enabled, consecutive updates to the same key may be collapsed before they are forwarded downstream, so you might want to disable caching if you need to see every update.
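One way to disable record caching for the whole application is to set the cache size to zero in the configuration passed to KafkaStreams. The sketch below builds such a Properties object using the raw config key "cache.max.bytes.buffering" (the key behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG). `NoCacheConfig` and `withCachingDisabled` are hypothetical names for this example. Newer Kafka releases deprecate this key in favor of a replacement, so check the configuration reference for your version.

```java
import java.util.Properties;

// Hypothetical helper (not a Kafka Streams API): returns a copy of the given
// Streams configuration with record caching turned off.
final class NoCacheConfig {
    static Properties withCachingDisabled(Properties base) {
        Properties props = new Properties();
        props.putAll(base); // copy, so the caller's Properties stay untouched
        // A cache size of 0 bytes disables record caching, so every KTable
        // update is forwarded downstream instead of being held in the cache.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }
}
```

The result would then be passed to the KafkaStreams constructor in place of the original configuration. Disabling the cache increases the number of records forwarded downstream and written to the store, which is the trade-off for seeing every update.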

Upvotes: 3
