Tomek
Tomek

Reputation: 13

How to implement repeat with Mono and state

I have a method that takes a list of items, makes a web request and then returns items that failed processing (some items may have been processed correctly, some items might have failed and only failed ones are returned or an empty list):

// returns list of of items that failed processing or empty list.
private List<String> sendsItems(List<String> items);

I want to use Mono and retry sending only failed items for up to N times. Using a blocking code, it would look like this:

public void sendWithRetries() {
    List<String> items = List.of("one", "two", "three");
    int retryNo = 0;
    while (!items.isEmpty() && retryNo < 3) {
        items = sendsItems(items);
        retryNo++;
    }
}

I have really hard time translating that to code using reactor. I've been thinking about using Mono.repeat or Mono.retry family of functions but I do not see any nice way of passing failed items back to sendItems except doing something ugly like:

    var items = new AtomicReference<>(List.of("one", "two", "three"));
    Mono.fromCallable(() -> {
                List<String> failedItems = sendsItems(items.get());
                items.set(failedItems);
                if (!failedItems.isEmpty()) {
                    throw new RuntimeException("retry me");
                }
                return Collections.emptyList();
            })
            .retry(3)

Is there a better way of doing that?

note: the real use case I need it for is kinesis put records api call. It returns info about which records failed to be processed and I want to retry sending only them.

Upvotes: 1

Views: 418

Answers (1)

Martin Tarj&#225;nyi
Martin Tarj&#225;nyi

Reputation: 9947

You can use expand operator to maintain state:

Mono.fromCallable(() -> sendsItems(new Attempt(List.of("one", "two", "three"), 0)))
    .expand(attempt -> {
        if (attempt.getItems().isEmpty() || attempt.getRetry() > 3) {
            return Mono.empty();
        } else {
            return Mono.fromCallable(() -> sendsItems(attempt));
        }
    });

private Attempt sendsItems(Attempt previousAttempt) {
    System.out.println(previousAttempt);
    // implement actual sending logic here, below is just some dummy
    List<String> failedItems = previousAttempt.getItems().subList(0, previousAttempt.getItems().size() - 1);
    return new Attempt(failedItems, previousAttempt.retry + 1);
}

@Value
static class Attempt {
    List<String> items;
    int retry;
}

Upvotes: 1

Related Questions