rober710

Reputation: 385

Set pub-sub library http client's timeouts

I'm trying to test a service locally that creates some Pub/Sub topics in Google Cloud at startup. I'm connecting to Google's service with my personal credentials (I'm not using the Pub/Sub emulator). However, user credentials seem to have a very limited API quota and appear to be throttled heavily. The problem is that after a request to create a topic is sent, the application hangs for about 10 minutes. During that time it cannot be stopped by pressing Ctrl+C in the terminal, so I have to kill it.

The warning that pops up when the application is starting up is:

Your application has authenticated using end user credentials from Google Cloud SDK. We recommend that most server applications use service accounts instead. If your application continues to use end user credentials from Cloud SDK, you might receive a "quota exceeded" or "API not enabled" error. For more information about service accounts, see https://cloud.google.com/docs/authentication/.

After the app has been running for about 10 minutes, I get the following error, which suggests that the HTTP client has a very long timeout and that Google's API takes a very long time to respond:

Error creating PubSub topic: com.google.api.gax.rpc.UnavailableException: io.grpc.StatusRuntimeException: UNAVAILABLE: Credentials failed to obtain metadata
        at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:69)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
        at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
        at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
        at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1083)
        at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
        at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1174)
        at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
        at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
        at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:563)
        at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:413)
        at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:721)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
        at java.base/java.lang.Thread.run(Thread.java:832)
        Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
                at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
                at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
                at com.google.cloud.pubsub.v1.TopicAdminClient.createTopic(TopicAdminClient.java:284)
                at com.google.cloud.pubsub.v1.TopicAdminClient.createTopic(TopicAdminClient.java:206)
                at com.myapp.integrations.common.pubsub.KenectPubSub.createPubSubTopic(KenectPubSub.java:104)
                at com.myapp.integrations.hub.config.BeanConfiguration_ProducerMethod_getPubSub_8c1027f866ef011e10384d59fcdcf03ffcde3048_ClientProxy.createPubSubTopic(BeanConfiguration_ProducerMethod_getPubSub_8c1027f866ef011e10384d59fcdcf03ffcde3048_ClientProxy.zig:358)
                at com.myapp.integrations.hub.router.IntegrationsRouterImpl.createNewTopic(IntegrationsRouterImpl.java:166)
                at com.myapp.integrations.hub.router.IntegrationsRouterImpl.lambda$init$0(IntegrationsRouterImpl.java:89)
                at java.base/java.lang.Iterable.forEach(Iterable.java:75)
                at com.myapp.integrations.hub.router.IntegrationsRouterImpl.init(IntegrationsRouterImpl.java:82)
                at com.myapp.integrations.hub.router.IntegrationsRouterImpl_Bean.create(IntegrationsRouterImpl_Bean.zig:242)
                at com.myapp.integrations.hub.router.IntegrationsRouterImpl_Bean.create(IntegrationsRouterImpl_Bean.zig:258)
                at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:96)
                at io.quarkus.arc.impl.AbstractSharedContext.access$000(AbstractSharedContext.java:14)
                at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:29)
                at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:26)
                at io.quarkus.arc.impl.LazyValue.get(LazyValue.java:26)
                at io.quarkus.arc.impl.ComputingCache.computeIfAbsent(ComputingCache.java:69)
                at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:26)
                at com.myapp.integrations.hub.router.IntegrationsRouterImpl_ClientProxy.arc$delegate(IntegrationsRouterImpl_ClientProxy.zig:92)
                at com.myapp.integrations.hub.router.IntegrationsRouterImpl_ClientProxy.arc_contextualInstance(IntegrationsRouterImpl_ClientProxy.zig:110)
                at com.myapp.integrations.hub.router.IntegrationsRouterImpl_Observer_Synthetic_d70cd75bf32ab6598217b9a64a8473d65e248c05.notify(IntegrationsRouterImpl_Observer_Synthetic_d70cd75bf32ab6598217b9a64a8473d65e248c05.zig:94)
                at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:282)
                at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:267)
                at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:69)
                at io.quarkus.arc.runtime.LifecycleEventRunner.fireStartupEvent(LifecycleEventRunner.java:23)
                at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:60)
                at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent-858218658.deploy_0(LifecycleEventsBuildStep$startupEvent-858218658.zig:81)
                at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent-858218658.deploy(LifecycleEventsBuildStep$startupEvent-858218658.zig:40)
                at io.quarkus.runner.ApplicationImpl.doStart(ApplicationImpl.zig:763)
                at io.quarkus.runtime.Application.start(Application.java:90)
                at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:95)
                at io.quarkus.runtime.Quarkus.run(Quarkus.java:62)
                at io.quarkus.runtime.Quarkus.run(Quarkus.java:38)
                at io.quarkus.runtime.Quarkus.run(Quarkus.java:104)
                at com.myapp.integrations.hub.Main.main(Main.java:9)
                at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.base/java.lang.reflect.Method.invoke(Method.java:564)
                at io.quarkus.runner.bootstrap.StartupActionImpl$3.run(StartupActionImpl.java:134)
                ... 1 more
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: Credentials failed to obtain metadata
        at io.grpc.Status.asRuntimeException(Status.java:533)
        ... 13 more
Caused by: com.google.api.client.http.HttpResponseException: 400 Bad Request
{
  "error": "invalid_grant",
  "error_description": "Bad Request"
}
        at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1113)
        at com.google.auth.oauth2.UserCredentials.refreshAccessToken(UserCredentials.java:203)
        at com.google.auth.oauth2.OAuth2Credentials.refresh(OAuth2Credentials.java:157)
        at com.google.auth.oauth2.OAuth2Credentials.getRequestMetadata(OAuth2Credentials.java:145)
        at com.google.auth.oauth2.UserCredentials.getRequestMetadata(UserCredentials.java:281)
        at com.google.auth.Credentials.blockingGetToCallback(Credentials.java:112)
        at com.google.auth.Credentials$1.run(Credentials.java:98)
        ... 6 more

I checked the docs and found no indication of how to customize the timeout for the library's HTTP client. Does anybody know how to do that? I'm using google-cloud-pubsub:1.108.3.

Please note that this question is about being able to set the timeouts in the http client. I know that using a service account's credentials will solve the issue, but I'm interested in knowing if the library offers the option to change the http client's settings.

Upvotes: 0

Views: 665

Answers (1)

Malaman

Reputation: 314

I don't see timeout configuration discussed anywhere in the Cloud Pub/Sub docs [1,2]. However, you can enforce a time limit yourself with the timed waiting methods in java.util.concurrent, which throw a TimeoutException when the limit is exceeded, so you can catch it and react. Refer to [3] to see how the timeout exception is used with awaitTermination.

The awaitTermination method is used to “Block until all work has completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first” [4]. Note that this is typically used to extend the timeout when waiting for messages with asynchronous subscriptions.
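As a rough illustration of those semantics, here is a minimal sketch that uses only the JDK. It relies on ExecutorService.awaitTermination, which takes the same (duration, unit) arguments as the method in [4]; the class name AwaitTerminationSketch and the helper shutdownAndWait are made up for this example and are not part of the Pub/Sub library.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitTerminationSketch {

    /**
     * Requests shutdown, then waits up to timeoutMillis.
     * Returns true if all submitted work finished within the limit.
     * (Hypothetical helper, JDK only.)
     */
    public static boolean shutdownAndWait(ExecutorService executor, long timeoutMillis)
            throws InterruptedException {
        executor.shutdown(); // stop accepting new work; already submitted work still runs
        boolean finished = executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        if (!finished) {
            executor.shutdownNow(); // timed out: interrupt whatever is still running
        }
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> System.out.println("short task ran"));
        System.out.println("finished in time: " + shutdownAndWait(executor, 1_000));
    }
}
```

The return value, not an exception, tells you whether the wait timed out, so the caller has to check it explicitly.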

There are other ways to put a timeout on a single thread in Java, but they are not commonly used with Pub/Sub [5]. Moreover, as you stated, if the credentials are not properly configured, the request will not succeed even if you extend the timeout.
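One generic way to do this, sketched below with only java.util.concurrent, is to run the blocking call on a worker thread and wait for it with Future.get and a time limit. The class name BoundedCall, the helper callWithTimeout, and the sleeping task are assumptions for illustration; the task merely stands in for a slow call such as createTopic. This does not change the library's own HTTP or gRPC timeouts, it only limits how long the caller waits.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedCall {

    /**
     * Runs a blocking task on a worker thread and waits at most timeoutMillis for it.
     * Throws TimeoutException if the task does not finish in time.
     * (Hypothetical helper, JDK only.)
     */
    public static <T> T callWithTimeout(Callable<T> task, long timeoutMillis)
            throws TimeoutException, ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-call");
            t.setDaemon(true); // a stuck task must not keep the JVM alive
            return t;
        });
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Interrupt the worker; the task only stops if it responds to interruption.
            future.cancel(true);
            throw e;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a slow remote call; not a real Pub/Sub request.
        Callable<String> slowCall = () -> {
            Thread.sleep(5_000);
            return "created";
        };
        try {
            callWithTimeout(slowCall, 200);
        } catch (TimeoutException e) {
            System.out.println("Gave up waiting for the call");
        }
    }
}
```

The worker thread is a daemon so that a call which ignores interruption cannot block JVM exit. The underlying request may still be running in the background after the caller gives up.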

[1] - https://cloud.google.com/pubsub/docs/

[2] - https://googleapis.dev/java/google-cloud-pubsub/1.108.3/index.html

[3] - https://cloud.google.com/pubsub/docs/quickstart-client-libraries#receive_messages

[4] - https://googleapis.dev/java/gax/latest/com/google/api/gax/core/BackgroundResource.html#awaitTermination-long-java.util.concurrent.TimeUnit-

[5] - Simple timeout in java

Upvotes: 1
