Reputation: 1762
Rightnow, I'm trying to implement a queue-workers based design, where we receive millions of messages in queue. And workers are limited, so I'm using the following code assign work to workers. I'm using ExecutorService for the same:
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
while(LISTEN_FOR_MESSAGE_FLAG == true){
Message receivedMessage = sqsClient.receiveMessage(request);
if(receivedMessage == null){
Thread.sleep(5000); // sleep for 5 seconds
}
else {
// lock the message for a certain amount of time (60 secs).
// Other workers can't receive a message, when it is locked.
sqsClient.changeMessageVisibility(receivedMessage, 60);
pool.execute(new Task(receivedMessage); // process the message.
}
}
I'm currently using Amazon SQS for the queue. Above code has serious problem. It receives the messages every 5 seconds and lock them with a visibility timeout. This lock will be broken once the visibility timeout is gone. Which causes workers to hold messages which are NOT LOCKED anymore. Therefore, it has problems of duplicate processing. Note: Amazon SQS provides a way to extend the visibility timeout.
Please help, how can above code be written to handle this case.
Upvotes: 0
Views: 1144
Reputation: 11280
If you change
pool.execute(new Task(receivedMessage);
to :
Future<?> task = pool.submit(new Task(receivedMessage));
you can execute a helper task that extends the visibility time before time runs out. The helper task basically does a get() on the Future with a timeout shorter than the 60 seconds lock. and when a TimeoutException occurs, it extends the visibility timeout, and starts waiting on the Future again.
pool.execute(new Runnable() {
@Override
public void run() {
boolean done = false;
while (!done) {
try {
task.get(50, TimeUnit.SECONDS);
done = true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
done = true;
} catch (ExecutionException e) {
done = true;
// handle e.getCause()
} catch (TimeoutException e) {
// we need more time
sqsClient.changeMessageVisibility(receivedMessage, 60);
}
}
}
});
Upvotes: 1