paiego

Reputation: 3787

How to Iterate through a GlobalKTable or KTable in Kafka Streaming App

I have a Kafka Streaming App with 2 data sources: Events and Users.

I have 4 topics: Events, Users, Users2, and User-Events.

Users2 is the same as Users and is used to demonstrate the GlobalKTable.

The Events topic uses timestamp mode, so when the timestamp-field date is reached, the KStream will receive the Event record.

At this point, I want to create a User-Event record for every User-ID in the Users KTable, paired with the new Event-ID, but I don't know how to iterate through either the GlobalKTable or the KTable to achieve this.

public Topology createTopology() {

    final Serde<String> serde = Serdes.String();
    final StreamsBuilder builder = new StreamsBuilder();

    final GlobalKTable<String, String> gktUsers =
            builder.globalTable("Users",
                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("user-store")
                            .withKeySerde(Serdes.String()).withValueSerde(serde));

    final KTable<String, String> ktUsers = builder.table("Users2");

    builder.stream("Events", Consumed.with(Serdes.String(), serde))
            .peek((k, v) -> {
                // This is called when a new Event record becomes current.
                // How do I iterate through gktUsers at this point
                // and output a User-ID and an Event-ID to the User-Events topic?

                //  This type of iteration doesn't work either.
                ktUsers.toStream().foreach(new ForeachAction<String, String>() {
                    @Override
                    public void apply(String s, String s2) {
                        log.info("{} {}", s, s2);
                    }
                });
            });

    return builder.build();
}

Upvotes: 0

Views: 2904

Answers (1)

OneCricketeer

Reputation: 191874

You need to get a handle on that state store in order to iterate over it:

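// streams is the running KafkaStreams instance.
// On Kafka Streams 2.5+, the equivalent call is
// streams.store(StoreQueryParameters.fromNameAndType("user-store", QueryableStoreTypes.keyValueStore()))
ReadOnlyKeyValueStore<String, String> keyValueStore = streams.store("user-store", QueryableStoreTypes.keyValueStore());

String value = keyValueStore.get("key");

KeyValueIterator<String, String> range = keyValueStore.all();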

https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html
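Once you have an iterator over the store, the fan-out itself is a plain loop. Here is a minimal sketch of that step in plain Java, with a `LinkedHashMap` standing in for "user-store" so it runs without Kafka. The class name `UserEventFanOut`, the method `fanOut`, and the "userId:eventId" output format are made up for illustration. In the real app the entries would come from `keyValueStore.all()`, and that `KeyValueIterator` should be closed when you are done with it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: not part of Kafka Streams.
class UserEventFanOut {

    // Builds one "userId:eventId" record for every user entry the iterator yields.
    // Assumption: only the user key is needed, the user value is ignored.
    public static List<String> fanOut(String eventId,
                                      Iterator<Map.Entry<String, String>> users) {
        List<String> userEvents = new ArrayList<>();
        while (users.hasNext()) {
            Map.Entry<String, String> user = users.next();
            userEvents.add(user.getKey() + ":" + eventId);
        }
        return userEvents;
    }

    public static void main(String[] args) {
        // Stand-in for the contents of "user-store".
        Map<String, String> userStore = new LinkedHashMap<>();
        userStore.put("u1", "Alice");
        userStore.put("u2", "Bob");

        // Prints u1:e42 and then u2:e42, one per line.
        for (String record : fanOut("e42", userStore.entrySet().iterator())) {
            System.out.println(record);
        }
    }
}
```

Each string returned here corresponds to one record you would send to the User-Events topic.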

Alternatively, a stream-table join between users and events may be a better fit.

Upvotes: 1
