Luke
Luke

Reputation: 1306

How to filter Observables with other Observables

The problem I am facing is as follows: I have two observables one is fetching data from network and the other from db. The second one might be empty but the lack of the first one is considered an error. Then if the result from network comes I need to compare it with the latest results from db ( if present ) and if they differ I want to store them ( if the db observable is empty I want to store network results anyway).

Is there any dedicated operator that handles a case like this?

So far I tried a solution with zipWith ( which is not working as expected if db is empty ), buffer ( which is working but is far from ideal ), and I also tried flatmapping ( which requires additional casting in the subscriber ).

Below is the solution with buffer.

Observable.concat(ratesFromNetwork(), latestRatesFromDB())
                .buffer(3000, 2)
                .filter(buffer -> !(buffer.size() == 2 && !buffer.get(0).differentThan(buffer.get(1))))
                .map(buffer -> buffer.get(0))
                .subscribe(this::save,
                        (ex) -> System.out.println(ex.getMessage()),
                        () -> System.out.println("completed"));

If I modify latestRatesFromDb so that it is not returning Observable but and Optional instead the whole problem becomes trivial because I can filter using this result. It seams that there is no way to filter in an asynchronous way ( or did I miss something ?)

Upvotes: 1

Views: 49

Answers (1)

flakes
flakes

Reputation: 23624

Okay, here is how I would go about writing this.

Firstly, whatever class has the differentThan function should be changed to override equals instead. Otherwise you can't use a lot of basic methods with these objects.

For the purpose of this example I wrote all the observables using the Integer class as my type parameter. I then use a scheduler to write two mock methods:

static Observable<Integer> ratesFromNetwork(Scheduler scheduler) {
    return Observable.<Integer>create(sub -> {
        sub.onNext(2);
        sub.onCompleted();
    }).delay(99, TimeUnit.MILLISECONDS, scheduler);
}

static Observable<Integer> latestRatesFromDB(Scheduler scheduler) {
    return Observable.<Integer>create(sub -> {
        sub.onNext(1);
        sub.onCompleted();
    }).delay(99, TimeUnit.MILLISECONDS, scheduler);
}

As you can see both are similar, however, they will emit different values.

lack of the first one is considered an error

The best way to achieve this is to use a timeout. You can log the error immediately here and continue:

final Observable<Integer> networkRate = ratesFromNetwork(scheduler)
    .timeout(networkTimeOut, TimeUnit.MILLISECONDS, scheduler)
    .doOnError(e -> System.err.println("Failed to get rates from network."));

When the timeout fails an error will be thrown by rx. doOnError will give you a better idea of where this error started and let it propagate through the rest of the sequence.

The second one might be empty

In this case I would do a similar strategy, however, do not let the error propagate by using the method onErrorResumeNext. Now you can make sure the observable emits at least one value by using firstOrDefault. In this method use some dummy value that you expect to never match with the network results.

final Observable<Integer> databaseRate = latestRatesFromDB(scheduler)
    .timeout(databaseTimeOut, TimeUnit.MILLISECONDS, scheduler)
    .doOnError(e -> System.err.println("Failed to get rates from database"))
    .onErrorResumeNext(Observable.empty())
    .firstOrDefault(-1);

Now by using the distinct method you can grab a value only when it is different than the one that came before it (which is why you need to override equals).

databaseRate.concatWith(networkRate).distinct().skip(1)
    .subscribe(i -> System.out.println("Updating to " + i),
        System.err::println,
        () -> System.out.println("completed"));

Here the database rate was placed before the network rate to take advantage of distinct. a skip is then added to always ignore the database rate value.


Complete Code:

final long networkTimeOut = 100;
final long databaseTimeOut = 100;

final TestScheduler scheduler = new TestScheduler();

final Observable<Integer> networkRate = ratesFromNetwork(scheduler)
    .timeout(networkTimeOut, TimeUnit.MILLISECONDS, scheduler)
    .doOnError(e -> System.err.println("Failed to get rates from network."));

final Observable<Integer> databaseRate = latestRatesFromDB(scheduler)
    .timeout(databaseTimeOut, TimeUnit.MILLISECONDS, scheduler)
    .doOnError(e -> System.err.println("Failed to get rates from database"))
    .onErrorResumeNext(Observable.empty())
    .firstOrDefault(-1);

databaseRate.concatWith(networkRate).distinct().skip(1)
    .subscribe(i -> System.out.println("Updating to " + i),
        System.err::println,
        () -> System.out.println("completed"));

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

When networkTimeOut and databaseTimeOut are greater than 100 it prints:

Updating to 2
completed

When networkTimeOut is less than 100 it prints:

Failed to get rates from network.
java.util.concurrent.TimeoutException

When databaseTimeOut is less than 100 it prints:

Failed to get rates from database
Updating to 2
completed

And if you modify latestRatesFromDB and ratesFromNetwork to return the same value, it simply prints:

completed

And if you don't care about forcing timeouts or logging then it boils down to:

latestRatesFromDB().firstOrDefault(dummyValue)
    .concatWith(ratesFromNetwork())
    .distinct().skip(1)
    .subscribe(this::save, 
        System.err::println, 
        () -> System.out.println("completed"));

Upvotes: 1

Related Questions