Reputation: 3787
I have a Kafka Streams app with two data sources: Events and Users.
I have four topics: Events, Users, Users2, and User-Events.
Users2 is the same as Users and is used to demonstrate the GlobalKTable.
The Events topic uses timestamp mode, so when the timestamp-field date is reached, the KStream will receive the Event record.
At this point, I want to create a User-Event record for every User-ID in the Users KTable, paired with the new Event-ID, but I don't know how to iterate through either the GlobalKTable or the KTable to achieve this.
public Topology createTopology() {
    final Serde<String> serde = Serdes.String();
    final StreamsBuilder builder = new StreamsBuilder();

    final GlobalKTable<String, String> gktUsers =
        builder.globalTable("Users",
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("user-store")
                .withKeySerde(Serdes.String()).withValueSerde(serde));

    final KTable<String, String> ktUsers = builder.table("Users2");

    builder.stream("Events", Consumed.with(Serdes.String(), serde))
        .peek((k, v) -> {
            // This is called when a new Event record becomes current.
            // How do I iterate through gktUsers at this point
            // and output a User-ID and an Event-ID to the User-Events topic?
            // This type of iteration doesn't work either.
            ktUsers.toStream().foreach(new ForeachAction<String, String>() {
                @Override
                public void apply(String s, String s2) {
                    log.info("{} {}", s, s2);
                }
            });
        });

    return builder.build();
}
Upvotes: 0
Views: 2904
Reputation: 191874
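You need to get a handle on the state store in order to iterate over it:
    // streams is the running KafkaStreams instance; StoreQueryParameters requires Kafka 2.5+
    ReadOnlyKeyValueStore<String, String> keyValueStore =
        streams.store(StoreQueryParameters.fromNameAndType("user-store", QueryableStoreTypes.keyValueStore()));
    // Look up a single key
    String value = keyValueStore.get("key");
    // Iterate over every entry; close the iterator when done
    KeyValueIterator<String, String> range = keyValueStore.all();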
https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html
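Interactive queries read the store from outside the topology. To iterate it at the moment an event arrives, one option is a custom processor that reads the global store directly. The following is only a sketch, assuming Kafka Streams 3.3 or later (where `KStream#process` returns a stream), that the event record's key is the Event-ID, and that each output record should be keyed by User-ID. The class names `UserEventFanOut` and `FanOutProcessor` are hypothetical.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class UserEventFanOut {

    /** For each incoming event, forwards one (userId, eventId) record per user in the store. */
    static class FanOutProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;
        // Values are typed as Object because a table-backed store may wrap them
        // (for example together with a timestamp); only the keys are read here.
        private KeyValueStore<String, Object> users;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
            // Global stores are visible to every processor, so the store name
            // is not passed to process() below.
            this.users = context.getStateStore("user-store");
        }

        @Override
        public void process(Record<String, String> event) {
            // Assumption: the event's key is the Event-ID.
            try (KeyValueIterator<String, Object> it = users.all()) {
                while (it.hasNext()) {
                    String userId = it.next().key;
                    context.forward(event.withKey(userId).withValue(event.key()));
                }
            }
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Materialize the Users topic as a global store named "user-store".
        builder.globalTable("Users",
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.as("user-store"));

        ProcessorSupplier<String, String, String, String> supplier = FanOutProcessor::new;

        builder.stream("Events", Consumed.with(Serdes.String(), Serdes.String()))
            .process(supplier)
            .to("User-Events", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```

Note that this scans the whole store for every event, so the cost per event grows with the number of users.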
Alternatively, a stream-table join between Users and Events may be a better fit.
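A join matches each event with at most one user, so it fits only when every event identifies the user it belongs to. The sketch below makes that assumption explicit: it pretends the event's value is a User-ID, which the question does not state. The class name `EventUserJoin` is hypothetical, and the topic and store names are taken from the question.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class EventUserJoin {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        GlobalKTable<String, String> users = builder.globalTable("Users",
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.as("user-store"));

        KStream<String, String> events =
            builder.stream("Events", Consumed.with(Serdes.String(), Serdes.String()));

        events
            // Assumption: the event value is the User-ID used to look up the table.
            .join(users,
                (eventId, userId) -> userId,      // which table key to look up
                (userId, userValue) -> userId)    // keep the User-ID as the joined value
            // Re-key the result so each record is (userId, eventId).
            .map((eventId, userId) -> KeyValue.pair(userId, eventId))
            .to("User-Events", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```

Events whose User-ID has no entry in the table are dropped by this inner join.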
Upvotes: 1