ESala
ESala

Reputation: 7058

How to remember state with retry operators in RxJava2

I have a network client that is able to resume from interruptions, but needs the last message for doing so when there is a retry.

Example in Kotlin:

fun requestOrResume(last: Message? = null): Flowable<Message> =
    Flowable.create({ emitter ->
        val connection = if (last != null)
                             client.start()
                         else
                             client.resumeFrom(last.id)

        while (!emitter.isDisposed) {
            val msg = connection.nextMessage()
            emitter.onNext(msg)
        }
    }, BackpressureStrategy.MISSING)

requestOrResume()
    .retryWhen { it.flatMap { Flowable.timer(5, SECONDS) } }
    // how to pass the resume data when there is a retry?

Question: as you can see, I need the last received message in order to prepare the resume call. How can I keep track of it so that when there is a retry it is available to make the resume request?

One possible solution may be to create a holder class that just holds a reference to the last message and is updated when a new message is received. This way when there is a retry the last message can be obtained from the holder. Example:

class MsgHolder(var last: Message? = null)

fun request(): Flowable<Message> {
    val holder = MsgHolder()
    return Flowable.create({ emitter ->
        val connection = if (holder.last != null)
                             client.start()
                         else
                             client.resumeFrom(holder.last.id)

        while (!emitter.isDisposed) {
            val msg = connection.nextMessage()
            holder.last = msg // <-- update holder reference
            emitter.onNext(msg)
        }
    }, BackpressureStrategy.MISSING)
}

I think this might work, but it feels like a hack (thread synchronization issues?).

Is there a better way to keep track of the state so it is available for retries?

Upvotes: 0

Views: 117

Answers (2)

concat
concat

Reputation: 3197

Note that, unless you rethrow a wrapper around your last element (not too functionally different from your existing "hack"-ish solution but way uglier imo), no error handling operators can recover the last element without some outside help because they only get access to streams of Throwable. Instead, see if the following recursive approach suits your needs:

fun retryWithLast(seed: Flowable<Message>): Flowable<Message> {
  val last$ = seed.last().cache();
  return seed.onErrorResumeNext {
    it.flatMap {
      retryWithLast(last$.flatMap {
        requestOrResume(it)
      })
    }
  };
}
retryWithLast(requestOrResume());

The biggest distinction is caching the trailing value from the last attempt in an Observable with cache rather than doing so manually in a value. Note also that the recursion in the error handler means retryWithLast will continue to extend the stream if subsequent attempts continue failing.

Upvotes: 1

Panczur
Panczur

Reputation: 651

Take a close look to buffer() operator: link You could use it like this:

requestOrResume()
    .buffer(2)

From now, your Flowable will return List<Message> with two latests objects

Upvotes: 0

Related Questions