Reputation: 525
I'm trying to use Dataflow streaming processing to insert records into BigQuery using Python. Change notifications for files in a storage bucket are read from Pub/Sub, then the files themselves are read, transformed and inserted into BigQuery.
However, when the pipeline gets to processing around 100 to 200 elements/sec I get errors like the one below saying I'm exceeding a rate limit, with a link to this page. Sometimes the errors mention the tabledata.list quota, which is 500/sec.
I don't understand why I'm seeing messages about these quotas at all though, as the streaming inserts quota for BigQuery is 1,000,000 rows/sec.
[while running 'generatedPtransform-52321']
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:332)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1350)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:152)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1073)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -52327: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 498, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1024, in process
schema)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1009, in _create_table_if_needed
additional_create_parameters=self.additional_bq_parameters)
File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/retry.py", line 226, in wrapper
return fun(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 712, in get_or_create_table
found_table = self.get_table(project_id, dataset_id, table_id)
File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/retry.py", line 226, in wrapper
return fun(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 480, in get_table
response = self.client.tables.Get(request)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 581, in Get
config, request, global_params=global_params)
File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
return self.ProcessHttpResponse(method_config, http_response, request)
File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
self.__ProcessHttpResponse(method_config, http_response, request))
File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse
http_response, method_config=method_config, request=request)
apitools.base.py.exceptions.HttpForbiddenError: HttpError accessing <https://www.googleapis.com/bigquery/v2/projects/bought-by-many/datasets/mongo_landing_zone/tables/service_user_users_users?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Tue, 25 Feb 2020 16:49:25 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '403', 'content-length': '560', '-content-encoding': 'gzip'}>, content <{
"error": {
"code": 403,
"message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
"errors": [
{
"message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
"domain": "usageLimits",
"reason": "rateLimitExceeded"
}
],
"status": "PERMISSION_DENIED"
}
}
>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 167, in _execute
response = task()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 223, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 352, in do_instruction
request.instruction_id)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in process_bundle
data.transform_id].process_encoded(data.data)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 205, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 302, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 304, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 657, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 658, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 878, in apache_beam.runners.common.DoFnRunner.receive
File "apache_beam/runners/common.py", line 885, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 941, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 497, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1028, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 657, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 658, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 878, in apache_beam.runners.common.DoFnRunner.receive
File "apache_beam/runners/common.py", line 885, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 941, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 497, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1028, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 657, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 658, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 878, in apache_beam.runners.common.DoFnRunner.receive
File "apache_beam/runners/common.py", line 885, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 956, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 498, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1024, in process
schema)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1009, in _create_table_if_needed
additional_create_parameters=self.additional_bq_parameters)
File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/retry.py", line 226, in wrapper
return fun(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 712, in get_or_create_table
found_table = self.get_table(project_id, dataset_id, table_id)
File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/retry.py", line 226, in wrapper
return fun(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 480, in get_table
response = self.client.tables.Get(request)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 581, in Get
config, request, global_params=global_params)
File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
return self.ProcessHttpResponse(method_config, http_response, request)
File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
self.__ProcessHttpResponse(method_config, http_response, request))
File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse
http_response, method_config=method_config, request=request)
RuntimeError: apitools.base.py.exceptions.HttpForbiddenError: HttpError accessing <https://www.googleapis.com/bigquery/v2/projects/bought-by-many/datasets/mongo_landing_zone/tables/service_user_users_users?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Tue, 25 Feb 2020 16:49:25 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '403', 'content-length': '560', '-content-encoding': 'gzip'}>, content <{
"error": {
"code": 403,
"message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
"errors": [
{
"message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
"domain": "usageLimits",
"reason": "rateLimitExceeded"
}
],
"status": "PERMISSION_DENIED"
}
}
The code I'm using is below:
files = (
    p
    | "read PubSub"
    >> beam.io.ReadFromPubSub(
        topic=known_args.input_topic, with_attributes=True, id_label=id_label
    )
    | "decode message" >> beam.Map(lambda pubsub_msg: json.loads(pubsub_msg.data))
    | "filter buckets with unknown encodings"
    >> beam.Filter(no_encoding_bucket_filter, encodings)
    | "get file from bucket" >> beam.ParDo(GetFileFromBucket())
)

policies = (
    files
    | f"filter for policies"
    >> beam.Filter(lambda msg: 'policies' in msg["bucket"])
    | f"encode policies"
    >> beam.Map(apply_encoding, encodings['policies'], 'policies')
    | f"filter out policies that failed to encode"
    >> beam.Filter(lambda item: True if item is not None else False)
    | f"insert policies to BigQuery"
    >> beam.io.WriteToBigQuery(
        project=project_id,
        table="service_policy_policies",
        dataset="mongo_landing_zone",
        insert_retry_strategy="RETRY_ON_TRANSIENT_ERROR",
    )
)
beam.io.WriteToBigQuery() does work with streaming data, but from the errors I suspect it's initialising or fetching the BigQuery table object for every element that's processed, rather than just inserting a row. Am I using it in some incorrect way?
Update 2020-03-11
I managed to improve, but not resolve, the situation. I switched from using beam.io.WriteToBigQuery to writing a custom class called WriteToBigQueryCustom that does the same thing. I still get errors, but only at a throughput of 500 elements/sec or higher now.
Updated code:
import logging

import apache_beam as beam
from google.cloud import bigquery


class WriteToBigQueryCustom(beam.DoFn):
    """
    Stream insert records into a BigQuery table. Intended to work the same way you'd
    expect beam.io.WriteToBigQuery to work for streaming.

    Even though beam.io.WriteToBigQuery supports streaming, it seemed to be
    initialising the BigQuery connection for every element processed. Was
    getting throttled and causing errors about hitting BQ API limits at a throughput of
    100 elements/sec when the streaming inserts limit is 1,000,000/sec.
    """

    def __init__(self, project_id, dataset, table_name):
        self.project_id = project_id
        self.dataset = dataset
        self.table_name = table_name
        self.table_id = f"{project_id}.{dataset}.{table_name}"

    def start_bundle(self):
        # Create the client and fetch the table once per bundle rather than per element.
        self.bq_client = bigquery.Client()
        self.table = self.bq_client.get_table(self.table_id)

    def process(self, dict_to_insert):
        """Insert a dict into the class's BigQuery table."""
        errors = self.bq_client.insert_rows(self.table, [dict_to_insert])
        if errors:
            logging.error(
                f"Hit error uploading row to BigQuery table {self.table_id}: "
                f"{errors}. Was trying to insert dict: {dict_to_insert}"
            )
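For reference, this is roughly how the new DoFn slots into the pipeline in place of the WriteToBigQuery step (a sketch reusing the same project_id, dataset and table names as above):

policies = (
    files
    | "filter for policies" >> beam.Filter(lambda msg: 'policies' in msg["bucket"])
    | "encode policies" >> beam.Map(apply_encoding, encodings['policies'], 'policies')
    | "filter out policies that failed to encode"
    >> beam.Filter(lambda item: item is not None)
    | "insert policies to BigQuery"
    >> beam.ParDo(
        WriteToBigQueryCustom(
            project_id=project_id,
            dataset="mongo_landing_zone",
            table_name="service_policy_policies",
        )
    )
)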
Upvotes: 4
Views: 1907
Reputation: 81
I ran into the same trouble you did while running a similar pipeline. It seems there is some sort of bug in the Beam Python SDK.
https://issues.apache.org/jira/browse/BEAM-6831
Adding create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER helped in my case.
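Applied to your pipeline, the step would look something like this (a sketch based on your original WriteToBigQuery step; note that CREATE_NEVER assumes the table already exists, since the sink will no longer check for or create it):

| "insert policies to BigQuery"
>> beam.io.WriteToBigQuery(
    project=project_id,
    table="service_policy_policies",
    dataset="mongo_landing_zone",
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
    insert_retry_strategy="RETRY_ON_TRANSIENT_ERROR",
)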
Regards, Michael
Upvotes: 3