Lisek

Reputation: 793

Retry on the next Flux element and omit the successful ones

A little background.

I am trying to use reactive programming to download a file from another service. The trick is that in case of a connection failure or a failed Flux element (anything), I would like to retry the Flux a number of times, but once a retry succeeds I would like to resume without processing the elements from the very start.

What I mean is: something goes wrong and, because of a connection failure, I get only 56 elements out of a possible 100 from my Flux (let's say it's a .jpg image). Once a retry succeeds, I would like to resume at the 57th element, so that I do not have to process the first 56 again and perform the GET from the start.

Here is what the normal retry looks like: [marble diagram: Flux retry()]

What I would like to achieve instead is that on retry I only have to fetch the red element (as I already have the yellow and purple ones).

Just a side note: I would like functionality similar to HTTP range request headers, where I can request only a specific range of bytes and, in case of failure, resume from the byte I want.

Is what I am trying to achieve even possible? If so, what would be a possible course of action?

Upvotes: 1

Views: 868

Answers (1)

Simon Baslé

Reputation: 28351

You need to keep some state (the beginning of the range to request, at least) on a per-subscriber basis. That has to be done upstream of the retry, so that each retry re-evaluates the range. At the same time, the state should be atomically updatable AND visible downstream of the retry (for updating purposes). I'm assuming you're using WebClient:

  • a flatMap can be used to create a scope in which the range state is visible
  • in the lambda, an AtomicLong can be used as the state
  • again in the flatMap lambda, wrap the WebClient call in a Flux.defer to ensure lazy creation of the request, so that each subscription re-reads the state (the AtomicLong) to generate the appropriate Range header
  • append retry after the defer
  • update the AtomicLong as needed once each piece is received and processed (e.g. in a doOnNext)
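
The steps above can be sketched roughly as follows. This is only a minimal illustration, not a definitive implementation: it assumes reactor-core is on the classpath, and `fetchFrom` is a hypothetical stand-in for the ranged WebClient call (it emits numbered elements instead of bytes and fails once at element 57 to simulate a dropped connection). The names `ResumableDownload`, `download`, `received` and `failuresLeft` are made up for the example.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;

class ResumableDownload {

    // Hypothetical stand-in for a ranged request: emits elements offset+1..total.
    // While failuresLeft > 0 it fails on element 57 to simulate a lost connection.
    static Flux<Long> fetchFrom(long offset, long total, AtomicInteger failuresLeft) {
        System.out.println("request starting after element " + offset);
        return Flux.range((int) offset + 1, (int) (total - offset))
                .map(i -> {
                    if (i == 57 && failuresLeft.getAndDecrement() > 0) {
                        throw new IllegalStateException("connection lost");
                    }
                    return i.longValue();
                });
    }

    static Flux<Long> download(long total, int simulatedFailures) {
        AtomicInteger failuresLeft = new AtomicInteger(simulatedFailures);
        return Flux.just(total)
                .flatMap(t -> {
                    // State lives inside the flatMap lambda, so each subscriber gets its own.
                    AtomicLong received = new AtomicLong();
                    // defer: the offset is re-read every time retry resubscribes.
                    return Flux.defer(() -> fetchFrom(received.get(), t, failuresLeft))
                            .retry(3)
                            // Record progress for each element that made it through.
                            .doOnNext(x -> received.incrementAndGet());
                });
    }

    public static void main(String[] args) {
        List<Long> result = download(100, 1).collectList().block();
        System.out.println("elements received: " + result.size());
    }
}
```

With one simulated failure, the second request starts after element 56 instead of from the beginning. In a real download the state would more likely be a byte offset fed into a Range header, and the server would have to support range requests for this to work.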

Upvotes: 2
