Reputation: 151
I have a piece of code that iterates over a KStream and checks whether a condition is met. When it is, it calls another method to do some further processing. The code looks like this:
stream1.foreach((k, v) -> {
    if (someCondition) {
        System.out.println("Triggered Join");
        joinStreams();
    }
});
Now, the body of the joinStreams() method looks like the following (just for testing purposes):
private static void joinStreams() {
    System.out.println("Started Join");
    stream2.foreach((k, v) -> System.out.println("OK"));
}
When joinStreams() is called this way, it only prints "Started Join" and then hangs forever. When I call it directly from main(), it prints "Started Join" followed by as many "OK" lines as there are messages in the stream (which is its normal behavior).
My question is: what might cause this strange behavior?
P.S.: As far as I understand, the problem is the foreach (from joinStreams()) being called inside another foreach (on stream1).
Upvotes: 0
Views: 2821
Reputation: 3514
You just can't do that.
By calling things like stream1.foreach, you are building a stream topology. Once the whole topology is built, you start it with something like new KafkaStreams(topology, streamingConfig).start().
You have to understand that the body of foreach is only called while your stream processing topology is running. As a consequence, with your code, the call to stream2.foreach (which is meant to build the topology) happens after the topology has already been built and started, which makes no sense.
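To make the build phase versus run phase distinction concrete, here is a minimal plain-Java sketch. It does not use the Kafka Streams API at all: ToyStream, start(), process() and run() are hypothetical names invented for illustration, and the "freeze on start" behavior is an assumption that only mimics the idea that a started topology does not pick up steps registered later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BuildThenRunSketch {

    /** Toy stand-in for a stream; NOT the Kafka Streams KStream class. */
    static class ToyStream {
        private final List<Consumer<String>> registered = new ArrayList<>();
        private List<Consumer<String>> running = List.of();

        /** Build step: only records the action, processes nothing. */
        void foreach(Consumer<String> action) {
            registered.add(action);
        }

        /** Freezes the actions registered so far (assumed analogy to starting a built topology). */
        void start() {
            running = List.copyOf(registered);
        }

        /** Run step: only actions that existed at start() see the record. */
        void process(String record) {
            for (Consumer<String> action : running) {
                action.accept(record);
            }
        }
    }

    /** Collects what would be printed for the nested or the up-front variant. */
    static List<String> run(boolean nested) {
        List<String> out = new ArrayList<>();
        ToyStream stream1 = new ToyStream();
        ToyStream stream2 = new ToyStream();

        stream1.foreach(v -> {
            out.add("Triggered Join");
            if (nested) {
                out.add("Started Join");
                // Too late: stream2 has already been started, so this action never runs.
                stream2.foreach(x -> out.add("OK"));
            }
        });
        if (!nested) {
            // Registered during the build phase, before start().
            stream2.foreach(x -> out.add("OK"));
        }

        // Run phase: start everything, then feed one record to stream1 and two to stream2.
        stream1.start();
        stream2.start();
        stream1.process("a");
        stream2.process("b");
        stream2.process("c");
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // nested registration
        System.out.println(run(false)); // up-front registration
    }
}
```

In this toy model the nested variant yields [Triggered Join, Started Join] and never reaches "OK", while the up-front variant yields [Triggered Join, OK, OK]. The practical takeaway for the real code is the same: declare every step on stream1 and stream2 before starting the topology, and keep the foreach bodies free of further topology-building calls.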
Upvotes: 3