Palaima

Reputation: 331

RxJava Backpressure (Fast producer slow consumer)

I have an execute method that performs a time-consuming network call on an io thread.

Example:

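/**
 * Simulates a slow network call on the io scheduler.
 * @param value the value to emit once the call finishes
 * @return an Observable that emits value after about 500 ms, then completes
 */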
private Observable<Integer> execute(final int value) {
    return Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {

            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("done " + value);
            subscriber.onNext(value);
            subscriber.onCompleted();
        }
    }).subscribeOn(Schedulers.io());
}

Then I have a list of "commands" that must be executed in order, one after another.

Example (Observable.range(x, y) stands in for the list of commands):

public List<Integer> testObservableBackpressure(){
   return Observable.range(0,5).flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            System.out.println("started " + integer);
            return execute(integer);
        }
    }).toList().toBlocking().single();
}

With this approach the output is:

started 0
started 1
started 2
started 3
started 4
done 0
done 1
done 2
done 4
done 3

The producer emits items faster than the consumer processes them.

I want results like this:

started 0
done 0
started 1
done 1
started 2
done 2
...

I got this to work by requesting one item at a time:

public List<Integer> testObservableBackpressure(){

    return Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(final Subscriber<? super Integer> subscriber) {
            Observable.range(0,5).subscribe(new Subscriber<Integer>() {

                @Override
                public void onStart() {
                    request(1);
                }

                @Override
                public void onCompleted() {
                    subscriber.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    subscriber.onError(e);
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("started " + integer);
                    execute(integer).subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            subscriber.onNext(integer);
                            request(1);
                        }
                    });
                }
            });
        }
    }).toList().toBlocking().single();
}

This way the results are as expected:

started 0
done 0
started 1
done 1
started 2
done 2
started 3
done 3
started 4

Is there a more elegant way to handle this problem?

Upvotes: 3

Views: 1709

Answers (2)

lopar

Reputation: 2442

I'm not sure you need any particular backpressure strategy here. Just use concatMap.

If you use concatMap instead of flatMap, the Observable for each new input value is subscribed to only after the previous inner Observable has completed. Under the hood, concatMap uses a SerialSubscription for this. That should give you the ordering you want.
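In the question's code the change is only the operator name (flatMap becomes concatMap). To illustrate the sequencing this produces without depending on RxJava, here is a rough plain-JDK analogy using CompletableFuture.thenCompose. It is a sketch, not RxJava code: the class SequentialChainDemo, the method runSequentially, and the shortened 50 ms delay are assumptions made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: a JDK-only analogy of concatMap-style sequencing.
final class SequentialChainDemo {

    // Stand-in for the question's execute(): a slow call on a pool thread.
    static CompletableFuture<Integer> execute(int value, List<String> log, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50); // shortened delay, assumption for the demo
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.add("done " + value);
            return value;
        }, pool);
    }

    // Starts each call only after the previous one has completed.
    static List<String> runSequentially(int count) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
            for (int i = 0; i < count; i++) {
                final int value = i;
                // thenCompose defers this step until the chain so far is done.
                chain = chain.thenCompose(ignored -> {
                    log.add("started " + value);
                    return execute(value, log, pool).thenAccept(result -> { });
                });
            }
            chain.join(); // block until the last call has finished
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        for (String line : runSequentially(5)) {
            System.out.println(line);
        }
    }
}
```

Each "started" line is logged only once the preceding "done" line has been written, which is the same ordering guarantee the answer describes for concatMap, even though the calls still run on pool threads.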

Upvotes: 3

Georgi Khomeriki

Reputation: 684

The output that I get when I run your code is:

started 0
started 1
started 2
started 3
started 4
done 1
done 3
done 4
done 2
done 0

Notice that the "done" messages are out of order. Your code effectively parallelises the calls to execute: for each item emitted by Observable.range, flatMap subscribes to an Observable that applies subscribeOn(Schedulers.io()) itself, so each one runs on its own io worker thread. With every item handled in parallel on a separate thread, nothing keeps the items in order or correctly interleaved. One option to achieve the desired behaviour is to make sure that all items run on the same io worker (instead of each item on its own):

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

import java.util.List;

public class Test {
    private Observable<Integer> execute(final int value) {
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {

                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("done " + value);
                subscriber.onNext(value);
                subscriber.onCompleted();
            }
        });
    }

    public List<Integer> testObservableBackpressure(){
        return Observable.range(0, 5).flatMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(Integer integer) {
                System.out.println("started " + integer);
                return execute(integer);
            }
        }).subscribeOn(Schedulers.io()).toList().toBlocking().single();
    }

    public static void main(String[] args) {
        new Test().testObservableBackpressure();
    }
}

Notice that the only difference is where you invoke the subscribeOn operator. This code results in the following output:

started 0
done 0
started 1
done 1
started 2
done 2
started 3
done 3
started 4
done 4
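The effect of moving subscribeOn can be pictured with plain JDK executors. The following is only an analogy under stated assumptions, not RxJava code: SchedulerPlacementDemo, workerPerCall, singleWorker, and the 50 ms delay are hypothetical names and values chosen for the sketch. workerPerCall mimics subscribeOn inside execute (one thread per call), while singleWorker mimics a single subscribeOn on the outer chain (one thread runs everything, so each call blocks the next).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: thread-per-call versus one shared worker.
final class SchedulerPlacementDemo {

    // Stand-in for the blocking body of execute().
    static void slowCall(int value, List<String> log) {
        try {
            Thread.sleep(50); // shortened delay, assumption for the demo
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.add("done " + value);
    }

    // Analogy for subscribeOn inside execute(): each call gets its own thread,
    // so calls overlap and the order of the "done" lines is not guaranteed.
    static List<String> workerPerCall(int count) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                final int value = i;
                log.add("started " + value);
                pending.add(CompletableFuture.runAsync(() -> slowCall(value, log), pool));
            }
            for (CompletableFuture<Void> call : pending) {
                call.join(); // wait for every call to finish
            }
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(log);
    }

    // Analogy for one subscribeOn on the outer chain: a single worker runs the
    // loop and every call inline, so each call finishes before the next starts.
    static List<String> singleWorker(int count) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture.runAsync(() -> {
                for (int i = 0; i < count; i++) {
                    log.add("started " + i);
                    slowCall(i, log);
                }
            }, worker).join();
        } finally {
            worker.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println("worker per call: " + workerPerCall(5));
        System.out.println("single worker:   " + singleWorker(5));
    }
}
```

The trade-off is visible in the sketch: the single-worker variant gets its ordering only because the slow call blocks the one thread that also produces the items, so it would stop being sequential if the call became asynchronous.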

Upvotes: 1
