Kolban

Reputation: 15266

Handling RuntimeException errors in a BigQuery pipeline

When we use a BigQueryIO transform to insert rows, we have an option called:

.withCreateDisposition(CreateDisposition.CREATE_NEVER)

which instructs the pipeline to NOT attempt to create the table if the table doesn't already exist. In my scenario, I want to trap all errors. I attempted to use the following:

var write = myPipeline.apply("Write table", BigQueryIO
    .<Employee>write()
    .to(targetTableName_notpresent)
    .withExtendedErrorInfo()
    .withFormatFunction(new EmployeeToTableRow())
    .withSchema(schema)
    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
    .withTableDescription("My Test Table")
    .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND));

which tried to insert rows into a non-existent table. What I found was that a RuntimeException was thrown. Where I am stuck is that I don't know how to handle RuntimeException problems; I don't believe there is anything here that I can surround with a try/catch.

This question is similar to this one:

Is it possible to catch a missing dataset java.lang.RuntimeException in a Google Cloud Dataflow pipeline that writes from Pub/Sub to BigQuery?

but I don't think it received a working answer, and it focused on a missing dataset rather than a missing table.

My exception from the fragment above is:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
POST https://bigquery.googleapis.com/bigquery/v2/projects/XXXX/datasets/jupyter/tables/not_here/insertAll?prettyPrint=false
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Not found: Table XXXX:jupyter.not_here",
    "reason" : "notFound"
  } ],
  "message" : "Not found: Table XXXX:jupyter.not_here",
  "status" : "NOT_FOUND"
}
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
    at .(#126:1)

Upvotes: 0

Views: 710

Answers (1)

Mazlum Tosun

Reputation: 6582

You can't put a try/catch directly around the BigQueryIO transform in the Beam job to handle the case where the destination table doesn't exist.

I think it's better to delegate this responsibility outside of Beam, or to launch the job only if your table exists.

Usually a tool like Terraform is responsible for creating the infrastructure before resources are deployed and Beam jobs are run.
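
If Terraform isn't an option, the table can also be created ahead of time from the command line. A minimal sketch using the bq CLI (the schema file name is a placeholder; the table name matches the one from the question):

bq mk --table XXXX:jupyter.not_here ./employee_schema.json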

If it's mandatory for you to check for the table's existence yourself, you can create:

  • A shell script that uses the bq and gcloud CLIs to check for the table before launching the job
  • A Python script that checks for the table before launching the job

Python script:

For Python, there is the BigQuery Python client:

from google.cloud import bigquery
from google.cloud.exceptions import NotFound

client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to determine existence.
# table_id = "your-project.your_dataset.your_table"

try:
    client.get_table(table_id)  # Make an API request.
    print("Table {} already exists.".format(table_id))
except NotFound:
    print("Table {} is not found.".format(table_id))

bq shell script:

bq show <project_id>:<dataset_id>.<table_id>

If the table doesn't exist, the command returns an error; catch that error and do not start the Dataflow job.
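
Putting it together, a minimal shell sketch (the environment variables and the run_beam_job.sh launcher are placeholders) that starts the job only when the table exists:

if bq show "${PROJECT_ID}:${DATASET_ID}.${TABLE_ID}" > /dev/null 2>&1; then
  # Table exists: launch the Beam/Dataflow job.
  ./run_beam_job.sh
else
  echo "Table ${PROJECT_ID}:${DATASET_ID}.${TABLE_ID} not found; job not started." >&2
  exit 1
fi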

Upvotes: 2
