Reputation: 307
Maybe I'm just missing something, but I have no more ideas where to look.
I read messages from two sources, join them on a common key, and sink everything to Kafka.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
...
source1
  .keyBy(_.searchId)
  .connect(source2.keyBy(_.searchId))
  .process(new SearchResultsJoinFunction)
  .addSink(KafkaSink.sink)
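The actual SearchResultsJoinFunction isn't shown here, so below is a minimal sketch of what such a join function could look like, just to make the snippet self-contained. The Search, Result and Joined case classes, the field names, and the buffer-one-side-in-keyed-state logic are assumptions for illustration, not the real implementation.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

// Hypothetical event types; the real ones only need a common searchId key.
case class Search(searchId: String, query: String)
case class Result(searchId: String, payload: String)
case class Joined(searchId: String, query: String, payload: String)

class SearchResultsJoinSketch extends CoProcessFunction[Search, Result, Joined] {

  // Keyed state: buffer whichever side arrives first until its counterpart shows up.
  private var searchState: ValueState[Search] = _
  private var resultState: ValueState[Result] = _

  override def open(parameters: Configuration): Unit = {
    searchState = getRuntimeContext.getState(
      new ValueStateDescriptor[Search]("search", classOf[Search]))
    resultState = getRuntimeContext.getState(
      new ValueStateDescriptor[Result]("result", classOf[Result]))
  }

  override def processElement1(search: Search,
                               ctx: CoProcessFunction[Search, Result, Joined]#Context,
                               out: Collector[Joined]): Unit = {
    val result = resultState.value()
    if (result != null) {
      // The matching result arrived first: emit the join and clear the buffer.
      out.collect(Joined(search.searchId, search.query, result.payload))
      resultState.clear()
    } else {
      searchState.update(search)
    }
  }

  override def processElement2(result: Result,
                               ctx: CoProcessFunction[Search, Result, Joined]#Context,
                               out: Collector[Joined]): Unit = {
    val search = searchState.value()
    if (search != null) {
      out.collect(Joined(search.searchId, search.query, result.payload))
      searchState.clear()
    } else {
      resultState.update(result)
    }
  }
}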
It works perfectly when I launch it locally, and it also works on the cluster with parallelism set to 1, but not with 3 any more.
When I deploy it to 1 JobManager and 3 TaskManagers and every task reaches the "RUNNING" state, after 2 minutes (during which nothing arrives at the sink) one of the TaskManagers gets the following log:
https://gist.github.com/zavalit/1b1bf6621bed2a3848a05c1ef84c689c#file-gistfile1-txt-L108
and the whole thing just shuts down.
I'd appreciate any hint. Thanks in advance.
Upvotes: 0
Views: 647
Reputation: 43409
The problem appears to be that this task manager -- flink-taskmanager-12-2qvcd (10.81.53.209) -- is unable to talk to at least one of the other task managers, namely flink-taskmanager-12-57jzd (10.81.40.124:46240). This is why the job never really starts to run.
I would check the logs of that other task manager to see what it reports, and I would also review your network configuration. Perhaps a firewall is getting in the way?
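If a firewall (or, if this runs on Kubernetes, a network policy) is involved, one thing that can help is pinning the ports the TaskManagers use so they can be opened explicitly; by default the data port is chosen at random on each start, which is easy to block by accident. A minimal flink-conf.yaml sketch (the concrete port numbers are just examples):

# flink-conf.yaml
# Fix the TaskManager ports so firewall / network-policy rules can allow them.
taskmanager.rpc.port: 6122
taskmanager.data.port: 6121

With fixed ports you can also test connectivity between the TaskManager hosts directly, e.g. with nc or telnet.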
Upvotes: 1