jayz_135

Can KStream-GlobalKTable join return multiple matching records for a specific search?

I’m hoping someone can help with an issue I’m having regarding GlobalKTables in Kafka.

I'm trying to perform a KStream-GlobalKTable join. However, I want to retrieve all entries in the GlobalKTable whose key or value contains a string found in the incoming stream event. For example, let's say my table has 3 rows with the following keys:

Key: BANK055DEPOSIT value: {some data}
Key: BANK055CREDIT value: {different data}
Key: BANK033CREDIT value: {more different data}

When I do a join on the table to retrieve data, I need to pull back all rows whose key or value contains "055". So I'd want the first 2 rows.

In the database world, this would be equivalent to something like the following:

SELECT * FROM GlobalKTable WHERE table_key LIKE '%055%' OR table_value LIKE '%055%'

I've scoured the official docs and found no examples of how to do this. I suspect that retrieving N rows from a single GlobalKTable join is not possible.

Also, I'm using the Streams DSL for this, and I'm unsure whether it would be achievable with the Processor API. Any input is appreciated!

Answers (1)

bbejeck

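When joining a KStream with a GlobalKTable, you can use any part of the stream record's key and value to build the lookup key (through the KeyValueMapper you pass to join), but that lookup key must match an entire GlobalKTable key exactly. Each stream record is therefore joined with at most one table record, so, unfortunately, you can't do what you describe above with a join.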
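To make the "at most one match" point concrete, here is a plain-Java sketch with no Kafka dependency. It models the join as a single exact-key lookup: a hypothetical `keySelector` derives the lookup key from the stream record, and a `Map` stands in for the GlobalKTable. The class `ExactKeyJoinModel` and its `join` method are illustrative names, an analogy for the DSL semantics rather than Kafka's implementation.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Plain-Java model of KStream-GlobalKTable join semantics (an analogy, not Kafka code).
final class ExactKeyJoinModel {

    // One exact-key lookup per stream record, so the result holds at most one match.
    static <K, V, GK, GV, R> Optional<R> join(K streamKey, V streamValue,
                                               Map<GK, GV> table,
                                               BiFunction<K, V, GK> keySelector,
                                               BiFunction<V, GV, R> joiner) {
        GK lookupKey = keySelector.apply(streamKey, streamValue);
        GV match = table.get(lookupKey);
        return match == null ? Optional.empty() : Optional.of(joiner.apply(streamValue, match));
    }

    public static void main(String[] args) {
        Map<String, String> table = new TreeMap<>();
        table.put("BANK055DEPOSIT", "{some data}");
        table.put("BANK055CREDIT", "{different data}");
        table.put("BANK033CREDIT", "{more different data}");

        // "BANK055" is only part of a key, so no table key equals it: no match at all.
        Optional<String> partial = join("evt-1", "BANK055", table, (k, v) -> v, (v, row) -> row);
        // A complete key matches, but only that single row.
        Optional<String> exact = join("evt-2", "BANK055CREDIT", table, (k, v) -> v, (v, row) -> row);

        System.out.println(partial);
        System.out.println(exact);
    }
}
```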

But you should still be able to do something close to this even using the DSL. If you use KStream.transformValues (or KStream.flatTransformValues, to emit several results per input record) with a ValueTransformerWithKeySupplier, you can scan the state store and extract the records you want based on a substring contained in the stream record. You don't necessarily need to scan the entire store, either: when the substring is a fixed key prefix, a range query over just that slice of keys is enough.
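The range-query idea relies on key-value stores iterating keys in sorted order (by serialized bytes, which for plain ASCII keys like these matches Java's `String` ordering). Below is a plain-Java sketch that uses a `TreeMap` as a stand-in for the store. It only helps when the search string is a key prefix such as `BANK055`; a substring in the middle of the key, or a match on the value, still needs a full scan. The class and the helper `prefixUpperBound` are hypothetical, and the helper assumes a non-empty prefix whose last character is not `Character.MAX_VALUE`. In Kafka Streams itself, `ReadOnlyKeyValueStore.range(from, to)` includes both bounds, and releases from 2.8 on also offer `prefixScan(prefix, prefixKeySerializer)`, so check the API of the version you run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch: prefix lookup via a sorted range instead of a full scan.
// A TreeMap stands in for a sorted key-value store (assumption, not Kafka code).
final class PrefixRangeSketch {

    // A string that sorts after every string starting with prefix: bump its last char.
    // Assumes prefix is non-empty and its last char is not Character.MAX_VALUE.
    static String prefixUpperBound(String prefix) {
        char last = prefix.charAt(prefix.length() - 1);
        return prefix.substring(0, prefix.length() - 1) + (char) (last + 1);
    }

    // Visits only the slice [prefix, upperBound) of the sorted map.
    static List<String> keysWithPrefix(NavigableMap<String, String> store, String prefix) {
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, String> e :
                store.subMap(prefix, true, prefixUpperBound(prefix), false).entrySet()) {
            keys.add(e.getKey());
        }
        return keys;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>();
        store.put("BANK055DEPOSIT", "{some data}");
        store.put("BANK055CREDIT", "{different data}");
        store.put("BANK033CREDIT", "{more different data}");
        System.out.println(keysWithPrefix(store, "BANK055")); // [BANK055CREDIT, BANK055DEPOSIT]
    }
}
```

Computing an exclusive upper bound keeps the slice tight; with Kafka's inclusive `range`, the bound would need adjusting accordingly.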

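EDIT: Here's some code that I got working to demonstrate what I am getting at. It keeps the table-side records in a regular state store fed from a second stream rather than in a GlobalKTable, and it uses flatTransformValues so that one input record can produce several output records.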

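import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

@SuppressWarnings("unchecked")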
public class MultiResultJoinExample {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mult-partial-key-join-results");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final StreamsBuilder builder = new StreamsBuilder();

        // A regular (task-local) key-value store that plays the role of the "table".
        final String storeName = "kv-store";
        final StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
                        Serdes.String(),
                        Serdes.String());
        builder.addStateStore(keyValueStoreBuilder);

        // Table side: every record from "to-join-input" is written into the store.
        final KStream<String, String> streamToJoinAgainst =
                builder.stream("to-join-input", Consumed.with(Serdes.String(), Serdes.String()));
        streamToJoinAgainst.transformValues(new StoringValueTransformer(storeName), storeName);

        // Stream side: each record is matched against every stored entry, and
        // flatTransformValues emits one output record per match.
        final KStream<String, String> streamNeedingJoin =
                builder.stream("need-join-input", Consumed.with(Serdes.String(), Serdes.String()));
        streamNeedingJoin.flatTransformValues(new FlatMapJoiningTransformer(storeName), storeName)
                .to("output", Produced.with(Serdes.String(), Serdes.String()));

        final KafkaStreams streams = new KafkaStreams(builder.build(props), props);
        // Close the Streams instance cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }

    static final class FlatMapJoiningTransformer implements ValueTransformerWithKeySupplier<String, String, Iterable<String>> {
        final String storeName;

        public FlatMapJoiningTransformer(String storeName) {
            this.storeName = storeName;
        }

        @Override
        public ValueTransformerWithKey<String, String, Iterable<String>> get() {
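            return new ValueTransformerWithKey<String, String, Iterable<String>>() {
                private KeyValueStore<String, String> kvStore;

                @Override
                public void init(ProcessorContext context) {
                    // The store is available because its name is passed to flatTransformValues.
                    kvStore = (KeyValueStore<String, String>) context.getStateStore(storeName);
                }

                @Override
                public Iterable<String> transform(String readOnlyKey, String value) {
                    List<String> results = new ArrayList<>();
                    // The pattern is assumed to be the characters at index 4-6 of the key (e.g. "055");
                    // skip null or short keys instead of throwing StringIndexOutOfBoundsException.
                    if (readOnlyKey == null || readOnlyKey.length() < 7) {
                        return results;
                    }
                    final String patternToMatch = readOnlyKey.substring(4, 7);
                    // Full scan: keep every entry whose key or value contains the pattern.
                    try (KeyValueIterator<String, String> iter = kvStore.all()) {
                        while (iter.hasNext()) {
                            final KeyValue<String, String> kv = iter.next();
                            if (kv.key.contains(patternToMatch) || kv.value.contains(patternToMatch)) {
                                results.add(kv.value + " - " + value);
                            }
                        }
                    }
                    // Each element of the returned list becomes a separate output record.
                    return results;
                }

                @Override
                public void close() {
                    // no-op
                }
            };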
        }
    }

    static final class StoringValueTransformer implements ValueTransformerWithKeySupplier<String, String, String> {
        final String storeName;

        public StoringValueTransformer(String storeName) {
            this.storeName = storeName;
        }

        @Override
        public ValueTransformerWithKey<String, String, String> get() {
            return new ValueTransformerWithKey<String, String, String>() {
                private KeyValueStore<String, String> kvStore;

                @Override
                public void init(ProcessorContext context) {
                    kvStore = (KeyValueStore<String, String>) context.getStateStore(storeName);
                }

                @Override
                public String transform(String readOnlyKey, String value) {
                    // put (not putIfAbsent) so a newer value for a key replaces the old one,
                    // as it would in a table.
                    kvStore.put(readOnlyKey, value);
                    return value;
                }

                @Override
                public void close() {
                     //no-op
                }
            };
        }
    }
}
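Because the transformer's matching logic has nothing Kafka-specific in it, one way to check it is to pull it into a plain function and test it without a broker. The sketch below is a hypothetical refactoring (the names `MultiMatchLogic` and `matchAll` are illustrative): it mirrors the `transform` method above, taking the characters at index 4-6 of the stream key as the pattern and scanning every entry, with a `Map` standing in for `kvStore.all()`. Keep in mind that the store above is a regular, non-global store, so each task's copy holds only the records from the input partitions that task processes, whereas a GlobalKTable is fully replicated to every application instance.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java version of the FlatMapJoiningTransformer matching logic (hypothetical refactoring).
final class MultiMatchLogic {

    // Mirrors transform(): pattern = chars at index 4..6 of the stream key, then a full scan.
    static List<String> matchAll(String streamKey, String streamValue, Map<String, String> store) {
        List<String> results = new ArrayList<>();
        if (streamKey == null || streamKey.length() < 7) {
            return results; // no pattern can be extracted, so emit nothing
        }
        final String patternToMatch = streamKey.substring(4, 7);
        for (Map.Entry<String, String> kv : store.entrySet()) {
            if (kv.getKey().contains(patternToMatch) || kv.getValue().contains(patternToMatch)) {
                results.add(kv.getValue() + " - " + streamValue);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, String> store = new LinkedHashMap<>();
        store.put("BANK055DEPOSIT", "{some data}");
        store.put("BANK055CREDIT", "{different data}");
        store.put("BANK033CREDIT", "{more different data}");
        // "XFER055-1" yields the pattern "055", so the first two rows match.
        System.out.println(matchAll("XFER055-1", "event", store));
    }
}
```

Keeping the logic in a pure function also makes it easy to swap the full scan for a prefix range later without touching the Kafka wiring.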

HTH, Bill
