I’m hoping someone can help with an issue I’m having regarding GlobalKTables in Kafka.
I'm trying to perform a KStream-GlobalKTable join. However, I want to retrieve all entries in the GlobalKTable whose key or value contains a string found in my original streaming event. For example, let's say my table has 3 rows with the following keys:
Key: BANK055DEPOSIT value: {some data}
Key: BANK055CREDIT value: {different data}
Key: BANK033CREDIT value: {more different data}
When I do a join on the table to retrieve data, I need to pull back all rows whose key or value contains "055". So I'd want the first 2 rows.
In the database world, this would be equivalent to something like the following:
SELECT * FROM GlobalKTable WHERE table_key LIKE '%055%' OR table_value LIKE '%055%'
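As a minimal in-memory sketch (plain Java, no Kafka involved; `ContainsQuerySketch` and `matchingKeys` are hypothetical names), the requested lookup over the three example rows amounts to a filter on "key contains pattern OR value contains pattern":

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ContainsQuerySketch {
    // Returns every key whose key OR value contains the pattern,
    // mirroring the SQL-like query above.
    static List<String> matchingKeys(Map<String, String> table, String pattern) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : table.entrySet()) {
            if (e.getKey().contains(pattern) || e.getValue().contains(pattern)) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the output order is predictable.
        Map<String, String> table = new LinkedHashMap<>();
        table.put("BANK055DEPOSIT", "{some data}");
        table.put("BANK055CREDIT", "{different data}");
        table.put("BANK033CREDIT", "{more different data}");
        System.out.println(matchingKeys(table, "055")); // [BANK055DEPOSIT, BANK055CREDIT]
    }
}
```

The point is that one incoming event can legitimately match several rows, which is exactly what a normal key lookup cannot express.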
I've scoured the official docs and found no examples of how to do this. I suspect that retrieving N rows from a single GlobalKTable join is not achievable.
Also, I'm using the Streams DSL for this. I'm unsure whether this would be achievable with the Processor API. Any input is appreciated!
When joining a KStream with a GlobalKTable, you can use parts of the key and value of the KStream record to build the lookup key, but that lookup key must match an entire GlobalKTable key exactly. Each stream record is therefore joined with at most one table row, so, unfortunately, you can't do what you described above with a join.
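A plain-Java analogy of that limitation (this is not Kafka code; `ExactKeyJoinAnalogy` and its `keySelector` parameter are hypothetical stand-ins for the join's `KeyValueMapper`): the mapper derives one lookup key per stream record, and the table is probed with a single exact-key lookup, much like `Map.get`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

class ExactKeyJoinAnalogy {
    // keySelector plays the role of the KeyValueMapper: it may use any part of the
    // stream record's key and value, but it produces exactly ONE lookup key.
    static String join(Map<String, String> table,
                       String streamKey,
                       String streamValue,
                       BiFunction<String, String, String> keySelector) {
        String lookupKey = keySelector.apply(streamKey, streamValue);
        String tableValue = table.get(lookupKey); // exact match only, never "contains"
        // An inner join drops the record when nothing matches; modelled here as null.
        return tableValue == null ? null : tableValue + " - " + streamValue;
    }

    public static void main(String[] args) {
        Map<String, String> table = new LinkedHashMap<>();
        table.put("BANK055DEPOSIT", "{some data}");
        table.put("BANK055CREDIT", "{different data}");
        table.put("BANK033CREDIT", "{more different data}");

        // A partial key ("BANK055") matches nothing, because no table key equals it.
        System.out.println(join(table, "ACCT055", "event",
                (k, v) -> "BANK" + k.substring(4, 7))); // prints null
        // A full key matches exactly one row, never two.
        System.out.println(join(table, "ACCT055", "event",
                (k, v) -> "BANK" + k.substring(4, 7) + "CREDIT")); // prints {different data} - event
    }
}
```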
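But you should still be able to do something close to this even using the DSL. If you use KStream.transformValues (or KStream.flatTransformValues, to emit one output record per match) with a ValueTransformerWithKeySupplier, you can scan the state store and extract the records you want based on a substring contained in the stream record. Additionally, you don't necessarily need to scan the entire store: when the substring sits at a known position in the key, such as right after a fixed BANK prefix, a range query over that key prefix is enough. Matching on the value, however, still requires a full scan.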
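To make the range-query idea concrete, here is a sketch that uses `java.util.TreeMap` as a stand-in for a sorted key-value store. It is not Kafka code, and `PrefixRangeSketch`, `prefixUpperBound` and `keysWithPrefix` are hypothetical names. It assumes the code always directly follows a known prefix (`BANK055...`), so every matching key falls in the half-open range from the prefix up to the prefix with its last character incremented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class PrefixRangeSketch {
    // Exclusive upper bound for keys starting with `prefix`: the prefix with its
    // last character incremented. Assumes a non-empty prefix whose last character
    // is not Character.MAX_VALUE.
    static String prefixUpperBound(String prefix) {
        char last = prefix.charAt(prefix.length() - 1);
        return prefix.substring(0, prefix.length() - 1) + (char) (last + 1);
    }

    // Visits only keys in [prefix, prefixUpperBound(prefix)), i.e. exactly the keys
    // that start with the prefix, instead of iterating over the whole map.
    static List<String> keysWithPrefix(NavigableMap<String, String> store, String prefix) {
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, String> e :
                store.subMap(prefix, true, prefixUpperBound(prefix), false).entrySet()) {
            keys.add(e.getKey());
        }
        return keys;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>();
        store.put("BANK055DEPOSIT", "{some data}");
        store.put("BANK055CREDIT", "{different data}");
        store.put("BANK033CREDIT", "{more different data}");
        System.out.println(keysWithPrefix(store, "BANK055")); // [BANK055CREDIT, BANK055DEPOSIT]
    }
}
```

When translating this to a Kafka Streams store, note that `range(from, to)` treats `to` as inclusive, so the bound needs adjusting; depending on your Kafka Streams version, the store may also offer a `prefixScan` method that handles this directly.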
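EDIT: Here's some code that I got working to demonstrate what I'm getting at. Note that it keeps the "table" in a regular local state store, filled from a second input stream, rather than in an actual GlobalKTable.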
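import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

@SuppressWarnings("unchecked")
public class MultiResultJoinExample {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mult-partial-key-join-results");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final StreamsBuilder builder = new StreamsBuilder();

        // A regular (partitioned) key-value store that plays the role of the "table".
        // Both processors below are connected to it, so "to-join-input" and
        // "need-join-input" must be co-partitioned, and each lookup only sees the
        // entries held in its own partition of the store.
        final String storeName = "kv-store";
        final StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
                        Serdes.String(),
                        Serdes.String());
        builder.addStateStore(keyValueStoreBuilder);

        // Populate the store from the "table" topic.
        final KStream<String, String> streamToJoinAgainst =
                builder.stream("to-join-input", Consumed.with(Serdes.String(), Serdes.String()));
        streamToJoinAgainst.transformValues(new StoringValueTransformer(storeName), storeName);

        // For each incoming event, emit one output record per matching store entry.
        final KStream<String, String> streamNeedingJoin =
                builder.stream("need-join-input", Consumed.with(Serdes.String(), Serdes.String()));
        streamNeedingJoin
                .flatTransformValues(new FlatMapJoiningTransformer(storeName), storeName)
                .to("output", Produced.with(Serdes.String(), Serdes.String()));

        final KafkaStreams streams = new KafkaStreams(builder.build(props), props);
        // Close the application cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }

    static final class FlatMapJoiningTransformer
            implements ValueTransformerWithKeySupplier<String, String, Iterable<String>> {
        final String storeName;

        public FlatMapJoiningTransformer(String storeName) {
            this.storeName = storeName;
        }

        @Override
        public ValueTransformerWithKey<String, String, Iterable<String>> get() {
            return new ValueTransformerWithKey<String, String, Iterable<String>>() {
                private KeyValueStore<String, String> kvStore;

                @Override
                public void init(ProcessorContext context) {
                    kvStore = (KeyValueStore<String, String>) context.getStateStore(storeName);
                }

                @Override
                public Iterable<String> transform(String readOnlyKey, String value) {
                    final List<String> results = new ArrayList<>();
                    // Assumes keys look like "BANK055...": characters 4-6 hold the code to match.
                    if (readOnlyKey == null || readOnlyKey.length() < 7) {
                        return results; // no usable pattern, so emit nothing
                    }
                    final String patternToMatch = readOnlyKey.substring(4, 7);
                    // Full scan, because the pattern may appear anywhere in a key or value.
                    try (KeyValueIterator<String, String> iter = kvStore.all()) {
                        while (iter.hasNext()) {
                            final KeyValue<String, String> kv = iter.next();
                            if (kv.key.contains(patternToMatch) || kv.value.contains(patternToMatch)) {
                                results.add(kv.value + " - " + value);
                            }
                        }
                    }
                    return results;
                }

                @Override
                public void close() {
                    // no-op: the store is managed by Kafka Streams
                }
            };
        }
    }

    static final class StoringValueTransformer
            implements ValueTransformerWithKeySupplier<String, String, String> {
        final String storeName;

        public StoringValueTransformer(String storeName) {
            this.storeName = storeName;
        }

        @Override
        public ValueTransformerWithKey<String, String, String> get() {
            return new ValueTransformerWithKey<String, String, String>() {
                private KeyValueStore<String, String> kvStore;

                @Override
                public void init(ProcessorContext context) {
                    kvStore = (KeyValueStore<String, String>) context.getStateStore(storeName);
                }

                @Override
                public String transform(String readOnlyKey, String value) {
                    // put (not putIfAbsent) so later updates overwrite earlier ones, as in a
                    // table; a null value deletes the key. Null keys cannot be stored.
                    if (readOnlyKey != null) {
                        kvStore.put(readOnlyKey, value);
                    }
                    return value;
                }

                @Override
                public void close() {
                    // no-op
                }
            };
        }
    }
}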
HTH, Bill