Reputation: 172
We are using the Pub/Sub Java client library (on Java 8) to connect to a subscription and to receive, process, and acknowledge its messages.
<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-pubsub</artifactId>
    <version>1.108.0</version>
</dependency>
Typically when the system is under load, we see intermittent disconnections, after which the Java application stops receiving, processing, and acknowledging messages from the subscription. This causes a large pile-up of unacknowledged messages on the subscription and a loss of application availability.
Below is the connection code, where we have also added listeners to handle the terminated and failed states of the subscriber. This code is executed on application startup.
public void subscribeWithErrorListener(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);
    Builder builder = Subscriber.newBuilder(subscriptionName, receiver);
    ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder().build();
    Subscriber subscriber = builder.setExecutorProvider(executorProvider).build();
    subscriber.addListener(new Subscriber.Listener() {
        @Override
        public void terminated(State from) {
            log.log(Level.SEVERE, String.format("Subscriber stopped for: %s", subscriptionId));
            // Resubscribe only if the subscriber was stopping and the executor is still alive.
            if (from.toString().equals(Constants.STOPPING) && !executorProvider.getExecutor().isShutdown()) {
                subscribeWithErrorListener(projectId, subscriptionId);
            }
        }

        @Override
        public void failed(State from, Throwable failure) {
            // Pass the Throwable itself so the logger prints the stack trace;
            // passing failure.getStackTrace() would be treated as unused message parameters.
            log.log(Level.SEVERE, String.format("Subscriber failed for: %s", subscriptionId), failure);
            if (!executorProvider.getExecutor().isShutdown()) {
                subscribeWithErrorListener(projectId, subscriptionId);
            }
        }
    }, MoreExecutors.directExecutor());
    subscriber.startAsync().awaitRunning();
}
In spite of this setup, the application intermittently hits errors where the connection is dropped and no processing happens. We see the lines below in the application logs.
"stack_trace": "com.google.api.gax.rpc.UnavailableException: io.grpc.StatusRuntimeException: UNAVAILABLE: Abrupt GOAWAY closed sent stream. HTTP/2 error
code: NO_ERROR, debug data: max_age\n\tat com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:69)\n\tat
com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)\n\tat
com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)\n\tat
com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)\n\tat com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)\n\tat
com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1041)\n\tat
com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)\n\tat
com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1215)\n\tat
com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:983)\n\tat
com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:771)\n\tat
io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:563)\n\tat
io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)\n\tat
io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:617)\n\tat io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)\n\tat
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:803)\n\tat
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:782)\n\tat
io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)\n\tat io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)\n\tat
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)\n\tat
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)\n\tat
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by:
io.grpc.StatusRuntimeException: UNAVAILABLE: Abrupt GOAWAY closed sent stream. HTTP/2 error code: NO_ERROR, debug data: max_age\n\tat
io.grpc.Status.asRuntimeException(Status.java:533)\n\t... 14 common frames omitted\n",
Can someone provide pointers on why this happens, and on what to change in the code so that it also listens for this type of failure and re-establishes the connection automatically? Currently we have to restart the application so that the connection is established again on startup and the messages are then processed and acknowledged.
Upvotes: 1
Views: 10321
Reputation: 1780
This error happens when there is a disconnection or retryable error in a request sent to the Pub/Sub server. The client library should recreate connections and retry requests. These errors are printed out if you have the log level set to FINE.
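To actually see these messages, the FINE level has to be enabled on both the logger and its handler. The sketch below assumes the client library logs through java.util.logging and that its loggers live under the package name `com.google.cloud.pubsub.v1`; both are assumptions to verify against the library version in use, and `PubSubLogging` is an illustrative class name, not part of the library.

```java
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;

class PubSubLogging {
    // Assumed logger namespace (the package of the Pub/Sub client classes).
    static final String PUBSUB_LOGGER = "com.google.cloud.pubsub.v1";

    // Strong reference: java.util.logging only holds loggers weakly,
    // so the configuration could otherwise be garbage collected.
    private static Logger pubsubLogger;

    static Logger enableFineLogging() {
        Logger logger = Logger.getLogger(PUBSUB_LOGGER);
        logger.setLevel(Level.FINE);

        // The default console handler drops everything below INFO,
        // so attach a handler that accepts FINE as well.
        ConsoleHandler handler = new ConsoleHandler();
        handler.setLevel(Level.FINE);
        logger.addHandler(handler);
        logger.setUseParentHandlers(false); // avoid printing INFO+ records twice

        pubsubLogger = logger;
        return logger;
    }

    public static void main(String[] args) {
        Logger logger = enableFineLogging();
        logger.fine("FINE logging enabled for " + PUBSUB_LOGGER);
    }
}
```

Calling `enableFineLogging()` once at startup, before the subscriber is built, should be enough; if the application routes java.util.logging through another framework, the level has to be set there instead.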
In short, messages continue to be delivered to your MessageReceiver after this error occurs. Errors that the client library cannot retry itself, such as a subscription not being found, are propagated back to the caller.
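For the errors that do reach the caller, one option is to restart the subscriber with a capped exponential backoff instead of resubscribing immediately from inside the listener callback. The following is a minimal sketch using only the JDK; `SubscriberSupervisor` and the `startSubscriber` hook are hypothetical names, and the hook stands in for whatever code builds and starts a fresh Subscriber.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SubscriberSupervisor {
    // Hypothetical hook: builds and starts a new subscriber instance.
    private final Runnable startSubscriber;
    private final long baseDelayMs;
    private final long maxDelayMs;
    private final AtomicInteger consecutiveFailures = new AtomicInteger();
    // Restarts run on this thread, not on the thread that reported the failure.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    SubscriberSupervisor(Runnable startSubscriber, long baseDelayMs, long maxDelayMs) {
        this.startSubscriber = startSubscriber;
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    // Capped exponential backoff: base * 2^(failures - 1), never above maxMs.
    static long backoffMillis(int failures, long baseMs, long maxMs) {
        int shift = Math.min(Math.max(failures - 1, 0), 20);
        return Math.min(maxMs, baseMs << shift);
    }

    // Intended to be called from the listener's failed(...) callback.
    void onFailure() {
        int failures = consecutiveFailures.incrementAndGet();
        long delay = backoffMillis(failures, baseDelayMs, maxDelayMs);
        scheduler.schedule(startSubscriber, delay, TimeUnit.MILLISECONDS);
    }

    // Intended to be called once the new subscriber is running again.
    void onRunning() {
        consecutiveFailures.set(0);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        SubscriberSupervisor supervisor = new SubscriberSupervisor(
                () -> System.out.println("restarting subscriber (placeholder)"), 100, 60_000);
        supervisor.onFailure(); // schedules one restart after about 100 ms
        Thread.sleep(300);
        supervisor.shutdown();
    }
}
```

Scheduling the restart on a separate thread keeps the listener callback short, and the backoff avoids a tight restart loop when the failure is persistent, for example a subscription that no longer exists.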
You can see these options:
If you still have the same error, it could be a Java issue. You can find the corresponding grpc-java GitHub issue here.
Upvotes: 3