Christos Hadjinikolis

Flink JobExecutionException: akka.client.timeout

I am using Flink 1.4.0.

I am trying to run a job that uses the DataSet API from within IntelliJ. If I run the same job through the Flink UI, it runs fine. To run the job, I first specify, through environment variables, the amount of data to be processed. When the amount is relatively small, the job runs fine, but as it grows I start getting the following error:

ERROR StatusLogger Log4j2 could not find a logging implementation. Please add log4j-core to the classpath. Using SimpleLogger to log to the console...
31107 [main] ERROR com.company.someLib.SomeClass - Error executing pipeline
org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:565)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:539)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:193)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at com.ubs.digital.comms.graph.emailanalyser.EmailAnalyserPipeline.lambda$runPipeline$1(EmailAnalyserPipeline.java:120)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at com.ubs.digital.comms.graph.emailanalyser.EmailAnalyserPipeline.runPipeline(EmailAnalyserPipeline.java:87)
at com.ubs.digital.comms.graph.emailanalyser.EmailAnalyserPipeline.main(EmailAnalyserPipeline.java:65)
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.

I can see that the advice is:

You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.

but I suspect the problem goes deeper than that. Still, to investigate further I first need to configure akka.client.timeout. How do I do this in IntelliJ, and how long should the timeout be?

Furthermore, what's really causing this? Do I need to increase my heap memory or something? Thanks.

Answers (2)

Christos Hadjinikolis

I was able to figure it out, and it was not difficult. All I had to do was go to Run > Edit Configurations and, under the Configuration tab, add the following to the Program arguments field:

-Dakka.client.timeout:600s
-Dakka.ask.timeout:600s

I should note, however, that this did not solve the problem I was having altogether.
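A note on where these flags end up: text in IntelliJ's Program arguments field is passed to main(String[] args), whereas -D entries in the VM options field become JVM system properties. The plain-Java sketch below shows that difference and how such arguments could be collected into a map. It is not Flink code; the helper parseDashDArgs is hypothetical, and it accepts both the -Dkey:value form used here and the usual -Dkey=value form.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DashDArgs {

    // Hypothetical helper: collects "-Dkey=value" or "-Dkey:value" program
    // arguments into a map, preserving their order. Other arguments are ignored.
    static Map<String, String> parseDashDArgs(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        for (String arg : args) {
            if (!arg.startsWith("-D")) {
                continue;
            }
            String body = arg.substring(2);
            int eq = body.indexOf('=');
            int colon = body.indexOf(':');
            // Split at whichever separator appears first.
            int sep;
            if (eq < 0) {
                sep = colon;
            } else if (colon < 0) {
                sep = eq;
            } else {
                sep = Math.min(eq, colon);
            }
            // Skip malformed entries with no separator or an empty key.
            if (sep > 0) {
                props.put(body.substring(0, sep).trim(), body.substring(sep + 1).trim());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        // Text from "Program arguments" arrives here in args.
        Map<String, String> props = parseDashDArgs(args);
        props.forEach((k, v) -> System.out.println(k + " = " + v));
        // "-D" entries from "VM options" would show up here instead; with the
        // flags only in Program arguments, this prints null.
        System.out.println("system property akka.client.timeout = "
                + System.getProperty("akka.client.timeout"));
    }
}
```

Forwarding the collected values to Flink is left to the program. One option, assumed here rather than confirmed in this thread, is to copy them into an org.apache.flink.configuration.Configuration with setString and pass that to ExecutionEnvironment.createLocalEnvironment(Configuration) instead of relying on getExecutionEnvironment().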

Richard Deurwaarder

You can set this property in the Flink configuration file. See https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#distributed-coordination-via-akka

For example, in flink-conf.yaml you would add:

akka.client.timeout: 10min

But it seems like the data is being processed in the wrong place. Do you perhaps load the data in a constructor rather than in a map or run function?
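To illustrate the point about constructors: Flink ships user function objects to the cluster in serialized form as part of the submitted job, so anything a function loads in its constructor travels with the job submission and grows with the data size. That would fit a submission that times out only for large inputs. The sketch below is a plain-Java analogy, not Flink code: EagerFunction and LazyFunction are hypothetical stand-ins, and java.io serialization is used to compare how many bytes each would add to the job.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class ShippedSizeDemo {

    // Hypothetical function that loads its data in the constructor:
    // the records are a field, so they are serialized with the function.
    static class EagerFunction implements Serializable {
        private final List<String> records;

        EagerFunction(int count) {
            records = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                records.add("record-" + i);
            }
        }
    }

    // Hypothetical function that only remembers how much to load and would
    // read the records later, on the worker.
    static class LazyFunction implements Serializable {
        private final int count;
        // Would be filled on the worker; transient, so never serialized.
        private transient List<String> records;

        LazyFunction(int count) {
            this.count = count;
        }
    }

    // Number of bytes the object occupies after Java serialization.
    static int serializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        for (int count : new int[] {1_000, 100_000}) {
            System.out.printf("count=%d eager=%d bytes lazy=%d bytes%n",
                    count,
                    serializedSize(new EagerFunction(count)),
                    serializedSize(new LazyFunction(count)));
        }
    }
}
```

The eager variant grows roughly linearly with the record count while the lazy one stays constant. In an actual Flink job, the lazy variant would presumably do its loading in open() of a RichMapFunction, or read the data through a source, so that only the small configuration is part of the submission.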