Lubos Mudrak

Reputation: 680

RxJava merge Observables but take result only from first

I want to achieve something like this: I have a request that produces an Observable, and I also have a kind of in-memory cache that can be a BehaviorSubject. I want to combine these two observables so that, if the subject already holds data, I take that data and never start the request observable. Can something like this be achieved?

At the moment I am doing something like this:

val resp = run { run(Task.GetMessages()) }

return InboxModel(news = Observable.merge(IoImplementation.cachedUserMessages(), resp).distinctUntilChanged())

but I don't think this is a good approach.

Upvotes: 0

Views: 867

Answers (1)

Aaron

Reputation: 1009

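It sounds like you want to look in the cache observable first and only do the real work if the cache turns out to be empty. The switchIfEmpty(Observable) operator should do what you need: if the cache observable completes without emitting anything, it subscribes to the real-work observable instead. Note that this relies on the cache observable completing; a BehaviorSubject that holds no value does not complete on its own, so the cache lookup has to be exposed as an observable that either emits the cached value or completes empty.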

public void handleRequests(Observable<Request> requests) {
    requests.flatMap((Request r) -> {
            String key = r.getSomeCacheKey();
            // Try the cache first; fall back to the real work only on a miss
            return getFromCache(key)
                .switchIfEmpty(doRealWork(r)
                    .doOnNext(saveToCache(key))
                );
        })
        .subscribe((String response) -> System.out.println(response));
}

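// A hashmap-based cache may not be what you want, but I'm
// including it for completeness of the example
Map<String, String> cache = //...

Observable<String> getFromCache(String key) {
    String cached = cache.get(key);
    // An empty Observable signals a cache miss, which lets switchIfEmpty take over
    return cached == null ? Observable.<String>empty() : Observable.just(cached);
}

Action1<String> saveToCache(String key) {
    return (String response) -> cache.put(key, response);
}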

Observable<String> doRealWork(Request someRequest) {
    // imagine real work here
    return Observable.just("some response");
}
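
To show only the control flow (check the cache, run the request on a miss, store the result), here is a minimal library-free sketch. It is not RxJava: java.util.Optional stands in for an observable that emits at most one item, and the class name CacheFirstSketch, the realWorkCalls counter, and the "response for ..." string are all hypothetical names made up for illustration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Library-free model of a cache-first lookup. An empty Optional plays the
// role of an empty Observable; orElseGet plays the role of switchIfEmpty.
public class CacheFirstSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    // Counts how often the fallback really ran (demonstration only).
    public final AtomicInteger realWorkCalls = new AtomicInteger();

    // Hypothetical stand-in for the real request.
    private String doRealWork(String key) {
        realWorkCalls.incrementAndGet();
        return "response for " + key;
    }

    public String get(String key) {
        return Optional.ofNullable(cache.get(key))
            // orElseGet takes a Supplier, so the fallback runs only on a miss
            .orElseGet(() -> {
                String fresh = doRealWork(key);
                cache.put(key, fresh); // remember the result for the next call
                return fresh;
            });
    }

    public static void main(String[] args) {
        CacheFirstSketch sketch = new CacheFirstSketch();
        System.out.println(sketch.get("inbox"));        // miss: runs the real work
        System.out.println(sketch.get("inbox"));        // hit: served from the cache
        System.out.println(sketch.realWorkCalls.get()); // prints 1
    }
}
```

The second call never reaches doRealWork, which is the behavior the question asks for: when the cache already has data, the request is not started.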

Upvotes: 0
