Reputation: 1306
The problem I am facing is as follows: I have two observables one is fetching data from network and the other from db. The second one might be empty but the lack of the first one is considered an error. Then if the result from network comes I need to compare it with the latest results from db ( if present ) and if they differ I want to store them ( if the db observable is empty I want to store network results anyway).
Is there any dedicated operator that handles a case like this?
So far I tried a solution with zipWith ( which is not working as expected if db is empty ), buffer ( which is working but is far from ideal ), and I also tried flatmapping ( which requires additional casting in the subscriber ).
Below is the solution with buffer.
Observable.concat(ratesFromNetwork(), latestRatesFromDB())
.buffer(3000, 2)
.filter(buffer -> !(buffer.size() == 2 && !buffer.get(0).differentThan(buffer.get(1))))
.map(buffer -> buffer.get(0))
.subscribe(this::save,
(ex) -> System.out.println(ex.getMessage()),
() -> System.out.println("completed"));
If I modify latestRatesFromDb so that it is not returning Observable but and Optional instead the whole problem becomes trivial because I can filter using this result. It seams that there is no way to filter in an asynchronous way ( or did I miss something ?)
Upvotes: 1
Views: 49
Reputation: 23624
Okay, here is how I would go about writing this.
Firstly, whatever class has the differentThan
function should be changed to override equals
instead. Otherwise you can't use a lot of basic methods with these objects.
For the purpose of this example I wrote all the observables using the Integer
class as my type parameter. I then use a scheduler to write two mock methods:
static Observable<Integer> ratesFromNetwork(Scheduler scheduler) {
return Observable.<Integer>create(sub -> {
sub.onNext(2);
sub.onCompleted();
}).delay(99, TimeUnit.MILLISECONDS, scheduler);
}
static Observable<Integer> latestRatesFromDB(Scheduler scheduler) {
return Observable.<Integer>create(sub -> {
sub.onNext(1);
sub.onCompleted();
}).delay(99, TimeUnit.MILLISECONDS, scheduler);
}
As you can see both are similar, however, they will emit different values.
lack of the first one is considered an error
The best way to achieve this is to use a timeout
. You can log the error immediately here and continue:
final Observable<Integer> networkRate = ratesFromNetwork(scheduler)
.timeout(networkTimeOut, TimeUnit.MILLISECONDS, scheduler)
.doOnError(e -> System.err.println("Failed to get rates from network."));
When the timeout
fails an error will be thrown by rx. doOnError
will give you a better idea of where this error started and let it propagate through the rest of the sequence.
The second one might be empty
In this case I would do a similar strategy, however, do not let the error propagate by using the method onErrorResumeNext
. Now you can make sure the observable emits at least one value by using firstOrDefault
. In this method use some dummy value that you expect to never match with the network results.
final Observable<Integer> databaseRate = latestRatesFromDB(scheduler)
.timeout(databaseTimeOut, TimeUnit.MILLISECONDS, scheduler)
.doOnError(e -> System.err.println("Failed to get rates from database"))
.onErrorResumeNext(Observable.empty())
.firstOrDefault(-1);
Now by using the distinct
method you can grab a value only when it is different than the one that came before it (which is why you need to override equals
).
databaseRate.concatWith(networkRate).distinct().skip(1)
.subscribe(i -> System.out.println("Updating to " + i),
System.err::println,
() -> System.out.println("completed"));
Here the database rate was placed before the network rate to take advantage of distinct
. a skip
is then added to always ignore the database rate value.
Complete Code:
final long networkTimeOut = 100;
final long databaseTimeOut = 100;
final TestScheduler scheduler = new TestScheduler();
final Observable<Integer> networkRate = ratesFromNetwork(scheduler)
.timeout(networkTimeOut, TimeUnit.MILLISECONDS, scheduler)
.doOnError(e -> System.err.println("Failed to get rates from network."));
final Observable<Integer> databaseRate = latestRatesFromDB(scheduler)
.timeout(databaseTimeOut, TimeUnit.MILLISECONDS, scheduler)
.doOnError(e -> System.err.println("Failed to get rates from database"))
.onErrorResumeNext(Observable.empty())
.firstOrDefault(-1);
databaseRate.concatWith(networkRate).distinct().skip(1)
.subscribe(i -> System.out.println("Updating to " + i),
System.err::println,
() -> System.out.println("completed"));
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
When networkTimeOut
and databaseTimeOut
are greater than 100 it prints:
Updating to 2
completed
When networkTimeOut
is less than 100 it prints:
Failed to get rates from network.
java.util.concurrent.TimeoutException
When databaseTimeOut
is less than 100 it prints:
Failed to get rates from database
Updating to 2
completed
And if you modify latestRatesFromDB
and ratesFromNetwork
to return the same value, it simply prints:
completed
And if you don't care about forcing timeouts or logging then it boils down to:
latestRatesFromDB().firstOrDefault(dummyValue)
.concatWith(ratesFromNetwork())
.distinct().skip(1)
.subscribe(this::save,
System.err::println,
() -> System.out.println("completed"));
Upvotes: 1