David M. Karr

Reputation: 15225

How to ensure that rxjava methods execute in parallel, and finish?

Although I've been writing Java code for many years, I've barely done any work with RxJava, and I need to understand how to map it to expected results. I have a lot of existing code in services I work with, but I'm not convinced they are using RxJava properly.

Note that we're using an old version of RxJava, 2.1.10. I can't upgrade at this moment.

The following is a common pattern I see in our codebase:

Single<ResultType> result1 = Single.<ResultType>create(source -> {
    source.onSuccess(method1(parameters));
}).subscribeOn(Schedulers.io());
Single<ResultType> result2 = Single.<ResultType>create(source -> {
    source.onSuccess(method2(parameters));
}).subscribeOn(Schedulers.io());

if (null != result1 && null != result2) {

The intent is that "method1" and "method2" execute in parallel, and that the check for "null != result1 && null != result2" happens only after both methods have finished executing. I suspect that neither of these intentions is being fulfilled here, but I need confirmation of that, and I also need to know how to achieve those goals properly.

Upvotes: 0

Views: 67

Answers (1)

Progman

Reputation: 19545

Depending on how your sources are set up, you can use combineLatest() to wait for the results from both sources. Proof-of-concept code might look like this:

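import java.util.concurrent.Callable;

import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Single;
import io.reactivex.functions.BiFunction;
import io.reactivex.schedulers.Schedulers;

// The class name is arbitrary; it only wraps the demo so that it compiles.
public class CombineLatestDemo {
    public static void main(String[] args) throws Exception {
        // First task: sleeps for about 1.1 seconds, then returns 42.
        Callable<Integer> c1 = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(System.currentTimeMillis()+"|Starting first");
                Thread.sleep(1111);
                System.out.println(System.currentTimeMillis()+"|finished first");
                return 42;
            }
        };
        // Nothing runs yet; subscribeOn() only selects the thread used once subscribed.
        Single<Integer> singleFirst = Single.fromCallable(c1).subscribeOn(Schedulers.newThread());

        // Second task: sleeps for about 5.5 seconds, then returns 12.
        Callable<Integer> c2 = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(System.currentTimeMillis()+"|Starting second");
                Thread.sleep(5555);
                System.out.println(System.currentTimeMillis()+"|finished second");
                return 12;
            }
        };
        Single<Integer> singleSecond = Single.fromCallable(c2).subscribeOn(Schedulers.newThread());
        BiFunction<Integer, Integer, Integer> func = (a,b) -> a+b;

        // combineLatest() emits once both sources have produced a value.
        ObservableSource<Integer> source1 = singleFirst.toObservable();
        ObservableSource<Integer> source2 = singleSecond.toObservable();
        Observable<Integer> resultSource = Observable.combineLatest(source1, source2, func);
        System.out.println(System.currentTimeMillis()+"|All setup, wait for completion");
        // Subscribing starts both tasks; the calling thread blocks until the result arrives.
        resultSource.blockingSubscribe(r -> {
            System.out.println(System.currentTimeMillis()+"|Result is: "+r);
        });
    }
}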

This might generate the following output:

1589229378890|All setup, wait for completion
1589229378895|Starting second
1589229378895|Starting first
1589229380007|finished first
1589229384451|finished second
1589229384452|Result is: 54

As you can see, the two Single subscriptions run in parallel, and their values are "collected" by the combineLatest() call at the end.
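
Since both sources here are Single instances, a shorter variant is possible with Single.zip() and blockingGet(), which avoids the conversion to Observable. The following is only a sketch, assuming RxJava 2.x (the io.reactivex packages); the class name ZipExample and the helpers slowFirst() and slowSecond() are hypothetical stand-ins for method1 and method2 from the question.

```java
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

public class ZipExample {
    // Hypothetical stand-in for method1(parameters) from the question.
    static Integer slowFirst() throws InterruptedException {
        Thread.sleep(300);
        return 42;
    }

    // Hypothetical stand-in for method2(parameters) from the question.
    static Integer slowSecond() throws InterruptedException {
        Thread.sleep(500);
        return 12;
    }

    static int runBoth() {
        // Creating a Single does not run anything; the work starts on subscription.
        Single<Integer> first = Single.fromCallable(ZipExample::slowFirst)
                .subscribeOn(Schedulers.io());
        Single<Integer> second = Single.fromCallable(ZipExample::slowSecond)
                .subscribeOn(Schedulers.io());

        // zip() subscribes to both sources and combines their two results.
        Single<Integer> combined = Single.zip(first, second, (a, b) -> a + b);

        // blockingGet() subscribes and makes the calling thread wait for the result.
        return combined.blockingGet();
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        int sum = runBoth();
        long elapsed = System.currentTimeMillis() - start;
        System.out.println("Result is: " + sum + " after " + elapsed + " ms");
    }
}
```

This also bears on the null check in the question: the Single returned by Single.create(...).subscribeOn(...) is never null, and nothing executes until something subscribes to it, so a test like "null != result1" says nothing about whether method1 has run. The results have to be read through a subscription, for example with blockingGet() as above.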

Upvotes: 1
