Reputation: 331
I have an execute method that performs a time-consuming network call on an IO thread.
Example:
/**
 * Network call (simulated here with Thread.sleep).
 *
 * @param value the value to emit once the call has finished
 * @return an Observable that emits value and completes, subscribed on the IO scheduler
 */
private Observable<Integer> execute(final int value) {
    return Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("done " + value);
            subscriber.onNext(value);
            subscriber.onCompleted();
        }
    }).subscribeOn(Schedulers.io());
}
Then I have a list of "commands" that must be executed in order, one after another.
In this example, Observable.range(x, y) stands in for the list of commands:
public List<Integer> testObservableBackpressure() {
    return Observable.range(0, 5).flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            System.out.println("started " + integer);
            return execute(integer);
        }
    }).toList().toBlocking().single();
}
With this approach the output is:
started 0
started 1
started 2
started 3
started 4
done 0
done 1
done 2
done 4
done 3
The source produces items faster than they are consumed.
I want output like this:
started 0
done 0
started 1
done 1
started 2
done 2
...
I can get that by requesting items one at a time with request(1):
public List<Integer> testObservableBackpressure() {
    return Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(final Subscriber<? super Integer> subscriber) {
            Observable.range(0, 5).subscribe(new Subscriber<Integer>() {
                @Override
                public void onStart() {
                    request(1);
                }
                @Override
                public void onCompleted() {
                    subscriber.onCompleted();
                }
                @Override
                public void onError(Throwable e) {
                    subscriber.onError(e);
                }
                @Override
                public void onNext(Integer integer) {
                    System.out.println("started " + integer);
                    execute(integer).subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            subscriber.onNext(integer);
                            request(1);
                        }
                    });
                }
            });
        }
    }).toList().toBlocking().single();
}
This way the results are as expected:
started 0
done 0
started 1
done 1
started 2
done 2
started 3
done 3
started 4
My question: is there a more elegant way to handle this problem?
Upvotes: 3
Views: 1709
Reputation: 2442
I'm not sure you need any particular backpressure strategy here. Just use concatMap.
If you use concatMap instead of flatMap, the Observable for each new input value is only subscribed to once the previous Observable returned inside the concatMap has completed. Under the hood, concatMap uses a SerialSubscription for this. That should give you the ordering you want.
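To illustrate the ordering described above, here is a minimal plain-JDK sketch. It is not RxJava: it only models the idea of starting the next command after the previous one has completed, by chaining CompletableFuture stages with thenCompose. The class name SequentialCommands, the shared log list, and the shortened 50 ms sleep are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SequentialCommands {
    // Records "started N" / "done N" events in the order they happen (hypothetical helper).
    static final List<String> log = Collections.synchronizedList(new ArrayList<>());

    // Stand-in for the question's execute(): slow work on a background thread.
    static CompletableFuture<Integer> execute(int value, ExecutorService io) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50); // shortened from the question's 500 ms
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.add("done " + value);
            return value;
        }, io);
    }

    // concatMap-like ordering: command i + 1 starts only after command i has completed.
    static List<Integer> runSequentially(int count) {
        ExecutorService io = Executors.newCachedThreadPool();
        try {
            List<Integer> results = new ArrayList<>();
            CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
            for (int i = 0; i < count; i++) {
                final int command = i;
                // The lambda runs only once the previous stage in the chain is complete.
                chain = chain.thenCompose(ignored -> {
                    log.add("started " + command);
                    return execute(command, io).thenAccept(results::add);
                });
            }
            chain.join(); // block until the last command has finished
            return results;
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runSequentially(3));
        log.forEach(System.out::println);
    }
}
```

In the RxJava code from the question, the corresponding change should simply be to replace flatMap with concatMap in testObservableBackpressure and leave execute as it is.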
Upvotes: 3
Reputation: 684
The output that I get when I run your code is:
started 0
started 1
started 2
started 3
started 4
done 1
done 3
done 4
done 2
done 0
Notice that the "done" messages are out of order. This is because your code effectively parallelises the calls to execute. For each item emitted by Observable.range, flatMap subscribes to an Observable that runs on its own Schedulers.io() worker thread. Every item is therefore handled in parallel on a separate thread, so nothing keeps the items in order or interleaves "started" and "done" correctly. One option to achieve the desired behaviour is to make sure that all items run on the same Schedulers.io() worker (instead of each item on its own):
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import java.util.List;

public class Test {
    private Observable<Integer> execute(final int value) {
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("done " + value);
                subscriber.onNext(value);
                subscriber.onCompleted();
            }
        });
    }

    public List<Integer> testObservableBackpressure() {
        return Observable.range(0, 5).flatMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(Integer integer) {
                System.out.println("started " + integer);
                return execute(integer);
            }
        }).subscribeOn(Schedulers.io()).toList().toBlocking().single();
    }

    public static void main(String[] args) {
        new Test().testObservableBackpressure();
    }
}
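Notice that the only difference is where you invoke the subscribeOn operator. Because execute no longer switches threads, each inner Observable blocks the single IO thread until it has finished, so flatMap cannot start the next one early. This code results in the following output: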
started 0
done 0
started 1
done 1
started 2
done 2
started 3
done 3
started 4
done 4
Upvotes: 1