Reputation: 507
I currently have a service that reads events from an SQS queue and processes the messages it finds. Two threads read the queue concurrently, and each one polls every second to check whether there are messages. This has increased our AWS costs considerably. As a solution, I want to introduce a dynamic delay (or something similar) between reads of the queue. These are the steps I want to implement in my method:
If there are no messages in the queue, increase the delay before the next read by one second. For example, if the service reads the queue and finds no messages, the delay increases from 1 second to 2 seconds. This continues until the delay reaches 60 seconds. When a message is found, the delay is reset to 1 second and the cycle from 1 to 60 seconds starts again.
Basically, I need to know whether the queue has messages and, based on that check, increase the delay before reading the queue again.
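The rule described above can be sketched in plain Java, independent of SQS or Mutiny. This is only a minimal illustration under assumptions: the class name `LinearPollDelay` and the method `afterPoll` are made up for the example, and the methods are synchronized because two threads poll the queue at the same time.

```java
import java.time.Duration;

/** Hypothetical helper: tracks how long to wait before the next poll. */
final class LinearPollDelay {
    private static final Duration STEP = Duration.ofSeconds(1);
    private static final Duration MAX = Duration.ofSeconds(60);

    private Duration current = STEP;

    /** Records the outcome of one poll and returns the delay before the next one. */
    synchronized Duration afterPoll(boolean foundMessages) {
        if (foundMessages) {
            current = STEP;               // a message arrived: start over at 1 second
        } else if (current.compareTo(MAX) < 0) {
            current = current.plus(STEP); // empty poll: wait one second longer, up to 60
        }
        return current;
    }

    /** Returns the delay currently in effect. */
    synchronized Duration current() {
        return current;
    }
}
```

With this rule, an empty poll moves the delay from 1 to 2 seconds, further empty polls keep adding one second until 60, and any poll that finds messages returns it to 1 second.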
This is the method I created to read the messages from the queue:
private void createPollingStream() {
    Multi.createBy().repeating()
        .supplier(
            () -> sqsMessagePoller.pollUDMUsages() // Read messages from the queue. If there are none, I want to increase the delay.
                .runSubscriptionOn(Infrastructure.getDefaultExecutor())
                .onItem().transformToUniAndMerge(this::processMessagesQueue) // If the poll returned messages, call another method to process them.
                .onFailure().invoke(failure ->
                    Log.errorf("Error processing the messages: %s", failure))
                .subscribe()
                .with(succ -> Log.info("Current iteration of processing message complete"),
                    failure -> Log.error("Failed to process message in flow", failure)
                )
        ).withDelay(Duration.ofMillis(delay)) // Initial delay between reads of the queue. I guess this is what should be dynamic.
        .indefinitely()
        .subscribe().with(x -> Log.info("Current iteration of processing message complete"),
            failure -> Log.error("Failed to poll messages in flow", failure));
}
Upvotes: 0
Views: 51
Reputation: 1235
Your code is not going to work as you want: withDelay(Duration.ofMillis(delay)) is evaluated once, when the pipeline is created, and the pipeline does not re-evaluate it afterwards, so later changes to delay have no effect.
However, you can introduce a dynamic delay by creating it inside .call(), where the current value is read on every iteration.
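The difference between the two can be shown without Mutiny. The following is a minimal plain-Java sketch; `DelayEvaluationDemo` and `compare` are names invented for the illustration. A value computed while the pipeline is being built is frozen, while a value computed inside a lambda is read again each time the lambda runs.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

final class DelayEvaluationDemo {
    /** Returns {fixedMillis, deferredMillis} as observed after the delay was changed. */
    static long[] compare() {
        AtomicLong delay = new AtomicLong(1000);

        // Evaluated once, right here: comparable to passing
        // Duration.ofMillis(delay) to withDelay(...) while building the pipeline.
        Duration fixed = Duration.ofMillis(delay.get());

        // Evaluated on every call: comparable to building the delay
        // inside a lambda that runs on each iteration.
        Supplier<Duration> deferred = () -> Duration.ofMillis(delay.get());

        delay.set(5000); // later, the poller decides to back off

        return new long[] { fixed.toMillis(), deferred.get().toMillis() };
    }
}
```

Here `compare()` returns 1000 for the fixed value and 5000 for the deferred one, which is why the delay has to be produced inside a lambda that runs per iteration.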
Can you try something like this:
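import io.quarkus.logging.Log;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class QueuePollingService {
    private static final long INITIAL_DELAY = 1000; // milliseconds
    private static final long MAX_DELAY = 60000;    // milliseconds
    private final AtomicLong delay = new AtomicLong(INITIAL_DELAY);

    // sqsMessagePoller and processMessagesQueue(...) are the ones from the question.
    // Assumption: pollUDMUsages() returns Multi<List<Message>> and
    // processMessagesQueue takes that list and returns a Uni.

    private void startPolling() {
        Multi.createBy().repeating()
            // uni(...) subscribes to the returned Uni on every iteration;
            // supplier(...) would only emit the Uni object without running it.
            .uni(() -> pollQueue()
                // The delay is read here, on each iteration, so it follows the current value.
                .call(() -> Uni.createFrom().nullItem()
                    .onItem().delayIt().by(Duration.ofMillis(delay.get())))
            )
            .indefinitely()
            .subscribe().with(
                x -> Log.info("Polling iteration complete"),
                failure -> Log.error("Polling stopped due to failure", failure)
            );
    }

    private Uni<Void> pollQueue() {
        return sqsMessagePoller.pollUDMUsages()
            .toUni() // Convert Multi<List<Message>> to Uni<List<Message>> (first emitted list)
            .onItem().ifNull().continueWith(List.of()) // Ensure it is never null
            .runSubscriptionOn(Infrastructure.getDefaultExecutor()) // Run the poll on the default executor
            .onItem().invoke(messages -> {
                if (messages.isEmpty()) {
                    // Doubles the delay up to the cap; use d + 1000 for one-second steps instead.
                    long next = delay.updateAndGet(d -> Math.min(d * 2, MAX_DELAY));
                    Log.infof("No messages found, increasing delay to %d ms", next);
                } else {
                    delay.set(INITIAL_DELAY); // Reset delay on message arrival
                    Log.infof("Messages found (%d), resetting delay to 1 second.", messages.size());
                }
            })
            // A Uni has transformToUni; transformToUniAndMerge exists only on Multi.
            .onItem().transformToUni(this::processMessagesQueue)
            .onFailure().invoke(failure ->
                Log.errorf("Error processing messages: %s", failure))
            .replaceWithVoid();
    }
}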
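Note that the code above doubles the delay on every empty poll, whereas the question asks for one-second steps. As a rough comparison of the two rules, here is a small plain-Java sketch; the class `BackoffComparison` and its methods are hypothetical and only reuse the 1000 ms and 60000 ms constants from the code above.

```java
import java.util.function.LongUnaryOperator;

final class BackoffComparison {
    static final long INITIAL_DELAY = 1000; // milliseconds
    static final long MAX_DELAY = 60000;    // milliseconds

    /** Counts how many consecutive empty polls it takes for the delay to reach MAX_DELAY. */
    static int emptyPollsUntilMax(LongUnaryOperator step) {
        long delay = INITIAL_DELAY;
        int polls = 0;
        while (delay < MAX_DELAY) {
            delay = Math.min(step.applyAsLong(delay), MAX_DELAY);
            polls++;
        }
        return polls;
    }

    /** Doubling rule, as in Math.min(d * 2, MAX_DELAY). */
    static int doubling() {
        return emptyPollsUntilMax(d -> d * 2);
    }

    /** One-second steps, as described in the question. */
    static int linear() {
        return emptyPollsUntilMax(d -> d + 1000);
    }
}
```

Doubling reaches the 60-second cap after 6 empty polls (2, 4, 8, 16, 32, 60 seconds), while one-second steps need 59. Which one fits better depends on how quickly the queue should be re-checked after going quiet; swapping `d * 2` for `d + 1000` in the update gives the behavior from the question.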
Note: I'm on my phone, so I had ChatGPT generate this code for me.
@Allanh, I've updated the answer. Can you try it again?
Upvotes: 0