Reputation: 13154
I need what one would think is a relatively simple mechanism: an expiring cache (say, 1 minute).
I wanted to do this in a single RxJava chain. .replay(1, 1, MINUTE) looked perfect until I learned that, after one minute has passed, the source observable is not resubscribed, so I never get anything from that observable again. I probably need something that combines replay() with repeatWhen{}, but I cannot quite find it. I tried some really exotic combinations and none of them worked for my test cases.
Upvotes: 2
Views: 371
Reputation: 815
I think this code should do the trick (the interval here is 5 seconds instead of 1 minute, which made it easier to test):
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss").withZone(ZoneId.systemDefault());
AtomicInteger subscriberCounter = new AtomicInteger(0);
PublishProcessor<String> repeater = PublishProcessor.create();

Flowable<String> flowable =
    Flowable.defer(() -> Flowable.just(apiCall()))
        .repeatWhen(handler -> repeater
            .skip(5, SECONDS)          // don't trigger a repeat for any new subscription between 0 and 5 seconds
            .throttleFirst(5, SECONDS) // trigger a repeat and ignore any new notification during 5 sec
        )
        .replay(1)
        .autoConnect()
        .doOnSubscribe(s -> {
            System.out.println(formatter.format(now())
                + " -----> new subscription of subscriber #"
                + subscriberCounter.incrementAndGet());
            repeater.onNext("whatever"); // notify a new subscription to the repeat handler
        });

flowable.subscribe(s -> System.out.println("subscriber #1 receives: " + s));
Thread.sleep(3000);
flowable.subscribe(s -> System.out.println("subscriber #2 receives: " + s));
Thread.sleep(4000);
flowable.subscribe(s -> System.out.println("subscriber #3 receives: " + s));
Thread.sleep(100);
flowable.subscribe(s -> System.out.println("subscriber #4 receives: " + s));
Thread.sleep(6000);
flowable.subscribe(s -> System.out.println("subscriber #5 receives: " + s));
Thread.sleep(1000);
flowable.subscribe(s -> System.out.println("subscriber #6 receives: " + s));
Thread.sleep(6000);
flowable.subscribe(s -> System.out.println("subscriber #7 receives: " + s));

Flowable.timer(60, SECONDS) // Just to block the main thread for a while
    .blockingSubscribe();
This gives:
16:54:54 -----> new subscription of subscriber #1
subscriber #1 receives: API call #0
16:54:57 -----> new subscription of subscriber #2
subscriber #2 receives: API call #0
16:55:01 -----> new subscription of subscriber #3
subscriber #1 receives: API call #1
subscriber #2 receives: API call #1
subscriber #3 receives: API call #1
16:55:01 -----> new subscription of subscriber #4
subscriber #4 receives: API call #1
16:55:07 -----> new subscription of subscriber #5
subscriber #1 receives: API call #2
subscriber #2 receives: API call #2
subscriber #3 receives: API call #2
subscriber #4 receives: API call #2
subscriber #5 receives: API call #2
16:55:08 -----> new subscription of subscriber #6
subscriber #6 receives: API call #2
16:55:14 -----> new subscription of subscriber #7
subscriber #1 receives: API call #3
subscriber #2 receives: API call #3
subscriber #3 receives: API call #3
subscriber #4 receives: API call #3
subscriber #5 receives: API call #3
subscriber #6 receives: API call #3
subscriber #7 receives: API call #3
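To see why only subscribers #3, #5 and #7 trigger a new API call, here is a rough plain-Java model of the gating done by skip(5, SECONDS) followed by throttleFirst(5, SECONDS). It is only a sketch of the timing logic, not how RxJava implements these operators; the class and method names are made up for illustration, and the timestamps are taken from the Thread.sleep calls above.

```java
import java.util.ArrayList;
import java.util.List;

public class RepeatGateSketch {

    /**
     * Returns the subscription times (in ms since the first subscription)
     * that would trigger a refresh under this simplified model.
     */
    static List<Long> refreshTimes(long[] subscriptionTimesMs, long windowMs) {
        List<Long> refreshes = new ArrayList<>();
        // Models skip(window): notifications in the first window are ignored.
        long gateOpensAt = windowMs;
        for (long t : subscriptionTimesMs) {
            if (t >= gateOpensAt) {
                refreshes.add(t);
                // Models throttleFirst(window): after letting one notification
                // through, ignore further ones until the window has elapsed.
                gateOpensAt = t + windowMs;
            }
        }
        return refreshes;
    }

    public static void main(String[] args) {
        // Subscribers #1..#7 arrive after sleeps of 3000, 4000, 100, 6000, 1000, 6000 ms.
        long[] subscriptions = {0, 3000, 7000, 7100, 13100, 14100, 20100};
        System.out.println(refreshTimes(subscriptions, 5000));
        // prints [7000, 13100, 20100]
    }
}
```

The three refresh times line up with API calls #1, #2 and #3 in the output: subscribers #1, #2, #4 and #6 arrive inside a closed window and simply get the replayed value.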
Maybe we can do better, but I have no other ideas right now.
Upvotes: 0
Reputation: 400
It might not be the best solution, but I would try to do it this way:
class SimpleCacheSingle<T : Any>(
    val apiRequest: (value: String, callback: (T) -> Unit) -> Unit
) {
    private var lastTimeSeconds = 0L
    private lateinit var cachedValue: T

    fun getSingle(): Single<T> = Single.create { emitter ->
        if (System.currentTimeMillis() / 1000 - lastTimeSeconds > 60) {
            apiRequest("example argument") {
                cachedValue = it
                lastTimeSeconds = System.currentTimeMillis() / 1000
                emitter.onSuccess(cachedValue)
            }
        } else {
            emitter.onSuccess(cachedValue)
        }
    }
}
Just create an instance of it and call getSingle() to create a Single for each subscriber.
Of course, "apiRequest" in the snippet above needs to be modified to meet your needs.
Edit: Please note that if you subscribe before the previous API call has finished, you will have two or more pending API requests instead of one. You will have to modify the code so that only one request is in flight at a time.
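One possible way to get "only one request at a time" is to cache the pending request itself and hand the same one to every caller until it expires. The sketch below uses plain java.util.concurrent instead of RxJava to keep it self-contained; SingleFlightCache, loader and the injectable clock are hypothetical names, and the TTL is measured from when the request starts (an assumption, unlike the snippet above, which stamps the time on completion).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class SingleFlightCache<T> {
    private final Supplier<CompletableFuture<T>> loader; // hypothetical async API call
    private final long ttlMillis;
    private final LongSupplier clock;                    // injectable so the demo is deterministic

    private CompletableFuture<T> current;                // pending or completed request
    private long startedAtMillis;

    public SingleFlightCache(Supplier<CompletableFuture<T>> loader, long ttlMillis, LongSupplier clock) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    public synchronized CompletableFuture<T> get() {
        long now = clock.getAsLong();
        // A pending request is never treated as stale, so concurrent callers share it.
        boolean stale = current == null
                || current.isCompletedExceptionally()
                || (current.isDone() && now - startedAtMillis > ttlMillis);
        if (stale) {
            current = loader.get();
            startedAtMillis = now;
        }
        return current;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        long[] fakeNow = {0};
        SingleFlightCache<String> cache = new SingleFlightCache<>(
                () -> CompletableFuture.completedFuture("API call #" + calls.getAndIncrement()),
                60_000,
                () -> fakeNow[0]);

        System.out.println(cache.get().join()); // API call #0
        fakeNow[0] = 30_000;
        System.out.println(cache.get().join()); // API call #0 (still cached)
        fakeNow[0] = 61_000;
        System.out.println(cache.get().join()); // API call #1 (expired, reloaded)
    }
}
```

In an RxJava setting the same idea would apply to a shared Single rather than a CompletableFuture; this sketch only shows the de-duplication logic.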
Upvotes: 3
Reputation: 1517
You can use the below operators
Upvotes: -1