Reputation: 69
I am reading from an unbounded source (Kafka) and writing its word count to another Kafka topic. Now I want to enable checkpointing in the Beam pipeline. I have followed all the instructions in the Apache Beam documentation, but the checkpoint directory is still not created.
Below are the parameters I used for the pipeline:
--runner=FlinkRunner
--streaming=true
--parallelism=2
--checkpointingInterval=1000
--checkpointTimeoutMillis=5000
--minPauseBetweenCheckpoints=500
--externalizedCheckpointsEnabled=true
--retainExternalizedCheckpointsOnCancellation=true
Can anyone help me out with checkpointing?
Upvotes: 4
Views: 1850
Reputation: 19
I know this is old, but I want to agree with your answer. We built a Dockerized Flink and Beam setup in 2019 and run it with these options:
--runner=FlinkRunner --streaming=true --checkpointingInterval=30000 --env=dev
We have also configured RocksDB as the state backend in conf.yml.
Upvotes: 0
Reputation: 69
I have worked out a solution. There are two options: one is to change the checkpoint directory (state.checkpoints.dir) in the flink-conf.yaml of the Flink cluster, and the other is to use FlinkPipelineOptions:
@Description(
    "Sets the state backend factory to use in streaming mode. "
        + "Defaults to the flink cluster's state.backend configuration.")
Class<? extends FlinkStateBackendFactory> getStateBackendFactory();
void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);
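and then calling setStateBackendFactory with a factory class (I did this with a custom class):
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkStateBackendFactory;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;

// Nested in the pipeline class; returns a filesystem backend that stores checkpoints in the given directory.
static class bakend implements FlinkStateBackendFactory {
    @Override
    public StateBackend createStateBackend(FlinkPipelineOptions options) {
        return new FsStateBackend("file:///Users/myPc/word-count-beam/src/checkpoint/");
    }
}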
This creates the checkpoint directory. You also need to set a value for checkpointingInterval, otherwise checkpointing is not enabled.
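For the first option (configuring the cluster instead of the pipeline), the flink-conf.yaml entries might look roughly like the sketch below. This assumes a Flink 1.x cluster, where the checkpoint directory key is state.checkpoints.dir. The path is only a placeholder I made up, so point it at a location that every TaskManager can reach.

```yaml
# Sketch of flink-conf.yaml entries (Flink 1.x key names).
# The directory below is a hypothetical placeholder, not a required value.
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints
```

Even with these entries in place, the pipeline still needs --checkpointingInterval set, as mentioned above.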
Upvotes: 0