Tobi
Tobi

Reputation: 936

Dataflow Batch Job fails with "Failed to close some writers"

I am running a batch pipeline with the Apache Beam 2.2 SDK via the Cloud Dataflow service. There are 751 text files that I parse using TextIO.readAll() transform, deserialize and write to a date partitioned table in BigQuery.

First thing I noticed is that autoscaling was not really kicking in and left the pipeline at 15 workers, even though I was able to push throughput a lot higher when for example manually setting the number of workers to 250.

My pipeline fails with the following stack trace:

(abed94a6f5139e21): java.io.IOException: Failed to close some writers
at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.finishBundle(WriteBundlesToFiles.java:248)
Suppressed: java.io.IOException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 503 Service Unavailable
Service Unavailable
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:431)
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.close(AbstractGoogleAsyncWriteChannel.java:289)
    at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.close(TableRowWriter.java:81)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.finishBundle(WriteBundlesToFiles.java:242)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeFinishBundle(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.finishBundle(SimpleDoFnRunner.java:187)
    at com.google.cloud.dataflow.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:407)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:60)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
    at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:330)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:302)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:251)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 503 Service Unavailable
Service Unavailable
    at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:432)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
    ... 4 more

Should I try with even more workers or split the work across several pipelines?

Upvotes: 2

Views: 1377

Answers (1)

Tobi
Tobi

Reputation: 936

Thanks to the comment by jkff it worked flawlessly - after setting --maxNumWorkers=250 (15 seems to be the standard maximum). The error was a transient error that Dataflow would retry several times and in the end, the pipeline ran successfully.

Upvotes: 2

Related Questions