Reputation: 1143
I have a log compacted topic in Kafka which I am reading in a KTable and creating a store. Whenever I update a keyed message I don't see the latest state when the app is running. But I am able to see the updated state when I restart the streams application. How to get the updated state while the streams application is running without a restart.
Here's the code (in main function) that I am using to iterate the values from the state.
final String queryableStore = metaTable.queryableStoreName();
ReadOnlyKeyValueStore<String,String> metaTableView = null;
try {
metaTableView = waitUntilStoreIsQueryable(queryableStore,QueryableStoreTypes.keyValueStore(),streams);
} catch (InterruptedException e) {
e.printStackTrace();
}
metaTableView.all().forEachRemaining(row -> logger.info("Key: "+row.key+" Value: "+row.value));
Upvotes: 0
Views: 1920
Reputation: 62285
Not sure how you run the code. But if I understand it correctly, you expect that logger.info
prints each update? If this is the case, than this will not work, because .all()
iterates over the KTable once and terminates afterwards. Pay attention, that you are using "Interactive Queries" in your code example that are design for one-time lookups. You would need to re-run the query in your own code -- of course, you would get all record each time.
If you want to get the KTable updates continuously (and only the updates), you need to work with the KTable directly. For example, you could do
KTable table = builder.table(...);
table.toStream().foreach(...);
Each update to the KTable will produce an output record that is processed by foreach()
. Pay attention to record caching; you might want to disable caching to get all updates, otherwise, some updates might not be forwarded as they are subject to caching.
Upvotes: 3