Reputation: 15225
Although I've been writing Java code for many years, I've barely done any work with RxJava, and I need to understand how to map it to expected results. I have a lot of existing code in services I work with, but I'm not convinced they are using RxJava properly.
Note that we're using an old version of RxJava, 2.1.10. I can't upgrade at this moment.
The following is a common pattern I see in our codebase:
Single<ResultType> result1 = Single.<ResultType>create(source -> {
    source.onSuccess(method1(parameters));
}).subscribeOn(Schedulers.io());
Single<ResultType> result2 = Single.<ResultType>create(source -> {
    source.onSuccess(method2(parameters));
}).subscribeOn(Schedulers.io());
if (null != result1 && null != result2) {
The intent is that "method1" and "method2" run in parallel, and that the check for "null != result1 && null != result2" happens after both methods have finished executing. I suspect that neither of these intentions is being fulfilled here, but I need confirmation of that, and also how to achieve those goals properly.
Upvotes: 0
Views: 67
Reputation: 19545
Depending on how your sources are set up, you can use combineLatest() to wait for the results from both sources. A sample proof-of-concept might look like this:
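import java.util.concurrent.Callable;

import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Single;
import io.reactivex.functions.BiFunction;
import io.reactivex.schedulers.Schedulers;

public class CombineLatestDemo {
    public static void main(String[] args) throws Exception {
        // First task: sleeps for about 1.1 seconds, then returns 42.
        Callable<Integer> c1 = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(System.currentTimeMillis() + "|Starting first");
                Thread.sleep(1111);
                System.out.println(System.currentTimeMillis() + "|finished first");
                return 42;
            }
        };
        // subscribeOn() moves the work to a new thread once something subscribes.
        Single<Integer> singleFirst = Single.fromCallable(c1).subscribeOn(Schedulers.newThread());

        // Second task: sleeps for about 5.5 seconds, then returns 12.
        Callable<Integer> c2 = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(System.currentTimeMillis() + "|Starting second");
                Thread.sleep(5555);
                System.out.println(System.currentTimeMillis() + "|finished second");
                return 12;
            }
        };
        Single<Integer> singleSecond = Single.fromCallable(c2).subscribeOn(Schedulers.newThread());

        // Combines the two values once both sources have emitted.
        BiFunction<Integer, Integer, Integer> func = (a, b) -> a + b;
        ObservableSource<Integer> source1 = singleFirst.toObservable();
        ObservableSource<Integer> source2 = singleSecond.toObservable();
        Observable<Integer> resultSource = Observable.combineLatest(source1, source2, func);

        System.out.println(System.currentTimeMillis() + "|All setup, wait for completion");
        // Subscribing starts both tasks; this call blocks until the combined result arrives.
        resultSource.blockingSubscribe(r -> {
            System.out.println(System.currentTimeMillis() + "|Result is: " + r);
        });
    }
}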
This might generate the following output:
1589229378890|All setup, wait for completion
1589229378895|Starting second
1589229378895|Starting first
1589229380007|finished first
1589229384451|finished second
1589229384452|Result is: 54
As you can see, the Single subscriptions run in parallel and their values are "collected" in a combineLatest() call at the end.
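Since both sources in the question are already Single, Single.zip() might be a more direct fit than converting to Observable. Note that a Single reference returned by create() or fromCallable() is never null and nothing executes until something subscribes, so a null check on result1 and result2 does not wait for either method. The following is only a minimal sketch, assuming RxJava 2.x is on the classpath; method1(), method2(), runBoth() and the class name ZipSketch are hypothetical stand-ins, not names from your codebase:

```java
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

public class ZipSketch {
    // Hypothetical stand-in for "method1" from the question.
    static int method1() throws InterruptedException {
        Thread.sleep(300);
        return 42;
    }

    // Hypothetical stand-in for "method2" from the question.
    static int method2() throws InterruptedException {
        Thread.sleep(500);
        return 12;
    }

    // Runs both methods on io() threads and blocks until both have produced a value.
    static int runBoth() {
        // Nothing runs yet: a Single is lazy and only starts when subscribed.
        Single<Integer> result1 = Single.fromCallable(ZipSketch::method1)
                .subscribeOn(Schedulers.io());
        Single<Integer> result2 = Single.fromCallable(ZipSketch::method2)
                .subscribeOn(Schedulers.io());

        // zip subscribes to both sources and emits once both have succeeded.
        Single<Integer> sum = Single.zip(result1, result2, (a, b) -> a + b);

        // blockingGet() subscribes and waits for the combined value.
        return sum.blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("Result is: " + runBoth());
    }
}
```

Any check that depends on both results would then go inside the zip function or after blockingGet(), rather than on the Single references themselves. If either source signals an error, zip passes that error on instead of a value, so error handling would still need to be added for real code.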
Upvotes: 1