sha

Reputation: 643

Apache Beam KafkaIO commit offset behaviour

In the KafkaIO.expand method, I noticed that the offsets are committed in a separate, branched pipeline step. Won't that lead to dropped data? For example, the Kafka records may be only partially processed by a downstream pipeline step before a crash, but the branched commit step would already have committed their offsets. So if the whole pipeline restarts, the consumer will not receive that data again. Am I missing something?
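For context, I'm looking at the commit branch that gets added when offset commits are enabled on the read via commitOffsetsInFinalize(). A minimal read along those lines might look like the sketch below (broker address, topic, and consumer group are placeholders):

```java
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaOffsetCommitExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<KafkaRecord<Long, String>> records =
        p.apply(KafkaIO.<Long, String>read()
            .withBootstrapServers("kafka:9092")             // placeholder broker
            .withTopic("events")                            // placeholder topic
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Committing offsets back to Kafka requires a consumer group.
            .withConsumerConfigUpdates(
                Collections.singletonMap("group.id", "my-consumer-group"))
            // Enables the offset-commit step that expand() adds as a branch.
            .commitOffsetsInFinalize());

    // Downstream processing of `records` would go here.

    p.run();
  }
}
```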

Upvotes: 1

Views: 695

Answers (1)

Kenn Knowles

Reputation: 6023

This depends on the Beam runner. I will discuss the ones I know about:

Cloud Dataflow checkpoints at each shuffle. So the use of Reshuffle.viaRandomKey() just before committing the offset ensures that any data read from Kafka is durably persisted to Dataflow's internal storage before the offset is committed back to Kafka.
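As an illustration of that pattern (a sketch, not KafkaIO's actual internal code; the element type and commit logic are placeholders), the reshuffle sits between the read and the side-effecting commit:

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class CommitBranchSketch {
  // "offsets" stands in for per-record (topic-partition, offset) pairs
  // produced by the read step.
  static void addCommitBranch(PCollection<KV<String, Long>> offsets) {
    offsets
        // Durability barrier: on Dataflow, elements are checkpointed at this
        // shuffle before anything downstream executes.
        .apply(Reshuffle.viaRandomKey())
        // Only after that checkpoint does this side-effecting step run.
        .apply(ParDo.of(new DoFn<KV<String, Long>, Void>() {
          @ProcessElement
          public void processElement(@Element KV<String, Long> partitionAndOffset) {
            // Commit partitionAndOffset.getValue() for the partition named by
            // partitionAndOffset.getKey() back to Kafka (omitted in this sketch).
          }
        }));
  }
}
```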

The Beam Spark runner functions similarly by forcing a checkpoint after each shuffle. This is because Beam allows nondeterminism and other side effects by default, so re-executing stages can cause randomly generated keys to change. Without a checkpoint this would cause data corruption.

The use of "shuffle" as a way to checkpoint is a legacy approach from the early days of Beam. The new approach (in development) is to annotate when a transformation requires its input to be stable. Then a runner can checkpoint exactly when necessary and no more.
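In the Java SDK this surfaced as the @RequiresStableInput annotation on a DoFn's @ProcessElement method. A minimal sketch, assuming the elements are (topic-partition, offset) pairs and with the actual commit call omitted:

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Sketch only: a DoFn that commits offsets. @RequiresStableInput asks the
// runner to make the input durable (e.g. via a checkpoint) before
// processElement runs, so a retry after a crash replays exactly the same
// elements rather than a re-read that may differ.
class CommitOffsetsFn extends DoFn<KV<String, Long>, Void> {
  @RequiresStableInput
  @ProcessElement
  public void processElement(@Element KV<String, Long> partitionAndOffset) {
    // Commit partitionAndOffset.getValue() for partitionAndOffset.getKey()
    // back to Kafka here (actual commit call omitted in this sketch).
  }
}
```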

Upvotes: 0
