tombrawz

Reputation: 137

Catch BigQuery HttpBadRequestError on Dataflow streaming pipeline

Recently, my Dataflow streaming job started throwing HttpBadRequestError from the BigQuery API because the request payload size exceeded the limit.

    Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1246, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 514, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 520, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1288, in finish_bundle
    return self._flush_all_batches()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1296, in _flush_all_batches
    *[
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1297, in <listcomp>
    self._flush_batch(destination)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1326, in _flush_batch
    passed, errors = self.bigquery_wrapper.insert_rows(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 984, in insert_rows
    result, errors = self._insert_all_rows(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/retry.py", line 236, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 539, in _insert_all_rows
    response = self.client.tabledata.InsertAll(request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 761, in InsertAll
    return self._RunMethod(
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
    raise exceptions.HttpError.FromResponse(
apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/my-projects/datasets/my-datasets/tables/my-tables/insertAll?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Sat, 20 Feb 2021 14:21:53 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '321', '-content-encoding': 'gzip'}>, content <{
  "error": {
    "code": 400,
    "message": "Request payload size exceeds the limit: 10485760 bytes.",
    "errors": [
      {
        "message": "Request payload size exceeds the limit: 10485760 bytes.",
        "domain": "global",
        "reason": "badRequest"
      }
    ],
    "status": "INVALID_ARGUMENT"
  }
}
>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 258, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 315, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 483, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 519, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 990, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 730, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 732, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 733, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/common.py", line 1267, in apache_beam.runners.common.DoFnRunner.finish
  File "apache_beam/runners/common.py", line 1248, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1246, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 514, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 520, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1288, in finish_bundle
    return self._flush_all_batches()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1296, in _flush_all_batches
    *[
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1297, in <listcomp>
    self._flush_batch(destination)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1326, in _flush_batch
    passed, errors = self.bigquery_wrapper.insert_rows(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 984, in insert_rows
    result, errors = self._insert_all_rows(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/retry.py", line 236, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 539, in _insert_all_rows
    response = self.client.tabledata.InsertAll(request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 761, in InsertAll
    return self._RunMethod(
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
    raise exceptions.HttpError.FromResponse(
RuntimeError: apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/my-projects/datasets/my-datasets/tables/my-tables/insertAll?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Sat, 20 Feb 2021 14:21:53 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '321', '-content-encoding': 'gzip'}>, content <{
  "error": {
    "code": 400,
    "message": "Request payload size exceeds the limit: 10485760 bytes.",
    "errors": [
      {
        "message": "Request payload size exceeds the limit: 10485760 bytes.",
        "domain": "global",
        "reason": "badRequest"
      }
    ],
    "status": "INVALID_ARGUMENT"
  }
}
> [while running 'WriteBqTables/WriteBQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)-ptransform-25875']

passed through:
==>
    dist_proc/dax/workflow/worker/fnapi_service.cc:631

I want to use this dead-letter pattern for BigQuery writes to mitigate this kind of error in case it happens again.

Does the BQ dead-letter pattern also work when an HttpBadRequestError occurs, or does it only work when inserting a row fails due to a schema mismatch? I am using Apache Beam SDK 2.27.0 for Python.
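
For reference, the pattern I have in mind looks roughly like this in the Python SDK (a minimal sketch; the Pub/Sub topic, table name, and step labels are placeholders for my actual pipeline):

    import json
    import logging

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery import BigQueryWriteFn
    from apache_beam.io.gcp.bigquery_tools import RetryStrategy
    from apache_beam.options.pipeline_options import PipelineOptions

    with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
        # Placeholder source; in the real job this is the upstream transform chain.
        rows = (
            p
            | 'Read' >> beam.io.ReadFromPubSub(topic='projects/my-projects/topics/my-topic')
            | 'Parse' >> beam.Map(json.loads))

        write_result = (
            rows
            | 'WriteBQ' >> beam.io.WriteToBigQuery(
                table='my-projects:my-datasets.my-tables',  # placeholder table spec
                method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
                insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

        # Rows rejected by BigQuery come back on the tagged FAILED_ROWS output
        # as (destination table, row) tuples.
        failed_rows = write_result[BigQueryWriteFn.FAILED_ROWS]

        # Dead-letter handling: just serialize and log here; a real job would
        # write these to GCS, Pub/Sub, or a separate BigQuery table instead.
        _ = (
            failed_rows
            | 'FormatFailed' >> beam.Map(lambda kv: json.dumps(kv[1], default=str))
            | 'LogFailed' >> beam.Map(logging.error))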

Thanks in advance

Update 2021-02-24: I added more stack trace snippets that show where the error occurs.

Upvotes: 0

Views: 490

Answers (1)

Kenn Knowles

Reputation: 6023

Yes, the pattern will work. In general it catches any failure that can be caught (sometimes things fail so badly that the processing stops entirely).

In your specific case, the stack trace includes this region of BigQueryIO, and you can see the failed rows output to the dead-letter PCollection just below, here.
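
In the Python SDK, that dead-letter output is read off the result of WriteToBigQuery via its FAILED_ROWS tag, roughly like this (a sketch; write_result stands for whatever your WriteToBigQuery transform returned):

    from apache_beam.io.gcp.bigquery import BigQueryWriteFn

    # write_result is the value returned by beam.io.WriteToBigQuery(...).
    # The tagged FAILED_ROWS output holds (destination table, row) tuples
    # that BigQuery rejected, ready to route to a dead-letter sink.
    failed_rows = write_result[BigQueryWriteFn.FAILED_ROWS]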

Upvotes: 1
