evgeniy44

Reputation: 3162

RxJava: How to handle events in multiple threads

For example, I have the following code that uses the RxJava library:

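import com.google.common.collect.Lists;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

public class MultithreadingExample {
    public static void main(String[] args) throws InterruptedException {
        Observable.from(Lists.newArrayList(1, 2, 3, 4, 5))
                .observeOn(Schedulers.computation())
                .map(numberToString())
                .subscribe(printResult());
        // Keep the main thread alive while the scheduler threads do the work.
        Thread.sleep(10000);
    }

    // Maps each number to its string form and logs the thread it ran on.
    private static Func1<Integer, String> numberToString() {
        return number -> {
            System.out.println("Operator thread: " + Thread.currentThread().getName());
            return String.valueOf(number);
        };
    }

    // Prints each result and logs the thread the subscriber ran on.
    private static Action1<String> printResult() {
        return result -> {
            System.out.println("Subscriber thread: " + Thread.currentThread().getName());
            System.out.println("Result: " + result);
        };
    }
}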

I want the events to be processed in the Observer by multiple threads: for example, item '1' by Thread-1, item '2' by Thread-2, and so on.

What is the best way to do it with RxJava?

Upvotes: 0

Views: 127

Answers (1)

Bob Dalgleish

Reputation: 8227

You can use the flatMap() operator.

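Observable.from(Lists.newArrayList(1, 2, 3, 4, 5))
        // Wrap each number in its own observable so it can be scheduled separately.
        .flatMap(number -> Observable.just(number)
                .subscribeOn(Schedulers.computation())
                .map(numberToString()))
        // Deliver the merged results to the subscriber on one computation thread.
        .observeOn(Schedulers.computation())
        .subscribe(printResult());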

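The flatMap() operator subscribes to each inner observable, and subscribeOn() runs that subscription on a thread from the computation scheduler (likely a different one for each item). The results are then merged and delivered on the thread selected by the final observeOn(). Note that flatMap() does not guarantee that results arrive in the original order.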
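
If it helps to see the same idea without RxJava, here is a rough analogy using only java.util.concurrent. It is not RxJava code and not what flatMap() does internally. It is a sketch in which each item is converted on a pool thread and the results are gathered on the calling thread. The class name FanOutSketch, the method process() and the pool size of 4 are made up for illustration. Unlike flatMap(), this sketch joins the futures in submission order, so the output order is preserved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class for illustration only; not part of RxJava.
class FanOutSketch {

    // Converts each number on a pool thread, then collects the results
    // on the calling thread.
    static List<String> process(List<Integer> numbers) {
        // Assumed pool size; plays the role of the computation scheduler.
        ExecutorService workers = Executors.newFixedThreadPool(4);
        try {
            // Fan out: one asynchronous task per item.
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (Integer number : numbers) {
                pending.add(CompletableFuture.supplyAsync(() -> {
                    System.out.println("Operator thread: " + Thread.currentThread().getName());
                    return String.valueOf(number);
                }, workers));
            }
            // Merge: wait for every task on the calling thread, in submission order.
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : pending) {
                results.add(future.join());
            }
            return results;
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String result : process(Arrays.asList(1, 2, 3, 4, 5))) {
            System.out.println("Result: " + result);
        }
    }
}
```

Running main should show several different pool thread names in the "Operator thread" lines, which is the behaviour the question asks for on the operator side.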

Upvotes: 2
