Reputation: 557
Is it possible to use interactive query (InteractiveQueryService) within Spring Cloud Stream the class with @EnableBinding annotation or within the method with @StreamListener? I tried instantiating ReadOnlyKeyValueStore within provided KStreamMusicSampleApplication class and process method but its always null.
My @StreamListener method is listening to a bunch of KTables and KStreams and during the process topology e.g filtering, I have to check whether the key from a KStream already exists in a particular KTable.
I tried to figure out how to scan an incoming KTable to check if a key already exists but no luck. Then I came across InteractiveQueryService whose get() method could be used to check if a key exists inside a state store materializedAs from a KTable. The problem is that I can't access it from with the process topology (@EnableBinding or @StreamListener). It can only be accessed from outside these annotation e.g RestController.
Is there a way to scan an incoming KTable to check for the existence of a key or value? if not then can we access InteractiveQueryService within the process topology?
Upvotes: 1
Views: 1643
Reputation: 5904
InteractiveQueryService
in Spring Cloud Stream is not available to be used within the actual topology in your StreamListener
. As you mentioned, it is supposed to be used outside of your main topology. However, with the use case you described, you still can use the state store from your main flow. For example, if you have an incoming KStream
and a KTable
which is materialized as a state store, then you can call process
on the KStream
and access the state store that way. Here is a rough code to achieve that. You need to convert this to fit into your specific use case, but here is the idea.
ReadOnlyKeyValueStore<Object, String> store;
input.process(() -> new Processor<Object, Product>() {
@Override
public void init(ProcessorContext processorContext) {
store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");
}
@Override
public void process(Object key, Object value) {
//find the key
store.get(key);
}
@Override
public void close() {
if (state != null) {
state.close();
}
}
}, "my-store");
Upvotes: 4