Reputation: 532
I have a pipeline running on Dataflow that ingests files, each containing several thousand records. The files arrive at a steady rate and are processed by a stateful ParDo with timers, which throttles ingestion by batching and holding the files until the timer fires. A file-processing ParDo then expands them into individual record elements, which are finally written to BigQuery destinations.
Occasionally, after an intermittent event such as an OOM or an autoscaling event, I have seen Dataflow attempt to emit the files from the stateful ParDo again once the event resolves. This causes duplicate record elements downstream when the file-processing ParDo reprocesses the files. I understand that bundles are retried if there is a failure, but do retries account for duplicates?
What exactly does exactly-once processing guarantee in this context, especially with regard to the State/Timer API, given that I am seeing duplicates at my destination?
Upvotes: 0
Views: 664
Reputation: 5104
Dataflow achieves exactly-once processing by ensuring that data produced by failing workers is not passed downstream (or, more precisely, if work is retried, only one successful result is consumed downstream). For example, if stage A of your pipeline is producing elements and stage B is counting them, and workers in stage A fail and are retried, duplicate elements will not be counted by stage B (though of course stage B might itself have to be retried). This also applies to state and timers--a given bundle of work is either committed in its entirety (i.e. the set of inputs is marked as consumed, and the set of outputs is committed atomically with the consumption/setting of state and timers) or entirely discarded (state and timers are left unconsumed/untouched, and the retry will not be influenced by what happened before).
What is not exactly once is interactions with external systems (due to the possibility of retries). These are instead at least once, and so to guarantee correctness all such interactions should be idempotent. Sinks often achieve this by assigning a unique id such that multiple writes can be deduplicated in the downstream system. For files, one can write to temporary files, and then rename the "winning" set of shards to the final destination after a barrier. It's not clear from your question what files you're emitting (or ingesting) but hopefully this should be helpful in understanding how the system works.
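To make the idempotency point concrete, here is a minimal sketch in plain Python (not Beam or BigQuery code). It assumes each record can be given a deterministic id from its file name and position in the file; `record_id`, `IdempotentSink`, `process_file` and the file name are hypothetical names made up for illustration, and the dict stands in for whatever external system does the deduplication.

```python
import hashlib


def record_id(file_name: str, record_index: int) -> str:
    """Deterministic id: the same record of the same file always gets the same id."""
    key = f"{file_name}:{record_index}".encode("utf-8")
    return hashlib.sha256(key).hexdigest()


class IdempotentSink:
    """Hypothetical stand-in for an external system that deduplicates by id."""

    def __init__(self):
        self.rows = {}

    def write(self, row_id: str, row: dict) -> None:
        # Writing an id that already exists overwrites rather than appends,
        # so a retried write leaves the stored data unchanged.
        self.rows[row_id] = row


def process_file(file_name, records, sink):
    """Expand a file into records and write each one under its deterministic id."""
    for index, record in enumerate(records):
        sink.write(record_id(file_name, index), record)


sink = IdempotentSink()
records = [{"value": 1}, {"value": 2}, {"value": 3}]

process_file("batch-001.csv", records, sink)
process_file("batch-001.csv", records, sink)  # simulated retry of the same file

print(len(sink.rows))  # 3, not 6
```

The important part is that the id is derived from the data itself rather than generated randomly at write time, so a retry produces the same ids and the destination can collapse the repeats.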
More specifically, say the initial state is {state: A, timers: [X, Y], inputs: [i, j, k]}. Suppose further that when processing the bundle (these timers and inputs) the state is updated to B, we emit elements m and n downstream, and we set a timer W.
If the bundle succeeds, the new state will be {state: B, timers: [W], inputs: []} and the elements [m, n] are guaranteed to be passed downstream. Furthermore, any competing retry of this bundle would always fail.
On the other hand, if the bundle fails (even if it "emitted" some of the elements or tried to update the state) the resulting state of the system will be {state: A, timers: [X, Y], inputs: [i, j, k]} for a fresh retry and nothing that was emitted from this failed bundle will be observed downstream.
Another way to look at it is that the set {inputs consumed, timers consumed, state modifications, timers set, outputs to produce downstream} is written to the backing "database" in a single transaction. Only a single successful attempt is ever committed; failed attempts are discarded.
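The example above can be sketched as a toy model in plain Python. This is not how Dataflow is implemented; `Store`, `run_bundle` and `commit` are hypothetical names, and the version check is just an assumed way to show why a competing attempt cannot also commit.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class Store:
    """Toy stand-in for the backing "database" holding one key's data."""
    state: str = "A"
    timers: list = field(default_factory=lambda: ["X", "Y"])
    inputs: list = field(default_factory=lambda: ["i", "j", "k"])
    downstream: list = field(default_factory=list)
    version: int = 0


def run_bundle(store, fail=False):
    """Process the bundle on a private copy; nothing is committed here."""
    base_version = store.version
    work = copy.deepcopy(store)
    outputs = []
    work.state = "B"
    outputs.append("m")  # "emitted", but only buffered locally
    if fail:
        raise RuntimeError("worker lost mid-bundle")
    outputs.append("n")
    work.timers = ["W"]  # X and Y consumed, W set
    work.inputs = []     # i, j, k consumed
    return base_version, work, outputs


def commit(store, base_version, work, outputs):
    """Apply state, timers, inputs and outputs together, or nothing at all."""
    if store.version != base_version:
        return False  # another attempt already committed this bundle
    store.state, store.timers, store.inputs = work.state, work.timers, work.inputs
    store.downstream.extend(outputs)
    store.version += 1
    return True


store = Store()

# A failed attempt leaves the store exactly as it was.
try:
    run_bundle(store, fail=True)
except RuntimeError:
    pass
after_failure = copy.deepcopy(store)

# Two competing attempts at the same bundle: only the first commit wins.
first = run_bundle(store)
competing = run_bundle(store)
first_ok = commit(store, *first)
competing_ok = commit(store, *competing)

print(store.state, store.timers, store.inputs, store.downstream)
```

In this model the failed attempt did append m to its local outputs before dying, but since nothing reached `commit`, downstream never sees it. Anything the bundle does outside the store (such as a direct call to an external service) is not covered by this transaction.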
More details can be found at https://beam.apache.org/documentation/runtime/model/
Upvotes: 1