Reputation: 9961
I can deploy a flow to a standalone installation of Apache Flink (with one JobManager and several TaskManagers) without problems:
bin/flink run -m example-app-1.stag.local:6123 -d -p 4 my-flow-fat-jar.jar <flow parameters>
but when I run the same command to deploy to a Standalone HA cluster, it raises an error:
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 milliseconds
at org.apache.flink.client.program.Client.runDetached(Client.java:406)
at org.apache.flink.client.program.Client.runDetached(Client.java:366)
at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
at org.apache.flink.client.program.Client.runDetached(Client.java:278)
at org.apache.flink.client.CliFrontend.executeProgramDetached(CliFrontend.java:844)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:330)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 milliseconds
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:221)
at org.apache.flink.client.program.Client.runDetached(Client.java:403)
... 7 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:218)
... 8 more
The active JobManager writes the following errors to its log:
2016-04-14 13:54:44,160 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://[email protected]:62784] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2016-04-14 13:54:46,299 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(null,TriggerSavepoint(5de582462f334caee4733c60c6d69fd7)) because the expected leader session ID Some(72630119-fd0a-40e7-8372-45c93781e99f) did not equal the received leader session ID None.
So I don't understand what could cause such an error.
Let me know if additional information is required.
P.S.
Deploying from the Flink Dashboard works fine for the Standalone HA cluster. The problem appears only when I deploy through the Flink CLI.
Update
I cleared ZooKeeper, cleared the directories used by Flink on disk, and re-deployed the Flink Standalone HA cluster. Then I tried to run the flow using the bin/flink run command. As you can see, the JobManager writes only one line about the problem (see flink--jobmanager-0-example-app-1.stag.local.log).
All JobManagers and TaskManagers use the same flink-conf.yaml:
jobmanager.heap.mb: 1024
jobmanager.web.port: 8081
taskmanager.data.port: 6121
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
taskmanager.tmp.dirs: /flink/data/task_manager
blob.server.port: 6130
blob.storage.directory: /flink/data/blob_storage
parallelism.default: 4
state.backend: filesystem
state.backend.fs.checkpointdir: s3a://example-flink/checkpoints
restart-strategy: none
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 60s
recovery.mode: zookeeper
recovery.zookeeper.quorum: zookeeper-1.stag.local:2181,zookeeper-2.stag.local:2181,zookeeper-3.stag.local:2181
recovery.zookeeper.path.root: /example/flink
recovery.zookeeper.storageDir: s3a://example-flink/recovery
recovery.jobmanager.port: 6123
fs.hdfs.hadoopconf: /flink/conf
So it seems the Standalone HA cluster is configured correctly.
Update 2
FYI: I want to install a Standalone HA cluster as described here, not a YARN HA cluster.
Update 3
Here is the log created by the bin/flink CLI: flink-username-client-hostname.local.log.
Upvotes: 1
Views: 1502
Reputation: 13346
When starting a Flink cluster in HA mode, the JobManager address and its leader ID are written to the specified ZooKeeper cluster. In order to communicate with the JobManager, you not only have to know its address but also its leader ID. Therefore, you have to specify the following parameters in the flink-conf.yaml which is read by the CLI (a concrete sketch follows after this list):
recovery.mode: zookeeper
recovery.zookeeper.quorum: address of your ZooKeeper quorum
recovery.zookeeper.path.root: ZooKeeper root path with which you've started your cluster
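For example, reusing the values from the configuration posted in the question (the hostnames and the /example/flink path are specific to that setup and would need to be adjusted for your environment), the client-side flink-conf.yaml would contain:
recovery.mode: zookeeper
recovery.zookeeper.quorum: zookeeper-1.stag.local:2181,zookeeper-2.stag.local:2181,zookeeper-3.stag.local:2181
recovery.zookeeper.path.root: /example/flink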
With this information, the client knows where to find the ZooKeeper cluster, from which it can then retrieve the JobManager address and its leader ID.
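With these entries in the client's configuration, the CLI should be able to look up the current leader on its own, so the submission should no longer need a hard-coded JobManager address. A minimal sketch, reusing the jar name and parameters from the question:
bin/flink run -d -p 4 my-flow-fat-jar.jar <flow parameters>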
Upvotes: 3