Shajeer

Reputation: 21

Invoking Kafka Interactive Queries from inside a Stream

I have a particular requirement for invoking an Interactive Query from inside a Stream. This is because I need to create a new Stream whose data should come from the State Store. Truncated code below:

tempModifiedDataStream.to(topic.getTransformedTopic(), Produced.with(Serdes.String(), Serdes.String()));

GlobalKTable<String, String> myMetricsTable = builder.globalTable(
    topic.getTransformedTopic(),
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(
            topic.getTransformedStoreName() /* table/store name */)
        .withKeySerde(Serdes.String()) /* key serde */
        .withValueSerde(Serdes.String()) /* value serde */
);

KafkaStreams streams = new KafkaStreams(builder.build(), kStreamsConfigs());

KStream<String, String> tempAggrDataStream = tempModifiedDataStream
    .flatMap((key, value) -> {
        try {
            List<KeyValue<String, String>> result = new ArrayList<>();

            ReadOnlyKeyValueStore<String, String> keyValueStore =
                streams.store(
                    topic.getTransformedStoreName(),
                    QueryableStoreTypes.keyValueStore());

In the last line, to access the State Store I need the KafkaStreams object, and the Topology is finalized when I create the KafkaStreams object. The problem with this approach is that 'tempAggrDataStream' is therefore not part of the Topology, so that part of the code never gets executed. And I can't move the KafkaStreams definition below, as otherwise I can't call the Interactive Query.

I am a bit new to Kafka Streams, so is this something silly on my side?

Upvotes: 2

Views: 721

Answers (1)

Bartosz Wardziński

Reputation: 6613

If you want to send the whole content of the topic after each data modification, I think you should rather use the Processor API.

You could create an org.apache.kafka.streams.kstream.Transformer with a state store. For each processed message it would update the state store and forward the entire store content downstream. This is not very efficient, because it forwards the whole content of the topic/state store (which can be thousands or millions of records) for every processed message.

If you only need the latest value per key, it is enough to set your topic's cleanup.policy to compact and, on the consuming side, use a KTable, which gives you a table abstraction (a snapshot of the stream).
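As a minimal sketch of that alternative (the topic names here are illustrative, and the topic is assumed to already have cleanup.policy=compact configured on the broker):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class CompactedTableExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Reading a compacted topic as a KTable keeps only the latest
        // value per key in the resulting state store.
        KTable<String, String> latest = builder.table(
            "my-compacted-topic",
            Materialized.with(Serdes.String(), Serdes.String()));

        // Downstream consumers then see one up-to-date value per key
        // instead of the full change history.
        latest.toStream().to("latest-values-topic");
    }
}
```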

Sample Transformer code for forwarding the whole content of the state store follows. All the work is done in the transform(String key, String value) method.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class SampleTransformer
        implements Transformer<String, String, KeyValue<String, String>> {

    private final String stateStoreName;
    private KeyValueStore<String, String> stateStore;
    private ProcessorContext context;

    public SampleTransformer(String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        stateStore = (KeyValueStore<String, String>) context.getStateStore(stateStoreName);
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        // Update the store with the incoming record, then forward the
        // entire store content downstream.
        stateStore.put(key, value);
        try (KeyValueIterator<String, String> it = stateStore.all()) {
            it.forEachRemaining(keyValue -> context.forward(keyValue.key, keyValue.value));
        }
        // Everything is emitted via context.forward(), so nothing is
        // returned directly.
        return null;
    }

    @Override
    public void close() {
        // Nothing to clean up; the store is managed by Kafka Streams.
    }
}

More information about the Processor API can be found:

How to combine Processor API with Stream DSL can be found:
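To illustrate that combination, a hypothetical sketch of wiring the Transformer above into a DSL topology (the store and topic names are illustrative): the state store is registered on the builder, and its name is passed to transform() so Kafka Streams connects it to the processor.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class TopologyWiringExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Register the state store the transformer will use.
        StoreBuilder<KeyValueStore<String, String>> storeBuilder =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("my-store"),
                Serdes.String(),
                Serdes.String());
        builder.addStateStore(storeBuilder);

        // Attach the transformer to the DSL stream, naming the store it
        // needs so Kafka Streams connects it to the processor node.
        KStream<String, String> input = builder.stream("input-topic");
        input
            .transform(() -> new SampleTransformer("my-store"), "my-store")
            .to("output-topic");
    }
}
```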

Upvotes: 1
