Reputation: 3162
For example, I have the following code that uses the RxJava library:
import com.google.common.collect.Lists;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

public class MultithreadingExample {

    public static void main(String[] args) throws InterruptedException {
        Observable.from(Lists.newArrayList(1, 2, 3, 4, 5))
                .observeOn(Schedulers.computation())
                .map(numberToString())
                .subscribe(printResult());
        // Keep the JVM alive long enough for the scheduler threads to finish.
        Thread.sleep(10000);
    }

    private static Func1<Integer, String> numberToString() {
        return number -> {
            System.out.println("Operator thread: " + Thread.currentThread().getName());
            return String.valueOf(number);
        };
    }

    private static Action1<String> printResult() {
        return result -> {
            System.out.println("Subscriber thread: " + Thread.currentThread().getName());
            System.out.println("Result: " + result);
        };
    }
}
I want the events to be processed in the Observer by multiple threads: for example, item '1' by Thread-1, item '2' by Thread-2, and so on.
What is the best way to do it with RxJava?
Upvotes: 0
Views: 127
Reputation: 8227
You can use the flatMap() operator.
Observable.from(Lists.newArrayList(1, 2, 3, 4, 5))
        // Wrap each item in its own observable so it can be scheduled independently.
        .flatMap(number -> Observable.just(number)
                .subscribeOn(Schedulers.computation())
                .map(numberToString()))
        .observeOn(Schedulers.computation())
        .subscribe(printResult());
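The flatMap() operator subscribes to each inner observable on a (likely different) computation thread and merges the results back onto the thread selected by the final observeOn(). Because flatMap() merges results as they arrive, the output order is not guaranteed to match the input order.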
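If the threading model is hard to picture, here is a rough analogy written with plain java.util.concurrent instead of RxJava. It is only a sketch of the idea, not how RxJava is implemented: the class FanOutSketch and the method convertInParallel are hypothetical names, the fixed pool stands in for Schedulers.computation(), and the single-thread executor stands in for the final observeOn().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FanOutSketch {

    // Hypothetical helper: converts every number on a worker pool, then hands
    // each result to one "subscriber" thread. Results arrive in completion order.
    public static List<String> convertInParallel(List<Integer> numbers) {
        // Assumption: stands in for Schedulers.computation() (pool sized to CPU count).
        ExecutorService workers = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        // Assumption: stands in for the final observeOn() (one receiving thread).
        ExecutorService subscriber = Executors.newSingleThreadExecutor();
        // Only modified from the single subscriber thread while tasks are running.
        List<String> received = new ArrayList<>();
        try {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (Integer number : numbers) {
                pending.add(CompletableFuture
                        // The "operator" step runs on whichever worker is free.
                        .supplyAsync(() -> {
                            System.out.println("Operator thread: "
                                    + Thread.currentThread().getName());
                            return String.valueOf(number);
                        }, workers)
                        // The "subscriber" step always runs on the same thread.
                        .thenAcceptAsync(result -> {
                            System.out.println("Subscriber thread: "
                                    + Thread.currentThread().getName());
                            received.add(result);
                        }, subscriber));
            }
            // Wait for every item to pass through both steps.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            workers.shutdown();
            subscriber.shutdown();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("Results: " + convertInParallel(Arrays.asList(1, 2, 3, 4, 5)));
    }
}
```

Running it should show several different "Operator thread" names but a single "Subscriber thread" name, which mirrors what the flatMap() version does. The order of the results can differ between runs.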
Upvotes: 2