Reputation: 33
I'm trying create a Savepoint with Apache Flink 1.2 in HDFS. I'm Running Flink in a local cluster on my machine. HDFS is running in a virtual machine. I managed to write to HDFS within the Flink Streaming Job, but the savepoint won't do that. My savepoint path is hdfs://hadoop:54310/savepoint/testpoint
which I specified in the UI before submitting the task.
It gives me following error message: (Invalid Path)
org.apache.flink.client.program.ProgramInvocationException: Failed to submit the job to the job manager
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleJsonRequest(JarRunHandler.java:64)
at org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler.handleRequest(AbstractJsonRequestHandler.java:41)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:98)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:90)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:44)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:159)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: JobSubmission failed: Invalid path 'hdfs://hadoop:54310/savepoint/testpoint'.
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:453)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleJsonRequest(JarRunHandler.java:62)
... 34 more
Caused by: java.lang.IllegalArgumentException: Invalid path 'hdfs://hadoop:54310/savepoint/testpoint'.
at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.createFsInputStream(SavepointStore.java:182)
at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.loadSavepoint(SavepointStore.java:131)
at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:64)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1359)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1341)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1341)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Configuration inside the job:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60*1000)
// Setting up State Backend
// hdfs://hadoop:54310/checkpoint
env.setStateBackend(new FsStateBackend(lioncubConf.hdfsCheckpoint))
// Tries 3 times in 10 Seconds and waits for 5 Min
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)))
I don't exactly know what I'm doing wrong. Inside the job I managed to write to HDFS (without having a savepoint). Thus HDFS can't be the problem. I also tried to define a directory hdfs://hadoop:54310/savepoint
which also doesn't work. Any ideas what's wrong with that path?
Upvotes: 1
Views: 1041
Reputation: 13346
The problem is that you're trying to submit a job with a savepoint path which does not exist. Please check whether you have taken a savepoint which is stored under hdfs://hadoop:54310/savepoint/testpoint
.
In order to trigger a savepoint you have to use the CLI and call bin/flink savepoint :jobId [:targetDirectory]
where the targetDirectory
is an optional parameter. For more information see the savepoint guide.
Upvotes: 1