Reputation: 5276
I'm converting my tasks from App Engine Task Queues to Google Cloud Tasks.
The one having problems is an hourly cron job that checks an S3 bucket for new files. The cron job launches a new task per file found. Those tasks then download their respective files and launch a new task per record in their file.
It is during this fan-out that some of the calls to create_task()
seem to fail with ServiceUnavailable: 503 (https://googleapis.dev/python/cloudtasks/latest/gapic/v2/api.html#google.cloud.tasks_v2.CloudTasksClient.create_task)
Here's one:
Traceback (most recent call last):
...
File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/src/utils/gc_tasks.py", line 72, in _gc_create_task
_ = _tasks_client.create_task(parent=_queue_path(DEFAULT_QUEUE), task=task)
File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/cloud/tasks_v2/gapic/cloud_tasks_client.py", line 1512, in create_task
request, retry=retry, timeout=timeout, metadata=metadata
File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/gapic_v1/method.py", line 143, in __call__
return wrapped_func(*args, **kwargs)
File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/retry.py", line 273, in retry_wrapped_func
on_error=on_error,
File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/retry.py", line 182, in retry_target
return target()
File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/timeout.py", line 214, in func_with_timeout
return func(*args, **kwargs)
File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
six.raise_from(exceptions.from_grpc_error(exc), exc)
File "/base/alloc/tmpfs/dynamic_runtimes/python27g/ebb3af67a06047b6/python27/python27_lib/versions/third_party/six-1.12.0/six/__init__.py", line 737, in raise_from
raise value
ServiceUnavailable: 503 {
"created":"@1583436423.131570193",
"description":"Delayed close due to in-progress write",
"file":"third_party/apphosting/python/grpcio/v1_0_0/src/core/ext/transport/chttp2/transport/chttp2_transport.c",
"file_line":412,
"grpc_status":14,
"referenced_errors":[{
"created":"@1583436423.131561040",
"description":"OS Error",
"errno":32,
"file":"third_party/apphosting/python/grpcio/v1_0_0/src/core/lib/iomgr/tcp_posix.c",
"file_line":393,
"os_error":"Broken pipe",
"syscall":"sendmsg"}
]}
Here's another:
Traceback (most recent call last):
...
File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/src/utils/pt_gc_tasks.py", line 72, in _gc_create_task
_ = _tasks_client.create_task(parent=_queue_path(DEFAULT_QUEUE), task=task)
File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/cloud/tasks_v2/gapic/cloud_tasks_client.py", line 1512, in create_task
request, retry=retry, timeout=timeout, metadata=metadata
File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/gapic_v1/method.py", line 143, in __call__
return wrapped_func(*args, **kwargs)
File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/retry.py", line 273, in retry_wrapped_func
on_error=on_error,
File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/retry.py", line 182, in retry_target
return target()
File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/timeout.py", line 214, in func_with_timeout
return func(*args, **kwargs)
File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
six.raise_from(exceptions.from_grpc_error(exc), exc)
File "/base/alloc/tmpfs/dynamic_runtimes/python27g/ebb3af67a06047b6/python27/python27_lib/versions/third_party/six-1.12.0/six/__init__.py", line 737, in raise_from
raise value
ServiceUnavailable: 503 {
"created":"@1583407622.505288938",
"description":"Endpoint read failed",
"file":"third_party/apphosting/python/grpcio/v1_0_0/src/core/ext/transport/chttp2/transport/chttp2_transport.c",
"file_line":1807,
"grpc_status":14,
"occurred_during_write":0,
"referenced_errors":[{
"created":"@1583407622.505108366",
"description":"Secure read failed",
"file":"third_party/apphosting/python/grpcio/v1_0_0/src/core/lib/security/transport/secure_endpoint.c",
"file_line":158,
"referenced_errors":[{
"created":"@1583407622.505106550",
"description":"Socket closed",
"file":"third_party/apphosting/python/grpcio/v1_0_0/src/core/lib/iomgr/tcp_posix.c",
"file_line":259}
]}
]}
Am I enqueuing too many tasks at the same time? What can I do to deal with this?
Upvotes: 2
Views: 2422
Reputation: 5276
After quite a bit of digging, it seems that "503 Service Unavailable" is a pretty common error across the google-cloud SDKs for all of GCP's services.
The solution is to enable retry logic. google-cloud-core (which google-cloud-tasks depends on) has an existing retry mechanism, but it wasn't configured for task creation: retry_codes_name was set to non_idempotent instead of idempotent:
"CreateTask": {
"timeout_millis": 10000,
"retry_codes_name": "non_idempotent",
"retry_params_name": "default",
},
My guess is that this is because retrying could cause duplicate tasks to get enqueued. But if you specify a task name, Cloud Tasks should prevent those duplicates from getting enqueued (a retry of the same named task fails with AlreadyExists).
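To make the task-name deduplication concrete, here is a minimal sketch. The helper name and the record-key scheme are my own inventions for illustration, not from the question; the only assumption from Cloud Tasks itself is that a task's name must be unique within the queue and that task IDs are limited to letters, numbers, hyphens, and underscores, which is why the key is hashed.

```python
# Sketch: derive a stable task ID from a record key, so a client-side retry
# of create_task() reuses the same name and cannot enqueue a second copy.
# (deterministic_task_name and the record-key format are illustrative.)
import hashlib


def deterministic_task_name(queue_path, record_key):
    # Hashing keeps arbitrary record keys within the legal task-ID charset.
    task_id = hashlib.sha256(record_key.encode("utf-8")).hexdigest()
    return "{}/tasks/{}".format(queue_path, task_id)


queue_path = "projects/my-project/locations/us-central1/queues/default"
name = deterministic_task_name(queue_path, "s3://bucket/file.csv#row-42")

# The same record always maps to the same task name, so a retried
# create_task() call collides with the first attempt instead of duplicating.
assert name == deterministic_task_name(queue_path, "s3://bucket/file.csv#row-42")
```

You would then set this value as the task's name field before calling create_task().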
So I passed a Retry object to .create_task() without providing an arg for predicate, which causes it to default to if_transient_error(), which will retry on the following errors: exceptions.InternalServerError, exceptions.TooManyRequests, and exceptions.ServiceUnavailable.
Below is a snippet of my code to create tasks:
import logging

from google.api_core import retry
from google.api_core.exceptions import AlreadyExists
from google.cloud import tasks

_tasks_client = tasks.CloudTasksClient()

def my_create_task_function(my_queue_path, task_object):
    try:
        _tasks_client.create_task(
            parent=my_queue_path,
            task=task_object,
            retry=retry.Retry(  # Copies the default retry config from retry_params in google.cloud.tasks_v2.gapic.cloud_tasks_client_config
                initial=.1,
                maximum=60,
                multiplier=1.3,
                deadline=600))
    except AlreadyExists:
        logging.warning("found existing task")
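To see what those Retry parameters actually do, here is a stand-alone sketch (pure Python, no GCP dependency, written by me rather than taken from the library) of the delay schedule they imply: each sleep is the previous one times multiplier, capped at maximum, until the total deadline would be exceeded. The real google.api_core.retry.Retry also adds random jitter, which this sketch omits.

```python
# Illustrative reproduction of the backoff schedule implied by
# Retry(initial=.1, maximum=60, multiplier=1.3, deadline=600).
# Jitter from the real implementation is intentionally left out.
def backoff_schedule(initial, maximum, multiplier, deadline):
    delays, delay, total = [], initial, 0.0
    while total + delay <= deadline:
        delays.append(delay)
        total += delay
        # Grow geometrically, but never sleep longer than `maximum`.
        delay = min(delay * multiplier, maximum)
    return delays


delays = backoff_schedule(initial=0.1, maximum=60, multiplier=1.3, deadline=600)
print(len(delays), delays[:4])  # first sleeps: 0.1s, 0.13s, 0.169s, ...
```

The takeaway is that early retries are nearly instant, so transient 503s during a fan-out are usually absorbed within the first second or two.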
There is also a logger whose level you can adjust so that you see a log statement each time a retry actually happens.
If you do the following:
logging.getLogger('google.api_core.retry').setLevel(logging.DEBUG)
you should then see retry messages in your logs when it kicks in.
Upvotes: 5
Reputation: 64
The "HTTP Error 503. The service is unavailable" occurs if the application pool of the corresponding web application is stopped, disabled, or paused, or if the application pool's user identity is invalid due to an expired password or a locked account.
Upvotes: 0
Reputation: 5829
Both of the errors you shared appear to have different causes, judging by the text in their descriptions, but both could indeed be linked to an overload of tasks in your queue.
To work around that, you could set rate limits to lower the load, or you could set retry parameters, since apparently only a few tasks are affected. Either way, you can find how-tos in the Cloud Tasks queue configuration documentation.
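As one possible starting point, a queue.yaml fragment along these lines throttles dispatch and configures server-side retries. The specific numbers are illustrative placeholders, not recommendations, and the queue name is assumed:

```yaml
# Illustrative queue.yaml fragment; tune values to your workload.
queue:
- name: default
  rate: 50/s                    # cap dispatches per second
  max_concurrent_requests: 10   # cap in-flight tasks
  retry_parameters:
    task_retry_limit: 5
    min_backoff_seconds: 1
    max_backoff_seconds: 60
```

Lowering rate and max_concurrent_requests reduces pressure during the fan-out, at the cost of slower drain of the queue.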
Upvotes: 1