Reputation: 83
I am planning to migrate one of our Spark applications to Apache Flink, and I am trying to understand its fault-tolerance features.
I executed the following code, and I do not see Flink actually trying to retry any task (or subtask). This could cause data loss for me. What should I do to make sure that every failure is covered by Flink?
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new FsStateBackend("file:///my-path", false))
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3,                            // number of restart attempts
  Time.of(0, TimeUnit.SECONDS)  // delay between attempts
))
env.enableCheckpointing(10L)

val text = env.socketTextStream(hostName, port)

text
  .map { input =>
    List(input)
  }.setParallelism(1)
  .flatMap { input =>
    println(s"running for $input")
    List.fill(5)(input.head) ::: {
      println("throw exception here")
      throw new RuntimeException("some exception")
      List("c")
    }
  }

env.execute()  // trigger program execution
I expect to see the throw exception here message a couple of times on the screen. However, when I use fixedDelayRestart, it looks like the exception is simply ignored and processing continues with the other records.
Upvotes: 3
Views: 881
Reputation: 18987
It depends on how you start the application.
I assume that you are running this from your IDE. In that case, StreamExecutionEnvironment.getExecutionEnvironment returns a LocalStreamEnvironment, which runs the program and all of Flink in a single process, i.e., the master (in Flink: the JobManager) and the worker (TaskManager) are started as threads in the same JVM process. The exception terminates this single process. Hence, there is no Flink process left that could restart the program.
If you want to run the program with fault tolerance, you need to submit it to a running Flink cluster, for example one that runs on your local machine. Download the Flink distribution, extract the archive file, and run ./bin/start-cluster.sh. This will start two processes, a master and a worker process. You can then submit the program to the cluster by creating a remote execution environment with StreamExecutionEnvironment.createRemoteEnvironment and passing the hostname and port as parameters (please check the documentation for details).
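For illustration, a minimal sketch of such a submission (the hostname, port, and jar path are placeholders that you would have to adapt to your setup; the port should be the one your local cluster's JobManager/REST endpoint listens on, see conf/flink-conf.yaml):

val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
  "localhost",                // hostname of the JobManager
  8081,                       // assumed REST port of the local cluster; check conf/flink-conf.yaml
  "target/my-flink-job.jar"   // hypothetical path to the jar containing your job classes
)
// build the same pipeline on remoteEnv and call remoteEnv.execute() to submit it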
Note that the exception will still kill the worker process. So in order to be able to restart the program, you'll need to manually start a worker process. In a production environment, this is typically taken care of by Kubernetes, Yarn, or Mesos.
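(If you only need to bring the worker back in such a local setup, the distribution also ships a script for that; if I recall correctly, ./bin/taskmanager.sh start launches an additional worker process.)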
By the way, we recently added an operations playground to the Flink documentation. It's a Docker-based sandbox environment to play around with Flink's fault-tolerance features. I recommend checking it out: Flink Operations Playground.
Some more hints:
Upvotes: 1