eagerbeaver

Reputation: 137

Is it possible to catch a java.lang.RuntimeException for a missing dataset in a Google Cloud Dataflow pipeline that writes from Pub/Sub to BigQuery?

I am trying to handle errors that occur when my Dataflow job attempts to write dynamically to BigQuery table destinations.

I would like to catch the following exception:

java.lang.RuntimeException: unable to obtain dataset for dataset example_dataset in project example_project

in order to create the dataset and then retry writing to BigQuery.

Is it possible to catch exceptions in this manner, and if so, where would I need to add the try/catch logic in my code?

Upvotes: 2

Views: 604

Answers (2)

miles212

Reputation: 383

You can't handle this scenario with a try/catch block, as it's an internal BigQuery API error. Instead, I would suggest configuring a retry policy for transient errors together with extended error info. That way the failed BigQuery writes are captured in a PCollection, and you can dump those records however you wish. Please refer to the snippet below to achieve this.

// Stream rows into BigQuery, retrying only transient insert errors and
// capturing failed inserts (with extended error info) in the WriteResult.
WriteResult result = formattedData.get(successRows).setCoder(TableRowJsonCoder.of()).apply("BQ StreamingInserts",
                BigQueryIO.writeTableRows().withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                        .to("audit.db_audit")
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                        .withoutValidation()
                        .withExtendedErrorInfo());

With the above snippet, if a write fails because of DDL issues (such as a missing table or dataset), the failed rows end up in the WriteResult.

// Convert the failed inserts (with their error details) into strings for further handling.
PCollection<String> failedInserts = result.getFailedInsertsWithErr().apply("BQErrorToTableRow",
                ParDo.of(new BQErrorToString()));

You can get the failed records using the above code snippet. Let me know if that helps :)
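
BQErrorToString isn't shown above, so here is a minimal, hypothetical sketch of what such a DoFn could look like, assuming it simply flattens each BigQueryInsertError (the failed row plus its error details) into a string:

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;

// Hypothetical helper, nested inside the pipeline class; the real BQErrorToString
// in the answer may look different.
static class BQErrorToString extends DoFn<BigQueryInsertError, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        BigQueryInsertError err = c.element();
        // Emit the failed row together with the insert error details.
        c.output(err.getRow().toString() + " | " + err.getError().toString());
    }
}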

Upvotes: 2

aga

Reputation: 3893

Writes to nonexistent BigQuery datasets and/or tables will be retried indefinitely and may cause a stuck pipeline. BigQueryIO doesn't have an option to automatically create nonexistent BigQuery datasets; it only has an option to create nonexistent BigQuery tables, and the dataset that is specified must already exist, or be created, before the write to the table is executed.

The Beam documentation also states that

the dataset being written to must already exist
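
Given that, one option is to create the dataset yourself before the pipeline starts writing. Below is a minimal sketch using the google-cloud-bigquery client library; the helper name and the example_dataset name (taken from the question) are only illustrative:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetInfo;

public class EnsureDatasetExists {
    // Create the dataset up front (e.g. in main(), before pipeline.run()),
    // so that BigQueryIO's table creation can succeed afterwards.
    public static void ensureDataset(String datasetName) {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        if (bigquery.getDataset(datasetName) == null) {
            bigquery.create(DatasetInfo.newBuilder(datasetName).build());
        }
    }
}

For example, calling EnsureDatasetExists.ensureDataset("example_dataset") before running the pipeline should avoid the "unable to obtain dataset" error from the question.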

Please refer to the official documentation on how Java exceptions are handled in Cloud Dataflow, and see the examples there.

The Dataflow service retries failed tasks up to 4 times in batch mode and an unlimited number of times in streaming mode. In batch mode your job will fail; in streaming mode it may stall indefinitely.

I hope it helps.

Upvotes: 1
