Mikey
Mikey

Reputation: 430

Gcloud PubSub Java implementation - java.util.concurrent.RejectedExecutionException

I use the sample snippet from GCloud documentation to receive msg as a subscriber. My pubsub gcloud jar version is 0.19.0-alpha

The problem is that I can receive the msg with attribute map but I keep having this exception:

2017-07-12 16:52:25,219 [grpc-default-worker-ELG-1-16] WARN  io.netty.util.concurrent.DefaultPromise - An exception was thrown by io.grpc.netty.NettyClientHandler$3.operationComplete()
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@fbf4a6d rejected from java.util.concurrent.ScheduledThreadPoolExecutor@25cbe860[Terminated, pool size = 35, active threads = 0, queued tasks = 0, completed tasks = 2403]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
    at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
    at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
    at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
    at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:110)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.onReady(ClientCallImpl.java:573)
    at io.grpc.internal.DelayedStream$DelayedStreamListener.onReady(DelayedStream.java:398)
    at io.grpc.internal.AbstractStream2$TransportState.notifyIfReady(AbstractStream2.java:305)
    at io.grpc.internal.AbstractStream2$TransportState.onStreamAllocated(AbstractStream2.java:248)
    at io.grpc.netty.NettyClientStream$TransportState.setHttp2Stream(NettyClientStream.java:227)
    at io.grpc.netty.NettyClientHandler$3.operationComplete(NettyClientHandler.java:429)
    at io.grpc.netty.NettyClientHandler$3.operationComplete(NettyClientHandler.java:417)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
    at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)

After that, the program shuts and stop listening and getting msg. How to resolve this interruption and I even get rid of finally clause that has subscriber.stopAsync().

Upvotes: 3

Views: 1464

Answers (1)

mannuscript
mannuscript

Reputation: 4941

There is a bug in the snippet provided by them. You need to call get() on the messaegeIdFuture. Following code resolves the issue:

Publisher publisher = null;
String projectId = ServiceOptions.getDefaultProjectId();
ProjectTopicName topic = ProjectTopicName.of(projectId, "test");
ApiFuture<String> messageIdFuture = null;
try {
       publisher = Publisher.newBuilder(topic).build();
       ByteString data = ByteString.copyFromUtf8("my-message");
       PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
       messageIdFuture = publisher.publish(pubsubMessage);
    } catch (IOException e) {
       e.printStackTrace();
    } finally {
        messageIdFuture.get();    //This resolves this issue.
        // Wait on any pending requests
        if (publisher != null) {
            publisher.shutdown();
            //publisher.awaitTermination(1, TimeUnit.SECONDS);
        }
    }

Upvotes: 2

Related Questions