zavalit

Reputation: 307

Run Flink with parallelism greater than 1

Maybe I'm just missing something, but I've run out of ideas about where to look.

I read messages from two sources, join them on a common key, and sink it all to Kafka.

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
...
source1
  .keyBy(_.searchId)
  .connect(source2.keyBy(_.searchId))
  .process(new SearchResultsJoinFunction)
  .addSink(KafkaSink.sink)
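
For context, SearchResultsJoinFunction itself is not shown in the question. A minimal sketch of what such a join typically looks like is a CoProcessFunction that buffers each side in keyed state until the matching element arrives; the event types SearchRequest, SearchResponse, and JoinedResult below are hypothetical placeholders, not the asker's actual classes.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

// Hypothetical event types; the real ones are not shown in the question.
case class SearchRequest(searchId: String, query: String)
case class SearchResponse(searchId: String, hits: Int)
case class JoinedResult(searchId: String, query: String, hits: Int)

class SearchResultsJoinFunction
    extends CoProcessFunction[SearchRequest, SearchResponse, JoinedResult] {

  // Keyed state (per searchId, since both inputs are keyed before connect):
  // each side buffers its element until the other side shows up.
  private var requestState: ValueState[SearchRequest] = _
  private var responseState: ValueState[SearchResponse] = _

  override def open(parameters: Configuration): Unit = {
    requestState = getRuntimeContext.getState(
      new ValueStateDescriptor("request", classOf[SearchRequest]))
    responseState = getRuntimeContext.getState(
      new ValueStateDescriptor("response", classOf[SearchResponse]))
  }

  override def processElement1(
      req: SearchRequest,
      ctx: CoProcessFunction[SearchRequest, SearchResponse, JoinedResult]#Context,
      out: Collector[JoinedResult]): Unit = {
    val resp = responseState.value()
    if (resp != null) {
      // The other side already arrived: emit the joined record and clear state.
      out.collect(JoinedResult(req.searchId, req.query, resp.hits))
      responseState.clear()
    } else {
      requestState.update(req)
    }
  }

  override def processElement2(
      resp: SearchResponse,
      ctx: CoProcessFunction[SearchRequest, SearchResponse, JoinedResult]#Context,
      out: Collector[JoinedResult]): Unit = {
    val req = requestState.value()
    if (req != null) {
      out.collect(JoinedResult(req.searchId, req.query, resp.hits))
      requestState.clear()
    } else {
      responseState.update(resp)
    }
  }
}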

It works perfectly when I launch it locally, and it also works on the cluster with parallelism set to 1, but not with 3.

When I deploy it with 1 JobManager and 3 TaskManagers and every task reaches the "RUNNING" state, after about 2 minutes (during which nothing arrives at the sink) one of the TaskManagers produces the following log:
https://gist.github.com/zavalit/1b1bf6621bed2a3848a05c1ef84c689c#file-gistfile1-txt-L108

and then the whole job shuts down.

I'd appreciate any hint. Thanks in advance.

Upvotes: 0

Views: 647

Answers (1)

David Anderson

Reputation: 43409

The problem appears to be that this task manager -- flink-taskmanager-12-2qvcd (10.81.53.209) -- is unable to talk to at least one of the other task managers, namely flink-taskmanager-12-57jzd (10.81.40.124:46240). This is why the job never really starts to run.

I would check in the logs for this other task manager to see what it says, and I would also review your network configuration. Perhaps a firewall is getting in the way?
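One network detail worth checking in containerized setups (the pod-style TaskManager names suggest Kubernetes): the TaskManager data port defaults to a random value, so TaskManager-to-TaskManager traffic can be blocked even when the well-known RPC ports are open. Pinning the ports in flink-conf.yaml lets you open exactly what is needed; the port numbers below are illustrative, not required values.

# flink-conf.yaml -- example values, adjust to your environment
taskmanager.rpc.port: 6122
taskmanager.data.port: 6121
blob.server.port: 6124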

Upvotes: 1
