Prasad

Reputation: 103

Apache Beam KinesisIO Java - Consume the data in a Kinesis stream from where it left off

First, I want to say that I am totally new to the Beam world. I'm working on an Apache Beam focused task, and my main data source is a Kinesis stream. When consuming the streaming data, I noticed that the same set of data comes in again when I restart the program (my consumer application). This is my code:

    String awsStreamName = KinesisStream.getProperty("stream.name");
    String awsAccessKey = KinesisStream.getProperty("access.key");
    String awsSecretKey = KinesisStream.getProperty("secret.key");
    String awsRegion = KinesisStream.getProperty("aws.region");
    Regions region = Regions.fromName(awsRegion);

    return KinesisIO.read()
            .withStreamName(awsStreamName)
            // LATEST reads from the tip of the stream on every startup; it does not resume from a previous run's position
            .withInitialPositionInStream(InitialPositionInStream.LATEST)
            .withAWSClientsProvider(awsAccessKey, awsSecretKey, region);

Simply put, I want to resume reading the data from where I left off. I would really appreciate it if someone could point me to some resources as well.

Also, I found a similar question, but it did not help me: Apache Beam KinesisIO Java processing pipeline - application state, error handling & fault-tolerance?

Upvotes: 3

Views: 375

Answers (2)

Kenn Knowles

Reputation: 6033

To start at a different position in the stream you can use either of these:

- withInitialPositionInStream(InitialPositionInStream) to start from a fixed position, such as TRIM_HORIZON (the oldest record still retained) or LATEST (the newest records), or
- withInitialTimestampInStream(Instant) to start from a given point in time.
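As a rough sketch, reusing the variable names from the question's snippet (awsStreamName, awsAccessKey, awsSecretKey, region) and org.joda.time for Instant/Duration, the two options could look like this:

    // Option 1: start from the oldest record still retained in the stream
    KinesisIO.read()
            .withStreamName(awsStreamName)
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
            .withAWSClientsProvider(awsAccessKey, awsSecretKey, region);

    // Option 2: start from a specific point in time, e.g. one hour ago
    KinesisIO.read()
            .withStreamName(awsStreamName)
            .withInitialTimestampInStream(Instant.now().minus(Duration.standardHours(1)))
            .withAWSClientsProvider(awsAccessKey, awsSecretKey, region);

Note that both of these pick a fixed starting point on every run; resuming exactly where a previous run stopped is handled by runner checkpointing, as the other answer explains.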

Upvotes: 1

Moritz

Reputation: 935

UnboundedSources in Beam, such as KinesisIO.read(), support checkpointing using CheckpointMarks, so that reading resumes from the latest checkpoint when you restart your application.

These checkpoints have to be persisted to durable storage. However, the specifics of how this is done depend on the Beam runner you are using, e.g. Dataflow, Apache Flink, or Apache Spark.

I recommend reading your runner's documentation on checkpointing and checking the pipeline options of the corresponding Beam runner.

For example, in the case of Apache Flink you will have to enable checkpointing via checkpointingInterval (FlinkPipelineOptions) and additionally configure checkpointing in Flink.
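As a rough sketch (assuming the beam-runners-flink dependency is on the classpath), enabling checkpointing through the pipeline options could look like this:

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Run on the Flink runner and checkpoint the pipeline state every 60 seconds
    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setCheckpointingInterval(60_000L); // interval is given in milliseconds

    Pipeline pipeline = Pipeline.create(options);

The same option can also be passed on the command line, e.g. --checkpointingInterval=60000, if you build your options with PipelineOptionsFactory.fromArgs(args).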

Upvotes: 2
