Reputation: 4709
In my business application, I have to batch-process all the messages from a topic periodically because it is cheaper than processing them in a first-come-first-served fashion. The way I am currently planning to do it is to have a cronjob that runs the subscriber every T hours. The problem I am solving now is how to terminate the subscriber once all the messages have been processed. I want to fire up the cronjob every T hours, let the subscriber consume all the messages in the topic-queue, and terminate. From what I understand, there is no Pub/Sub Java API that tells me whether the topic-queue is empty or not. I have come up with the following two solutions:
1. Create a subscriber that pulls asynchronously, sleep for t minutes while it consumes all the messages, and then terminate it using subscriber.stopAsync().awaitTerminated(). In this approach, there is a possibility I might not consume all the messages before terminating the subscriber. There is a Google example here. (A sketch of this time-boxed approach follows the list.)
2. Use Pub/Sub Cloud Monitoring to find the value of the metric subscription/num_undelivered_messages, then pull that many messages using the synchronous pull example provided by Google here, and then terminate the subscriber.
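For option 1, here is a minimal sketch of the time-boxed idea, written in Go to match the answer below (the project and subscription names are placeholders). In the Go client, sub.Receive returns once its context expires, which plays the role of subscriber.stopAsync().awaitTerminated() in the Java client:

package main

import (
    "context"
    "log"
    "time"

    "cloud.google.com/go/pubsub"
)

func main() {
    // Receive for a fixed window, then stop: when the context times out,
    // sub.Receive finishes its outstanding callbacks and returns.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()

    client, err := pubsub.NewClient(ctx, "my-project") // placeholder project ID
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    sub := client.Subscription("my-subscription") // placeholder subscription
    err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
        // Process the message here, then ack it.
        m.Ack()
    })
    if err != nil {
        log.Fatal(err)
    }
}

As noted in the question, this shares the weakness of option 1: a fixed window may end before the backlog is fully drained.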
Is there a better way to do this?
Thanks!
Upvotes: 2
Views: 3672
Reputation: 75940
I did this same implementation in Go some months ago. My assumption was the following: if no message has arrived for 100ms, the queue can be considered empty.
Based on that, I implemented this:
* Each time I receive a message:
  * I suspend the 100ms timeout
  * I process and ack the message
  * I reset the 100ms timeout to 0
* If the 100ms timeout fires, I terminate my pull subscription
In my use case, I schedule my processing every 10 minutes, so I set a global timeout at 9m30s to finish the processing and let the next app instance continue the processing.
Just one tricky thing: for the first message, set the timeout to 2s. Indeed, the first message takes longer to come because of connection establishment. So set a flag when you initialize your timeout: "is this the first message or not".
I can share my Go code if it can help you with your implementation.
EDIT
Here is my Go code for the message handling:
import (
    "context"
    "time"

    "cloud.google.com/go/pubsub"
    log "github.com/sirupsen/logrus" // assuming a logger with Debug/Fatalf, e.g. logrus
)

// Service definition, with the fields used below
type pubSubService struct {
    projectId                 string
    subscriptionName          string
    waitTimeOutInMillis       time.Duration // e.g. 200
    globalWaitTimeOutInMillis time.Duration // e.g. 750
}

func (pubSubService *pubSubService) Received() (msgArray []*pubsub.Message, err error) {
    ctx := context.Background()
    cctx, cancel := context.WithCancel(ctx)

    // Connect to Pub/Sub
    client, err := pubsub.NewClient(cctx, pubSubService.projectId)
    if err != nil {
        log.Fatalf("Impossible to connect to pubsub client for project %s", pubSubService.projectId)
    }

    // Put all the messages in an array. It will be processed at the end (stored to BQ, as is)
    msgArray = []*pubsub.Message{}

    // Channel to receive messages
    var receivedMessage = make(chan *pubsub.Message)

    // Handler to receive messages (through the channel) or cancel the context if the timeout is reached
    go func() {
        // Initial timeout, because the first message takes longer to come than the others
        timeOut := time.Duration(3000)
        for {
            select {
            case msg := <-receivedMessage:
                // After the first receive, the timeout is shortened
                timeOut = pubSubService.waitTimeOutInMillis // Environment variable = 200
                msgArray = append(msgArray, msg)
            case <-time.After(timeOut * time.Millisecond):
                log.Debug("Cancel by timeout")
                cancel()
                return
            }
        }
    }()

    // Global timeout
    go func() {
        timeOut := pubSubService.globalWaitTimeOutInMillis // Environment variable = 750
        time.Sleep(timeOut * time.Second)
        log.Debug("Cancel by global timeout")
        cancel()
    }()

    // Connect to the subscription and pull from it until the context is canceled
    sub := client.Subscription(pubSubService.subscriptionName)
    err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
        receivedMessage <- msg
        msg.Ack()
    })
    return
}
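For completeness, a hypothetical caller of the function above; the field values are illustrative and match the environment-variable comments in the code:

svc := &pubSubService{
    projectId:                 "my-project",      // placeholder
    subscriptionName:          "my-subscription", // placeholder
    waitTimeOutInMillis:       200,
    globalWaitTimeOutInMillis: 750,
}
msgArray, err := svc.Received()
if err != nil {
    log.Fatal(err)
}
log.Printf("Batch contains %d messages", len(msgArray))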
Upvotes: 4
Reputation: 17251
It might be worth considering whether or not Cloud Pub/Sub is the right technology to use for this case. If you want to do batch processing, you might be better off storing the data in Google Cloud Storage or in a database. Cloud Pub/Sub is really best for continuous pulling/processing of messages.
The two suggestions you have are trying to determine when there are no more messages to process. There isn't really a clean way to do this. Your first suggestion is possible, though keep in mind that while most messages will be delivered extremely quickly, there can be outliers that take longer to be sent to your subscriber. If it is critical that all outstanding messages be processed, then this approach may not work. However, if it is okay for messages to occasionally be processed the next time you start up your subscriber, then you could use this approach. It would be best to set up a timer since the last time you received a message, as guillaume blaquiere suggests, though I would use a timeout on the order of 1 minute and not 100ms.
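To make that concrete, here is a minimal sketch of the idle-timer variant with a 1-minute window, in Go for consistency with the other answer (the client and subscription setup are assumed, and it glosses over the subtleties of resetting a shared time.Timer from concurrent callbacks):

// One timer tracks the time since the last message.
idle := time.NewTimer(time.Minute)
cctx, cancel := context.WithCancel(context.Background())

go func() {
    <-idle.C // fires after a full minute without messages
    cancel() // makes sub.Receive return
}()

err := sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
    idle.Reset(time.Minute) // push the idle deadline back on every message
    // Process the message here.
    m.Ack()
})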
Your second suggestion of monitoring the number of undelivered messages and then sending a pull request to retrieve that many messages would not be as viable an approach. First of all, the max_messages property of a pull request does not guarantee that all available messages up to max_messages will be returned. It is possible to get zero messages back in a pull response and still have undelivered messages. Therefore, you'd have to keep a count of messages received and try to match the num_undelivered_messages metric. You'd also have to account for duplicate delivery in this scenario, and for the fact that the Stackdriver monitoring metrics can lag behind the actual values. If the value is too large, you may keep pulling, trying to get messages you won't receive. If the value is too small, you may not get all of the messages.
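To make the max_messages caveat concrete, here is a sketch of a single synchronous pull in Go using the low-level v1 client (import paths from the era of this question; the subscription path is a placeholder):

import (
    "context"
    "log"

    pubsubv1 "cloud.google.com/go/pubsub/apiv1"
    pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
)

func pullOnce(ctx context.Context, subscription string, max int32) int {
    client, err := pubsubv1.NewSubscriberClient(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    resp, err := client.Pull(ctx, &pubsubpb.PullRequest{
        Subscription: subscription, // "projects/<project>/subscriptions/<sub>"
        MaxMessages:  max,          // an upper bound only, not a guarantee
    })
    if err != nil {
        log.Fatal(err)
    }
    // The response can hold anywhere from 0 to max messages, even while
    // num_undelivered_messages is still non-zero, so a caller trying to
    // match the metric has to loop and keep its own count.
    return len(resp.ReceivedMessages)
}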
Of the two approaches, the one that tracks how long it has been since the last message was received is the better one, but with the caveats mentioned.
Upvotes: 5