NaHeon
NaHeon

Reputation: 247

Apache Beam: Why does it write to Spanner twice on REPORT_FAILURES mode?

I found interesting write operation codes while looking at SpannerIO, and want to understand reasons.

On write(WriteToSpannerFn) and REPORT_FAILURES failure mode, it seems trying to write failed mutations twice.
I think it's for logging each mutation's exceptions. Is it a correct assumption, and is there any workaround?
Below, I removed some lines for simplicity.

public void processElement(ProcessContext c) {
  Iterable<MutationGroup> mutations = c.element();
  boolean tryIndividual = false;

  try {
    Iterable<Mutation> batch = Iterables.concat(mutations);
    spannerAccessor.getDatabaseClient().writeAtLeastOnce(batch);
  } catch (SpannerException e) {
    if (failureMode == FailureMode.REPORT_FAILURES) {
      tryIndividual = true;
    } else {
      ...
    }
  }
  if (tryIndividual) {
    for (MutationGroup mg : mutations) {
      try {
        spannerAccessor.getDatabaseClient().writeAtLeastOnce(mg);
      } catch (SpannerException e) {
        LOG.warn("Failed to submit the mutation group", e);
        c.output(failedTag, mg);
      }
    }
  }
}

Upvotes: 1

Views: 367

Answers (1)

RedPandaCurios
RedPandaCurios

Reputation: 2324

So rather than write each Mutation individually to the database, the SpannerIO.write() connector tries to write a batch of Mutations in a single transaction for efficiency.

If just one of these Mutations in the batch fails, then the whole transaction fails, so in REPORT_FAILURES mode, the mutations are re-tried individually to find which Mutation(s) are the problematic ones...

Upvotes: 2

Related Questions