Reputation: 6085
I have a sample streaming WordCount example written in Flink (Scala). In it, I want to use externalized checkpointing so that the state can be restored in case of failure, but it is not working as expected.
My code is as follows:
import java.util.Properties

// imports assume a recent Flink (1.7+); package names may differ slightly in other versions
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object WordCount {

  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment
      .getExecutionEnvironment
      .setStateBackend(new RocksDBStateBackend("file:///path/to/checkpoint", true))

    // start a checkpoint every 1000 ms
    env.enableCheckpointing(1000)
    // set mode to exactly-once (this is the default)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // make sure 500 ms of progress happen between checkpoints
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    // checkpoints have to complete within one minute, or are discarded
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    // prevent the tasks from failing if an error happens in their checkpointing; the checkpoint will just be declined
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
    // allow only one checkpoint to be in progress at the same time
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    // prepare Kafka consumer properties
    val kafkaConsumerProperties = new Properties
    kafkaConsumerProperties.setProperty("zookeeper.connect", "localhost:2181")
    kafkaConsumerProperties.setProperty("group.id", "flink")
    kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092")

    // set up the Kafka consumer
    val kafkaConsumer = new FlinkKafkaConsumer[String]("input", new SimpleStringSchema, kafkaConsumerProperties)

    println("Executing WordCount example.")

    // get text from Kafka
    val text = env.addSource(kafkaConsumer)

    val counts: DataStream[(String, Int)] = text
      // split up the lines into pairs (2-tuples) containing: (word, 1)
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      // group by the tuple field "0" and sum up tuple field "1"
      .keyBy(0)
      .mapWithState((in: (String, Int), count: Option[Int]) =>
        count match {
          case Some(c) => ((in._1, c), Some(c + in._2))
          case None    => ((in._1, 1), Some(in._2 + 1))
        })

    // emit result
    println("Printing result to stdout.")
    counts.print()

    // execute program
    env.execute("Streaming WordCount")
  }
}
The output I get after running the program the first time is:
(hi, 1)
(hi, 2)
The output I get after running the program a second time is:
(hi, 1)
My expectation is that running the program a second time should give me the following output:
(hi, 3)
Since I am a newbie to Apache Flink, I don't know how to achieve this. Can anyone help me get the correct behavior?
Upvotes: 1
Views: 1490
Reputation: 582
I faced the same issue earlier, but I was able to make it work using a MiniCluster, as mentioned here: http://mail-archives.apache.org/mod_mbox/flink-user/201702.mbox/%3CCAO_f5ND=0f+uBbReSfThMBi-bnY4BjGBozo3fzEsZujiovb_-g@mail.gmail.com%3E
I did not find much documentation about MiniCluster in the docs, so I am not sure whether it is the recommended way or not.
On a full restart of the job I had to write a small piece of code to identify the latest checkpoint stored under the checkpoint directory (/jobId/chk-*) that contained the _metadata file, and then used
streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(s))
to restore the state from that checkpoint.
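A minimal sketch of that lookup, assuming the retained checkpoints live under a local <checkpointRoot>/<jobId>/ directory and that your Flink version exposes StreamGraph#setSavepointRestoreSettings (the directory layout, paths, and the latestCheckpoint helper are illustrative, not part of the Flink API):

import java.io.File

import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings

// hypothetical helper: find the newest completed checkpoint, i.e. the chk-N
// directory with the highest N that contains a _metadata entry
def latestCheckpoint(jobCheckpointDir: String): Option[String] = {
  val candidates = Option(new File(jobCheckpointDir).listFiles())
    .getOrElse(Array.empty[File])
    .filter(d => d.isDirectory && d.getName.startsWith("chk-"))
    .filter(d => new File(d, "_metadata").exists())
  if (candidates.isEmpty) None
  else Some(candidates.maxBy(_.getName.stripPrefix("chk-").toInt).getAbsolutePath)
}

// illustrative usage before submitting the job:
// val streamGraph = env.getStreamGraph
// latestCheckpoint("/path/to/checkpoint/<jobId>").foreach { s =>
//   streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(s))
// }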
Upvotes: 2
Reputation: 18987
Flink only restarts from the latest checkpoint if the application is restarted within the same execution (regular, automatic recovery).
If you cancel a job running in a local execution environment in the IDE, you kill the whole cluster and the job cannot be automatically recovered. Instead, you need to start it again. In order to restart a new job from a savepoint (or externalized checkpoint), you need to provide a path to the persisted savepoint/checkpoint. Not sure if that is possible with a local execution environment.
IMO it is easier to play around with checkpointing and recovery on a local Flink instance and not within an IDE.
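For reference, when the job runs on a standalone/local Flink installation (rather than in the IDE), restarting from a savepoint or retained externalized checkpoint is done by passing its path to the CLI, roughly like this (the jar name and checkpoint path are placeholders):

bin/flink run -s file:///path/to/checkpoint/<jobId>/chk-42 wordcount.jar

The -s (--fromSavepoint) option accepts both savepoint paths and retained checkpoint directories.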
Upvotes: 5