Reputation: 11387
I'm learning reactive programming with RxJava, and want to consume emmited values concurrently withouth blocking in a single execution thread.
Observable
.interval(50, TimeUnit.MILLISECONDS)
.take(5)
.subscribe(new Action1<Long>() {
@Override
public void call(Long counter) {
sleep(1000);
System.out.println("Got: " + counter + " thread : "+ Thread.currentThread().getName());
}
});
sleep(10000);
I'll get this output
Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-1
Got: 2 thread : RxComputationThreadPool-1
Got: 3 thread : RxComputationThreadPool-1
Got: 4 thread : RxComputationThreadPool-1
how do i handle each event in async? like this
Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-2
Got: 2 thread : RxComputationThreadPool-3
Got: 3 thread : RxComputationThreadPool-4
Got: 4 thread : RxComputationThreadPool-5
Upvotes: 5
Views: 507
Reputation: 2662
In Rx, an observable represents concurrency1, so to process notifications concurrently with respect to each other, you must project each notification into an observable.
flatMap
is the asynchronous sequential composition operator. It projects each notification from a source observable into an observable, thus allowing you to process each input value concurrently. It then merges the results of each computation into a flattened observable sequence with non-overlapping notifications.
Addendum:
In the selector
for flatMap
, there are often multiple ways to create a concurrent observable depending upon the target platform. I don't know Java, but in .NET you would typically either use Observable.Start
to introduce concurrency or an asynchronous method (async/await
) to take advantage of native asynchrony, which is often preferable.
1 Technically, an individual subscription (observer) for a cold observable enables concurrency in Rx, though it's often convenient to think in terms of observables instead. See this answer for more info.
Upvotes: 4