Michał

Reputation: 636

Does Google Dataflow retry DoFns when a RuntimeException is thrown?

We have a simple pipeline in which we transform data from an unbounded data source.

In one step we enrich the data from an external service, and sometimes a RuntimeException is thrown ( it's because Dataflow is so fast ( :p ) that the external service is not yet aware of this particular data ). After about 10 s it becomes aware, and the RuntimeException is no longer thrown.

With this in mind we switched away from Failsafe entirely and try to rely on Dataflow's native retry mechanism ( according to this: https://cloud.google.com/dataflow/pipelines/troubleshooting-your-pipeline#detecting-an-exception-in-worker-code )

But we've found that this does not really work: the bundle is not redelivered to the DoFn, so our sink does not receive all the data that arrives at our source.

Also, when running locally, this exception quits execution of the whole pipeline.

Is this a problem only with this particular type of exception (RuntimeException)? How can we force Dataflow to reprocess the bundle?

update

The DoFn in which the exception appears:

@DoFn.ProcessElement
public void processElement(ProcessContext c) {
    String txHash = c.element().getHash();
    try {
        LOG.info("TransformId: " + txHash);

        // here the RuntimeException is thrown
        throw new RuntimeException();
    } catch (Exception e) {
        LOG.error("Exception during processing id: " + txHash, e);
        throw e;
    }
}

And logs:

2018-02-22 17:15:53.633 CET
Receiver: 00ff ( this is the source, where we receive the id )
2018-02-22 17:15:53.634 CET
TransformId: 00ff ( beginning of the DoFn )
2018-02-22 17:15:53.634 CET
getTxRest invoked: 00ff ( the enriching service )
2018-02-22 17:15:53.638 CET
Exception during processing id: 00ff
2018-02-22 17:15:53.834 CET
Uncaught exception: ( and here are the details; the log name is: "xxx/logs dataflow.googleapis.com%2Fworker" )

Why am I saying that this is not retried? Because the id 00ff does not appear anywhere else in the log.

Upvotes: 0

Views: 1097

Answers (1)

jkff

Reputation: 17913

There could be two reasons:

  • If getHash() is non-deterministic
  • If you're reading from a custom UnboundedSource that does not provide at-least-once reads. E.g., the source might not support acking at all, or might be incorrectly acking records immediately when they are received, rather than in finalizeCheckpoint().
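For the second case, the correct place to ack is in the checkpoint finalization hook of Beam's `UnboundedSource.CheckpointMark` interface. Here is a minimal sketch of that pattern; `QueueClient` and its `ack()` method are hypothetical stand-ins for whatever client the custom source actually talks to:

```java
import java.io.IOException;
import java.util.List;
import org.apache.beam.sdk.io.UnboundedSource;

// Sketch only: QueueClient and ack() are hypothetical, not Beam APIs.
class AckingCheckpointMark implements UnboundedSource.CheckpointMark {
    private final List<String> idsToAck;   // record ids read since the last checkpoint
    private final QueueClient queueClient; // hypothetical client of the external queue

    AckingCheckpointMark(List<String> idsToAck, QueueClient queueClient) {
        this.idsToAck = idsToAck;
        this.queueClient = queueClient;
    }

    @Override
    public void finalizeCheckpoint() throws IOException {
        // Ack here, not at read time: the runner calls finalizeCheckpoint()
        // only after the checkpoint has been durably committed, so a failed
        // bundle is replayed instead of silently losing its records.
        queueClient.ack(idsToAck);
    }
}
```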

The second is more likely in this case: when the bundle is retried, the read from the source is retried too, and the source doesn't give this record back again.
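A toy model in plain Java (not Beam code) of this failure mode — a source that acks on read has nothing left to hand back when the failed bundle is retried:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of an ack-on-read source under a bundle retry.
public class AckOnReadDemo {
    public static void main(String[] args) {
        // The record is removed (acked) the moment it is read.
        Queue<String> source = new ArrayDeque<>(List.of("00ff"));
        List<String> sink = new ArrayList<>();

        for (int attempt = 1; attempt <= 2; attempt++) {
            String record = source.poll();   // ack-on-read: gone after this
            if (record == null) {
                continue;                    // the retry re-reads, but the record is gone
            }
            if (attempt == 1) {
                continue;                    // simulate the RuntimeException: bundle fails
            }
            sink.add(record);                // never reached
        }
        System.out.println("sink = " + sink); // the id 00ff is lost
    }
}
```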

If the source cannot be fixed, as a workaround you can pass the data from the source through Reshuffle.viaRandomKey(). That effectively materializes it temporarily, so retries affect only the processing and not the reading, at the expense of a small performance overhead.
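Applied to the pipeline in the question, the workaround is a one-line insertion between the read and the enrichment step. A sketch, where the transform names and the EnrichFn class are illustrative:

```java
pipeline
    .apply("ReadTransactions", Read.from(myUnboundedSource))
    // Checkpoint here: a retry of the enrichment no longer re-reads the source.
    .apply(Reshuffle.viaRandomKey())
    .apply("Enrich", ParDo.of(new EnrichFn()));
```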

Upvotes: 1

Related Questions