Reputation: 2914
Assume that I have a fetcher that fetches an image from a given link on a separate thread and then caches it in memory. Once the image is cached, the fetcher won't re-fetch the link. The fetcher is exposed as an Observable, and there may be many subscribers asking it for the image. When the first subscriber subscribes, the fetcher starts the network request. If a second subscriber subscribes while that request is still in flight, the fetcher shouldn't fire another request. When the fetch finishes, both subscribers get the image. If a third subscriber arrives after that, the fetcher emits the cached image right away.
How can I implement this scenario with RxJava? I'd like to use existing operators, composed in a more declarative way, and, most importantly, avoid the overhead of synchronized blocks, locks, and atomics.
Upvotes: 0
Views: 2193
Reputation: 2264
Have a look at ConnectableObservable and the .replay() method.
I'm currently using this in my fragments to handle orientation changes:
Fragment's onCreate:
ConnectableObservable<MyThing> connectableObservable =
        retrofitService.fetchMyThing()
                .map(...)
                .replay();
connectableObservable.connect(); // this starts the actual network call
Fragment's onCreateView:
Subscription subscription = connectableObservable
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(mything -> dosomething());
What happens is that I make 1 network request only, and any subscriber will (eventually/immediately) get that response.
Upvotes: 0
Reputation: 69997
This can be accomplished via ConcurrentMap and AsyncSubject:
import java.awt.image.BufferedImage;
import java.io.*;
import java.net.URL;
import java.util.concurrent.*;
import javax.imageio.ImageIO;
import rx.*;
import rx.Scheduler.Worker;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;

public class ObservableImageCache {
    final ConcurrentMap<String, AsyncSubject<BufferedImage>> image =
            new ConcurrentHashMap<>();

    public Observable<BufferedImage> get(String url) {
        AsyncSubject<BufferedImage> result = image.get(url);
        if (result == null) {
            result = AsyncSubject.create();
            AsyncSubject<BufferedImage> existing = image.putIfAbsent(url, result);
            if (existing == null) {
                System.out.println("Debug: Downloading " + url);
                AsyncSubject<BufferedImage> a = result;
                Worker w = Schedulers.io().createWorker();
                w.schedule(() -> {
                    try {
                        Thread.sleep(500); // for demo
                        URL u = new URL(url);
                        try (InputStream openStream = u.openStream()) {
                            a.onNext(ImageIO.read(openStream));
                        }
                        a.onCompleted();
                    } catch (IOException | InterruptedException ex) {
                        a.onError(ex);
                    } finally {
                        w.unsubscribe();
                    }
                });
            } else {
                result = existing;
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        ObservableImageCache cache = new ObservableImageCache();
        CountDownLatch cdl = new CountDownLatch(4);
        Observable<BufferedImage> img1 = cache.get("https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/create.png");
        System.out.println("Subscribing for IMG1");
        img1.subscribe(e -> System.out.println("IMG1: " + e.getWidth() + "x" + e.getHeight()), Throwable::printStackTrace, cdl::countDown);
        Thread.sleep(500);
        Observable<BufferedImage> img2 = cache.get("https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/create.png");
        System.out.println("Subscribing for IMG2");
        img2.subscribe(e -> System.out.println("IMG2: " + e.getWidth() + "x" + e.getHeight()), Throwable::printStackTrace, cdl::countDown);
        Observable<BufferedImage> img3 = cache.get("https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/amb.png");
        Observable<BufferedImage> img4 = cache.get("https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/amb.png");
        Thread.sleep(500);
        System.out.println("Subscribing for IMG3");
        img3.subscribe(e -> System.out.println("IMG3: " + e.getWidth() + "x" + e.getHeight()), Throwable::printStackTrace, cdl::countDown);
        Thread.sleep(1000);
        System.out.println("-> Should be immediate: ");
        System.out.println("Subscribing for IMG4");
        img4.subscribe(e -> System.out.println("IMG4: " + e.getWidth() + "x" + e.getHeight()), Throwable::printStackTrace, cdl::countDown);
        cdl.await();
    }
}
I'm using ConcurrentMap's putIfAbsent to make sure only one download is triggered for a new url; every other caller receives the same AsyncSubject, on which they can 'wait': they get the data once it becomes available, or immediately if they subscribe after that. Usually, you'd want to limit the number of concurrent downloads by using a custom Scheduler.
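The same "one load per key, shared by everyone" idea can be sketched with the JDK alone, which may help to see the mechanism in isolation. This is not RxJava and not the code from this answer: CompletableFuture stands in for AsyncSubject, a fixed thread pool stands in for a custom Scheduler that limits concurrent downloads, and OnceOnlyCache, maxConcurrentLoads and the loader function are hypothetical names made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper: one in-flight or completed load per key, shared by all callers.
final class OnceOnlyCache<V> {
    private final ConcurrentMap<String, CompletableFuture<V>> entries = new ConcurrentHashMap<>();
    private final ExecutorService pool;          // bounds the number of concurrent loads
    private final Function<String, V> loader;    // assumed to do the actual (slow) fetch

    OnceOnlyCache(int maxConcurrentLoads, Function<String, V> loader) {
        this.pool = Executors.newFixedThreadPool(maxConcurrentLoads);
        this.loader = loader;
    }

    CompletableFuture<V> get(String key) {
        // computeIfAbsent applies the mapping function at most once per absent key,
        // so only the first caller schedules a load; later callers get the same future.
        return entries.computeIfAbsent(key,
                k -> CompletableFuture.supplyAsync(() -> loader.apply(k), pool));
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        AtomicInteger loads = new AtomicInteger(); // only counts loader calls for the demo
        OnceOnlyCache<String> cache = new OnceOnlyCache<>(2, key -> {
            loads.incrementAndGet();
            try {
                Thread.sleep(200); // simulate a slow download
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "payload for " + key;
        });

        CompletableFuture<String> first = cache.get("a.png");
        CompletableFuture<String> second = cache.get("a.png"); // arrives while loading
        System.out.println(first.join());
        System.out.println(second.join());
        System.out.println(cache.get("a.png").join());          // already cached
        System.out.println("loads = " + loads.get());           // prints "loads = 1"
        cache.shutdown();
    }
}
```

As with the AsyncSubject version, a failed load stays in the map in this sketch, so a real cache would probably want to evict entries that completed with an error.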
Upvotes: 2
Reputation: 12982
The clue is in the question: "Assume that I have a fetcher that fetches an image from a given link on a separate thread. The image will then be cached in memory."
And the answer is the cache() operator:
"remember the sequence of items emitted by the Observable and emit the same sequence to future Subscribers"
from: https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators
So, the following Observable should only fetch the image once, no matter how many Subscribers subscribe to it:
Observable<Bitmap> cachedBitmap = fetchBitmapFrom(url).cache();
EDIT:
I think the following example proves that the upstream Observable is subscribed only once, even if multiple Subscriptions come in before the Observable has emitted anything. This should also be true for network requests.
package com.example;

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class SimpleCacheTest {
    public static void main(String[] args) {
        final Observable<Integer> cachedSomething = getSomething().cache();
        System.out.println("before first subscription");
        cachedSomething.subscribe(new SimpleLoggingSubscriber<Integer>("1"));
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("before second subscription");
        cachedSomething.subscribe(new SimpleLoggingSubscriber<Integer>("2"));
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("quit");
    }

    private static class SimpleLoggingSubscriber<T> extends Subscriber<T> {
        private final String tag;

        public SimpleLoggingSubscriber(final String tag) {
            this.tag = tag;
        }

        @Override
        public void onCompleted() {
            System.out.println("onCompleted (" + tag + ")");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("onError (" + tag + ")");
        }

        @Override
        public void onNext(T t) {
            System.out.println("onNext (" + tag + "): " + t);
        }
    }

    private static Observable<Integer> getSomething() {
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                System.out.println("going to sleep now...");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                subscriber.onNext(1);
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.io());
    }
}
Output:
before first subscription
going to sleep now...
before second subscription
onNext (1): 1
onNext (2): 1
onCompleted (1)
onCompleted (2)
quit
Upvotes: 3