Reputation: 2748
I'm doing multiple io calls that I'd like to run in parallel, but I can illustrate my problem with a slightly modified piece of code from Ben Christensen's answer to this question.
In my case, every Observable emits two items instead of one, and I'd like to keep those two together.
public class ParallelTest {
public static void main(String[] args) {
System.out.println("------------ flatMapExampleSync");
flatMapExampleSync();
System.out.println("------------ flatMapExampleAsync");
flatMapExampleAsync();
System.out.println("------------concatMapExampleAsync");
concatMapExampleAsync();
System.out.println("------------");
}
private static void flatMapExampleAsync() {
Observable.range(0, 5).flatMap(i -> {
return getDataAsync(i);
}).toBlocking().forEach(System.out::println);
}
private static void concatMapExampleAsync() {
Observable.range(0, 5).concatMap(i -> {
return getDataAsync(i);
}).toBlocking().forEach(System.out::println);
}
private static void flatMapExampleSync() {
Observable.range(0, 5).flatMap(i -> {
return getDataSync(i);
}).toBlocking().forEach(System.out::println);
}
// artificial representations of IO work
static Observable<String> getDataAsync(int i) {
return getDataSync(i).subscribeOn(Schedulers.io());
}
static Observable<String> getDataSync(int i) {
return Observable.create((Subscriber<? super String> s) -> {
// simulate latency
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
s.onNext("First"+i);
try {
Thread.sleep(50);
} catch (Exception e) {
e.printStackTrace();
}
s.onNext("Second"+i);
s.onCompleted();
});
}
}
The output is like this:
------------ flatMapExampleSync
First0
Second0
First1
Second1
First2
Second2
First3
Second3
First4
Second4
------------ flatMapExampleAsync
First0
First4
First1
First2
First3
Second0
Second4
Second1
Second2
Second3
------------concatMapExampleAsync
First0
Second0
First1
Second1
First2
Second2
First3
Second3
First4
Second4
------------
In the first one the ordering is fine, but it (obviously) is all sequential, so it's slow.
The second case is fast, but the FirstN and SecondN are separated.
Finally I tried replacing flatMap with concatMap, that solves the ordering problems, but it also completely removes any parallelism, so it behaves like flatMapExampleSync.
I'd like to end up with something like this:
------------someMagicalOperatorAsync
First2
Second2
First3
Second3
First0
Second0
First1
Second1
First4
Second4
------------
So I don't care about the ordering of the 'groups' as long as they are kept together.
Is there an operator that does that?
Upvotes: 1
Views: 762
Reputation: 70017
Sounds like you need concatEager
or concatMapEager
which will prestart your sources (so they can run in parallel) but concatenate them in order.
Observable.range(0, 5).concatMapEager(i -> {
return getDataAsync(i)
.doOnSubscribe(() -> System.out.println(i + " is running!"))
;
}).toBlocking().forEach(System.out::println);
Upvotes: 2