Michał Klimczak

Reputation: 13154

RxJava operator like replay(1, 1, MINUTE), but one that resubscribes after 1 minute has passed

I need what one would think is a relatively simple mechanism: an expiring cache (say, 1 minute).

  1. When the first subscriber subscribes, I want to make an API call.
  2. When a second one subscribes within a minute, I don't want to make an API call, but to push the previously loaded value downstream.
  3. When another one subscribes more than a minute after the first one, I want the API call to be made again.
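For reference, the behaviour described by these three requirements can be pinned down outside of any Rx chain. The following is only a minimal sketch in plain Java: the names ExpiringCache, loader, ttlMillis and clock are made up for illustration, and the clock is injectable purely so that time can be faked.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical helper, not an RxJava operator: caches one value and reloads it
// once ttlMillis have elapsed since the last load.
final class ExpiringCache<T> {
    private final Supplier<T> loader;   // stands in for the API call
    private final long ttlMillis;
    private final LongSupplier clock;   // injectable so time can be faked
    private T value;
    private long loadedAt;
    private boolean loaded;

    ExpiringCache(Supplier<T> loader, long ttlMillis, LongSupplier clock) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    // synchronized: concurrent callers wait for a load in progress
    // instead of starting a second one.
    synchronized T get() {
        long now = clock.getAsLong();
        if (!loaded || now - loadedAt >= ttlMillis) {
            value = loader.get();
            loadedAt = now;
            loaded = true;
        }
        return value;
    }

    public static void main(String[] args) {
        AtomicLong fakeNow = new AtomicLong(0);
        AtomicInteger calls = new AtomicInteger();
        ExpiringCache<String> cache = new ExpiringCache<>(
                () -> "API call #" + calls.getAndIncrement(), 60_000, fakeNow::get);

        System.out.println(cache.get()); // first caller: API call #0
        fakeNow.set(30_000);
        System.out.println(cache.get()); // within the minute: API call #0
        fakeNow.set(61_000);
        System.out.println(cache.get()); // after the minute: API call #1
    }
}
```

The expiry here is measured from the moment the value was loaded, which matches requirement 3 ("a minute from the first one").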

Now, I wanted to do this in a single RxJava chain. .replay(1, 1, MINUTE) looked perfect until I learned that, after one minute has passed, the source observable is not resubscribed, so I never get anything from that observable again. I probably need something that combines replay() with repeatWhen{}, but I cannot quite find it. I tried some really exotic combinations and none of them worked for my test cases.

Upvotes: 2

Views: 371

Answers (3)

ctranxuan

Reputation: 815

I think this code should do the trick. The interval here is 5 seconds instead of 1 minute, which made it easier to test:

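// Needs the RxJava Flowable and PublishProcessor classes, plus static imports of
// TimeUnit.SECONDS and a now() such as Instant.now().
// apiCall() is not shown; judging by the output below, it returns "API call #n",
// with n incremented on every call.
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss").withZone(ZoneId.systemDefault());
AtomicInteger subscriberCounter = new AtomicInteger(0);
PublishProcessor<String> repeater = PublishProcessor.create();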

Flowable<String> flowable =
    Flowable.defer(() -> Flowable.just(apiCall()))
            .repeatWhen(handler -> repeater
                                     .skip(5, SECONDS) // don't trigger a repeat for any new subscription between 0 and 5 seconds
                                     .throttleFirst(5, SECONDS) // trigger a repeat and ignore any new notification during 5 sec
            )
            .replay(1)
            .autoConnect()
            .doOnSubscribe(s -> {
                System.out.println(formatter.format(now())
                                    + " -----> new subscription of subscriber #"
                                    + subscriberCounter.incrementAndGet());
                repeater.onNext("whatever"); // notify a new subscription to the repeat handler
            });

flowable.subscribe(s -> System.out.println("subscriber #1 receives: " + s));
Thread.sleep(3000);
flowable.subscribe(s -> System.out.println("subscriber #2 receives: " + s));
Thread.sleep(4000);
flowable.subscribe(s -> System.out.println("subscriber #3 receives: " + s));
Thread.sleep(100);
flowable.subscribe(s -> System.out.println("subscriber #4 receives: " + s));
Thread.sleep(6000);
flowable.subscribe(s -> System.out.println("subscriber #5 receives: " + s));
Thread.sleep(1000);
flowable.subscribe(s -> System.out.println("subscriber #6 receives: " + s));
Thread.sleep(6000);
flowable.subscribe(s -> System.out.println("subscriber #7 receives: " + s));

Flowable.timer(60, SECONDS) // Just to block the main thread for a while
        .blockingSubscribe();

This gives:

16:54:54 -----> new subscription of subscriber #1
subscriber #1 receives: API call #0
16:54:57 -----> new subscription of subscriber #2
subscriber #2 receives: API call #0
16:55:01 -----> new subscription of subscriber #3
subscriber #1 receives: API call #1
subscriber #2 receives: API call #1
subscriber #3 receives: API call #1
16:55:01 -----> new subscription of subscriber #4
subscriber #4 receives: API call #1
16:55:07 -----> new subscription of subscriber #5
subscriber #1 receives: API call #2
subscriber #2 receives: API call #2
subscriber #3 receives: API call #2
subscriber #4 receives: API call #2
subscriber #5 receives: API call #2
16:55:08 -----> new subscription of subscriber #6
subscriber #6 receives: API call #2
16:55:14 -----> new subscription of subscriber #7
subscriber #1 receives: API call #3
subscriber #2 receives: API call #3
subscriber #3 receives: API call #3
subscriber #4 receives: API call #3
subscriber #5 receives: API call #3
subscriber #6 receives: API call #3
subscriber #7 receives: API call #3

Maybe we can do better, but I have no other idea right now.

Upvotes: 0

Dominik Setniewski

Reputation: 400

It might not be the best solution, but I would try to do it this way:

class SimpleCacheSingle<T : Any>(
    // Callback-style API call; replace it with whatever your API needs.
    val apiRequest: (value: String, callback: (T) -> Unit) -> Unit
) {
    private var lastTimeSeconds = 0L
    private lateinit var cachedValue: T

    fun getSingle(): Single<T> = Single.create { emitter ->
        val nowSeconds = System.currentTimeMillis() / 1000
        if (nowSeconds - lastTimeSeconds > 60) {
            // Cache is empty or older than 60 seconds: call the API and store the result.
            apiRequest("example argument") {
                cachedValue = it
                lastTimeSeconds = System.currentTimeMillis() / 1000
                emitter.onSuccess(cachedValue)
            }
        } else {
            // Cache is still fresh: emit the stored value without calling the API.
            emitter.onSuccess(cachedValue)
        }
    }
}

Just create an instance of it, and use getSingle() to create a Single for each subscriber.

Of course, "apiRequest" in the attached code snippet needs to be modified to meet your needs.

Edit: Please note that if you subscribe before the previous API call has finished, you will have two or more pending API requests instead of one. You will have to modify the code so that there is only one request at a time.
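One possible way to keep a single request in flight is to cache the pending request itself rather than only its result. The sketch below shows that idea in plain Java (not Kotlin, and without RxJava) using CompletableFuture. SingleFlightCache, request, ttlMillis and clock are hypothetical names. Unlike the snippet above, it measures the expiry from the moment the request was started, and it assumes callers do not complete or cancel the shared future themselves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical helper: shares one pending request among all callers and
// reuses its result until ttlMillis have elapsed since the request started.
final class SingleFlightCache<T> {
    private final Supplier<CompletableFuture<T>> request; // stands in for an async API call
    private final long ttlMillis;
    private final LongSupplier clock;                     // injectable so time can be faked
    private CompletableFuture<T> current;                 // pending or last completed request
    private long startedAt;

    SingleFlightCache(Supplier<CompletableFuture<T>> request, long ttlMillis, LongSupplier clock) {
        this.request = request;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    synchronized CompletableFuture<T> get() {
        // A request is still pending: share it rather than starting another one.
        if (current != null && !current.isDone()) {
            return current;
        }
        long now = clock.getAsLong();
        boolean fresh = current != null
                && !current.isCompletedExceptionally()   // failed results are not cached
                && now - startedAt < ttlMillis;
        if (!fresh) {
            startedAt = now;
            current = request.get();
        }
        return current;
    }

    public static void main(String[] args) {
        AtomicLong fakeNow = new AtomicLong(0);
        List<CompletableFuture<String>> started = new ArrayList<>();
        SingleFlightCache<String> cache = new SingleFlightCache<>(() -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            started.add(f);
            return f;
        }, 60_000, fakeNow::get);

        CompletableFuture<String> first = cache.get();
        CompletableFuture<String> second = cache.get(); // arrives while the first is pending
        System.out.println(first == second);            // true
        System.out.println(started.size());             // 1
        started.get(0).complete("value");
        fakeNow.set(61_000);
        cache.get();                                    // expired, so a new request starts
        System.out.println(started.size());             // 2
    }
}
```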

Upvotes: 3

Mayuri Khinvasara

Reputation: 1517

You can use the operators below:

  1. .retryWhen() operator to resubscribe.
  2. .delay() operator to delay the subscriber: the Delay operator modifies its source Observable by pausing for a particular increment of time (that you specify) before emitting each of the source Observable's items. This has the effect of shifting the entire sequence of items emitted by the Observable forward in time by that specified increment.

Upvotes: -1
