Reputation: 103
First, I want to say that I am totally new to the Beam world. I'm working on an Apache Beam focused task whose main data source is a Kinesis stream. While consuming the streaming data, I noticed that the same set of data arrives again when I restart my consumer application. This is my code:
String awsStreamName = KinesisStream.getProperty("stream.name");
String awsAccessKey = KinesisStream.getProperty("access.key");
String awsSecretKey = KinesisStream.getProperty("secret.key");
String awsRegion = KinesisStream.getProperty("aws.region");
Regions region = Regions.fromName(awsRegion);

return KinesisIO.read()
    .withStreamName(awsStreamName)
    .withInitialPositionInStream(InitialPositionInStream.LATEST)
    .withAWSClientsProvider(awsAccessKey, awsSecretKey, region);
Simply put, I need to resume reading the data from where I left off. I'd really appreciate it if someone could point me to some resources as well.
Also, I found a similar question, but it did not help me: Apache Beam KinesisIO Java processing pipeline - application state, error handling & fault-tolerance?
Upvotes: 3
Views: 375
Reputation: 6033
To start reading at a different position in the stream, you can use either withInitialPositionInStream(...) or withInitialTimestampInStream(...) on KinesisIO.read().
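A minimal sketch of both options; the stream name is a placeholder, and credentials/region configuration is omitted for brevity:

```java
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.joda.time.Instant;

public class KinesisStartPositions {
  // Start from the oldest record still retained in the stream.
  static KinesisIO.Read fromTrimHorizon() {
    return KinesisIO.read()
        .withStreamName("my-stream")
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
  }

  // Start from records added at or after a given point in time.
  static KinesisIO.Read fromTimestamp(Instant startAt) {
    return KinesisIO.read()
        .withStreamName("my-stream")
        .withInitialTimestampInStream(startAt);
  }
}
```

Note that neither of these resumes from a consumer's last-read record; for that you need runner-level checkpointing, as described in the other answer.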
Upvotes: 1
Reputation: 935
UnboundedSources in Beam, such as KinesisIO.read(), support checkpointing using CheckpointMarks to resume from the latest checkpoint when restarting your application.
These checkpoints have to be persisted to durable storage. However, the specifics of how this is done depend on the Beam runner you are using, e.g. Dataflow, Apache Flink, or Apache Spark.
I recommend reading your runner's documentation on checkpointing and checking the pipeline options of the corresponding Beam runner.
For example, in the case of Apache Flink you will have to enable checkpointing via the checkpointingInterval pipeline option (FlinkPipelineOptions) and additionally configure checkpointing in Flink itself.
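A sketch of what enabling the interval might look like, assuming the Beam Flink runner is on the classpath; the 10-second interval is an arbitrary placeholder:

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CheckpointedPipeline {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // Take a Flink checkpoint every 10 seconds so the source's
    // CheckpointMarks are persisted and reading can resume after a restart.
    options.setCheckpointingInterval(10_000L);

    Pipeline pipeline = Pipeline.create(options);
    // ... attach the KinesisIO.read() source and the rest of the pipeline ...
    pipeline.run();
  }
}
```

For the checkpoints to survive an application restart you also need Flink itself configured with a durable state backend and externalized checkpoints, which is done on the Flink side rather than in Beam.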
Upvotes: 2