Reputation: 2299
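I'm using the Google Pub/Sub Java SDK to subscribe to a topic. What I want to do is the following: receive a message, acknowledge it, stop the subscriber, run a task, and then start the subscriber again.
I can't seem to find anything about this in the documentation. Maybe it's just not possible?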
Here's how I start the subscriber:
// Create a subscriber bound to the asynchronous message receiver
subscriber = Subscriber.newBuilder(projectSubscriptionName, new PubSubRoeMessageReceiver()).build();
// Start subscriber
subscriber.startAsync().awaitRunning();
// Allow the subscriber to run indefinitely unless an unrecoverable error occurs.
subscriber.awaitTerminated();
And this is what my message receiver looks like:
public class PubSubRoeMessageReceiver implements MessageReceiver {
    @Override
    public void receiveMessage(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) {
        // Acknowledge message
        System.out.println("Acknowledge message");
        ackReplyConsumer.ack();
        // TODO: stop the subscriber
        // TODO: run task X
        // TODO: start the subscriber
    }
}
Any ideas?
Upvotes: 2
Views: 2069
Reputation: 17251
Using Cloud Pub/Sub in this way is an anti-pattern and would cause issues. If you ack the message as soon as you receive it, before you process it, what happens if the subscriber crashes for some reason? Pub/Sub won't redeliver the message, so it may never be processed.
Therefore, you probably want to wait to ack until after the message is processed. But then you can't shut down the subscriber in between: once it stops, the fact that the message is outstanding is lost, so the ack deadline expires and the message gets redelivered.
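The ordering described above, where the ack is sent only after the task has finished, can be sketched roughly as follows. This is a minimal, standalone illustration, not the client library itself: AckReply is a local stand-in for AckReplyConsumer (which has ack() and nack()), and Task, handle and RecordingReply are hypothetical names introduced only for this example. Sending nack() on failure is one possible choice; it asks Pub/Sub to redeliver the message.

```java
import java.util.ArrayList;
import java.util.List;

public class AckAfterProcessing {

    /** Local stand-in (assumption) for the client library's AckReplyConsumer. */
    interface AckReply {
        void ack();
        void nack();
    }

    /** Hypothetical unit of work ("task X"); throws if processing fails. */
    interface Task {
        void run(String payload) throws Exception;
    }

    /** Runs the task first, then acks on success or nacks on failure. */
    static void handle(String payload, Task task, AckReply reply) {
        try {
            task.run(payload);
            reply.ack();   // the message is done, Pub/Sub can forget it
        } catch (Exception e) {
            reply.nack();  // processing failed, ask for redelivery
        }
    }

    /** Records which reply was sent, so the ordering can be observed. */
    static final class RecordingReply implements AckReply {
        final List<String> calls = new ArrayList<>();
        @Override public void ack() { calls.add("ack"); }
        @Override public void nack() { calls.add("nack"); }
    }

    public static void main(String[] args) {
        RecordingReply ok = new RecordingReply();
        handle("hello", p -> System.out.println("processing " + p), ok);

        RecordingReply failed = new RecordingReply();
        handle("boom", p -> { throw new IllegalStateException("task failed"); }, failed);

        System.out.println(ok.calls + " " + failed.calls);
    }
}
```

In a real receiver, the body of handle would sit inside receiveMessage, with ackReplyConsumer taking the place of reply.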
If you want to ensure the client only receives one message at a time, you could use the FlowControlSettings on the client. If you set MaxOutstandingElementCount to 1, then only one message will be delivered to receiveMessage at a time:
// FlowControlSettings comes from com.google.api.gax.batching
subscriber = Subscriber.newBuilder(projectSubscriptionName, new PubSubRoeMessageReceiver())
    .setFlowControlSettings(FlowControlSettings.newBuilder()
        .setMaxOutstandingRequestBytes(10L * 1024L * 1024L) // Up to 10 MiB of outstanding message bytes.
        .setMaxOutstandingElementCount(1L) // Only 1 outstanding message at a time.
        .build())
    .build();
Keep in mind that if you have a large backlog of small messages at the time you start up the subscriber and you intend to start up multiple subscribers, you may run into inefficient load balancing as explained in the documentation.
Upvotes: 3