Neel

Reputation: 1

Apache Beam KinesisIO Java processing pipeline - application state, error handling & fault-tolerance?

I'm working on my first Apache Beam pipeline to process data streams from AWS Kinesis. I'm familiar with how Kafka handles consumer offsets/state, and I have experience implementing Apache Storm/Spark processing.

After going through the documentation, I was able to create a working Beam pipeline using the KinesisIO Java SDK that listens to an AWS Kinesis data stream and transforms and prints the messages. However, I would like a reference implementation or pointers on how the areas below are handled in Apache Beam w.r.t. KinesisIO (a minimal sketch of my current pipeline is included after the questions):

  1. How is a consumer application uniquely identified in Kinesis streams (similar to the consumer group ID in Kafka)? Am I right that it's based on the Beam application name, and that any consumer using the KCL tracks its state in DynamoDB? Is that always true, and does it apply to Apache Beam KinesisIO as well?

  2. How do I make a consumer resume processing each shard from where it left off, e.g. after a restart or an exception during processing (similar to offset management per consumer group ID in Kafka)? InitialPositionInStream.TRIM_HORIZON always starts from the earliest available data, even if I restart the pipeline after processing a handful of records from the stream.

  3. How does acknowledgement work in Kinesis data streams, i.e. how does the consumer ack/update the checkpoint so that records pulled via getRecords() are marked as processed before the sequence/position in the shard is advanced? Is there any way for the consumer application to control when a message is successfully acked, so that the application state is saved and processing resumes from that position whenever the consumer is restarted?

  4. What is the impact of a business exception (in any stage of the pipeline) while processing the data stream on subsequent pulls from Kinesis, i.e. does the application continue to pull data or halt?
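For reference, here is a minimal sketch along the lines of what I have working (the stream name, credentials, and region below are placeholders, and it assumes the classic beam-sdks-java-io-kinesis module):

    import java.nio.charset.StandardCharsets;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kinesis.KinesisIO;
    import org.apache.beam.sdk.io.kinesis.KinesisRecord;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;

    import com.amazonaws.regions.Regions;
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

    public class KinesisPrintPipeline {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        p.apply("ReadFromKinesis",
                KinesisIO.read()
                    .withStreamName("my-stream")                                           // placeholder stream name
                    .withAWSClientsProvider("ACCESS_KEY", "SECRET_KEY", Regions.US_EAST_1) // placeholder credentials/region
                    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON))
         .apply("TransformAndPrint", ParDo.of(new DoFn<KinesisRecord, Void>() {
            @ProcessElement
            public void processElement(@Element KinesisRecord record) {
              // Decode the record payload and print it
              System.out.println(new String(record.getDataAsBytes(), StandardCharsets.UTF_8));
            }
         }));

        p.run().waitUntilFinish();
      }
    }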

Upvotes: 0

Views: 298

Answers (1)

Alexey Romanenko

Reputation: 1443

  1. KinesisIO.Read uses the AWS SDK under the hood to read from Kinesis, and it periodically retrieves an updated shard iterator to fetch records from each Kinesis shard.

  2. Did you try ShardIteratorType#LATEST for that?

  3. See my answer here: https://stackoverflow.com/a/62349838/10687325

  4. If it's an unknown exception, then the pipeline will be stopped.
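To illustrate points 2 and 4, a rough, untested sketch (assuming the classic org.apache.beam.sdk.io.kinesis.KinesisIO module; the stream name and class names are placeholders):

    import java.nio.charset.StandardCharsets;

    import org.apache.beam.sdk.io.kinesis.KinesisIO;
    import org.apache.beam.sdk.io.kinesis.KinesisRecord;
    import org.apache.beam.sdk.transforms.DoFn;

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

    public class KinesisReadSketch {

      // Point 2: start reading from the tip of the stream instead of the earliest data.
      // "my-stream" is a placeholder; credentials/region are configured as in the question's pipeline.
      public static KinesisIO.Read readFromLatest() {
        return KinesisIO.read()
            .withStreamName("my-stream")
            .withInitialPositionInStream(InitialPositionInStream.LATEST);
      }

      // Point 4: catching expected "business" exceptions inside the DoFn lets the pipeline
      // keep pulling from Kinesis; an exception that escapes the DoFn fails the bundle and,
      // depending on the runner, stops the pipeline.
      public static class SafeTransformFn extends DoFn<KinesisRecord, String> {
        @ProcessElement
        public void processElement(@Element KinesisRecord record, OutputReceiver<String> out) {
          try {
            out.output(new String(record.getDataAsBytes(), StandardCharsets.UTF_8));
          } catch (RuntimeException e) {
            // Log and drop the bad record (or route it to a dead-letter output) instead of rethrowing.
          }
        }
      }
    }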

Upvotes: 0
