Anouer Hermassi

Reputation: 151

(Kafka) Streams - foreach inside foreach

I have a piece of code that iterates over a KStream and checks whether a condition is met. If it is, it calls another method to do further processing. The code looks like this:

stream1.foreach((k, v) -> {
    if (someCondition) {
        System.out.println("Triggered Join");
        joinStreams();
    }
});

The body of the joinStreams() method looks like this (just for testing purposes):

private static void joinStreams() {
    System.out.println("Started Join");
    stream2.foreach((k, v) -> System.out.println("OK"));
}

When joinStreams() is called from inside the foreach, it prints only "Started Join" and then hangs forever. When I call it directly from main(), it prints "Started Join" followed by as many "OK" lines as there are messages in the stream, which is the expected behavior. My question is: what might cause this? P.S.: As far as I understand, the problem is the foreach (in joinStreams()) being called inside another foreach (on stream1).

Upvotes: 0

Views: 2821

Answers (1)

Frederic A.

Reputation: 3514

You just can't do that.

By calling things like stream1.foreach, you are building a stream topology. Once the whole topology is built, you start it with something like new KafkaStreams(topology, streamingConfig).start().

You have to understand that the body of foreach is only called while your stream processing topology is running. As a consequence, with your code, the call to stream2.foreach, which is meant to build the topology, happens after the topology has already been built and started. That makes no sense.
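To make the build-then-run distinction concrete, here is a minimal sketch in plain Java. It is a toy model, not the Kafka Streams API: ToyStream, start, and the sample records are all hypothetical names invented for illustration. It only assumes that foreach registers an action and that starting the topology freezes whatever was registered up to that point. It does not reproduce the hang from the question, only the fact that a late registration never runs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class TopologySketch {

    /** Hypothetical stand-in for a stream; NOT the Kafka Streams API. */
    static final class ToyStream {
        final List<String> records;
        final List<Consumer<String>> actions = new ArrayList<>();

        ToyStream(String... records) {
            this.records = Arrays.asList(records);
        }

        /** Build phase: only registers the action; nothing runs yet. */
        void foreach(Consumer<String> action) {
            actions.add(action);
        }
    }

    /** Run phase: freeze what was registered so far, then process the records. */
    static void start(ToyStream... streams) {
        List<List<Consumer<String>>> frozen = new ArrayList<>();
        for (ToyStream s : streams) {
            frozen.add(new ArrayList<>(s.actions));
        }
        for (int i = 0; i < streams.length; i++) {
            for (String record : streams[i].records) {
                for (Consumer<String> action : frozen.get(i)) {
                    action.accept(record);
                }
            }
        }
    }

    /** Mirrors the question: stream2.foreach is called from inside stream1's action. */
    static List<String> nestedRegistration() {
        List<String> log = new ArrayList<>();
        ToyStream stream1 = new ToyStream("a", "b");
        ToyStream stream2 = new ToyStream("x", "y");
        stream1.foreach(v -> {
            log.add("Started Join");
            // Too late: the topology was frozen before this line ever ran.
            stream2.foreach(w -> log.add("OK"));
        });
        start(stream1, stream2);
        return log;
    }

    /** Both actions are registered before start, so both run. */
    static List<String> upFrontRegistration() {
        List<String> log = new ArrayList<>();
        ToyStream stream1 = new ToyStream("a", "b");
        ToyStream stream2 = new ToyStream("x", "y");
        stream1.foreach(v -> log.add("Triggered Join"));
        stream2.foreach(w -> log.add("OK"));
        start(stream1, stream2);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(nestedRegistration());  // [Started Join, Started Join]
        System.out.println(upFrontRegistration()); // [Triggered Join, Triggered Join, OK, OK]
    }
}
```

In the nested variant, "OK" never appears because the action on stream2 is registered only after the set of actions has been frozen. In Kafka Streams terms, the likely fix is to declare all processing on stream2 (for example a KStream join with stream1) on the builder before calling start(), rather than from inside a foreach body.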

Upvotes: 3
