Muhammad Hewedy

Reputation: 30078

Order of execution of Rx Operators

I am trying to figure out the order of execution of Rx operators.

What I know is that the create operator runs last, i.e. the observable does not start producing until there is a subscriber (a cold observable).

So I wrote this code to test this behavior:

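import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public class RxOrderTest { // any class name works

    public static void main(String[] args) throws InterruptedException {
        test(Schedulers.immediate());
        test(Schedulers.computation());
        ExecutorService executor = Executors.newCachedThreadPool();
        test(Schedulers.from(executor));
        executor.shutdown();
        test(Schedulers.io());
        test(Schedulers.newThread());
        test(Schedulers.trampoline());
    }

    static void test(Scheduler scheduler) throws InterruptedException {
        System.out.printf("-------%s--------\n", scheduler);

        // Cold source: this lambda runs only when someone subscribes.
        Observable<Integer> create = Observable.create(c -> {
            c.onNext(1);
            c.onCompleted();
            print("CREATE");
        });

        create
            .subscribeOn(scheduler)   // subscribe to the source (run the lambda above) on this scheduler
            .observeOn(scheduler)     // switch to a worker of this scheduler before map
            .map(e -> { print("MAP"); return e * 2; })
            .observeOn(scheduler)     // switch again before the subscriber
            .subscribe(a -> { print("SUBSCRIBE"); });

        // Give the background threads time to finish before the next test.
        TimeUnit.MILLISECONDS.sleep(200);
    }

    // synchronized: only one thread prints at a time
    static synchronized void print(String s) {
        System.out.printf("%s %s\n", s, Thread.currentThread());
    }
}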

The output (similar across multiple runs):

-------rx.internal.schedulers.ImmediateScheduler@45ee12a7--------
MAP Thread[main,5,main]
SUBSCRIBE Thread[main,5,main]
CREATE Thread[main,5,main]
-------rx.internal.schedulers.EventLoopsScheduler@1eb44e46--------
CREATE Thread[RxComputationScheduler-3,5,main]
MAP Thread[RxComputationScheduler-2,5,main]
SUBSCRIBE Thread[RxComputationScheduler-1,5,main]
-------rx.internal.schedulers.ExecutorScheduler@3830f1c0--------
MAP Thread[pool-1-thread-2,5,main]
CREATE Thread[pool-1-thread-3,5,main]
SUBSCRIBE Thread[pool-1-thread-1,5,main]
-------rx.internal.schedulers.CachedThreadScheduler@3fb4f649--------
CREATE Thread[RxIoScheduler-4,5,main]
MAP Thread[RxIoScheduler-3,5,main]
SUBSCRIBE Thread[RxIoScheduler-2,5,main]
-------rx.internal.schedulers.NewThreadScheduler@48cf768c--------
MAP Thread[RxNewThreadScheduler-2,5,main]
SUBSCRIBE Thread[RxNewThreadScheduler-1,5,main]
CREATE Thread[RxNewThreadScheduler-3,5,main]
-------rx.internal.schedulers.TrampolineScheduler@2c13da15--------
MAP Thread[main,5,main]
SUBSCRIBE Thread[main,5,main]
CREATE Thread[main,5,main]

It appears that both the immediate and trampoline schedulers (both of which run on the main thread) execute in the order I expect.

But the other schedulers behave differently, even though I made the print method synchronized, which, as far as I know, prevents race conditions on standard output.

So why is this happening?

Upvotes: 2

Answers (2)

Anton Pogonets

Reputation: 1162

Since your observable is cold, the chain does not start until you call subscribe().
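To make "cold" concrete, here is a minimal stdlib-only Java sketch. It does not use RxJava: ColdSource and ColdDemo are hypothetical names, and the stored Consumer plays the role of the OnSubscribe function. Building the source only stores the function; each subscribe() runs it again for that subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a cold Observable; this is NOT RxJava's API.
final class ColdSource<T> {
    // The "OnSubscribe" logic: given a downstream consumer, emit values to it.
    private final Consumer<Consumer<T>> onSubscribe;

    ColdSource(Consumer<Consumer<T>> onSubscribe) {
        // Creating the source only stores the function; nothing is emitted yet.
        this.onSubscribe = onSubscribe;
    }

    void subscribe(Consumer<T> subscriber) {
        // Each subscribe() runs the producer anew for this subscriber.
        onSubscribe.accept(subscriber);
    }
}

class ColdDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ColdSource<Integer> source = new ColdSource<>(sub -> {
            log.add("PRODUCE");
            sub.accept(1);
        });
        log.add("BUILT");                                // nothing produced yet: the source is cold
        source.subscribe(v -> log.add("RECEIVED " + v)); // the producer runs now
        source.subscribe(v -> log.add("RECEIVED " + v)); // and again for a second subscriber
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

ColdDemo.run() returns [BUILT, PRODUCE, RECEIVED 1, PRODUCE, RECEIVED 1]: nothing is produced before the first subscribe(), and the producer runs once per subscriber.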

When you call subscribe(), the chain starts and first invokes rx.Observable.OnSubscribe#call; then, when that code calls rx.Observer#onNext, the value is pushed into the chain. Because you specified schedulers, the map call is posted to another thread, and the thread executing rx.Observable.OnSubscribe#call may or may not finish it (including your print("CREATE")) before map runs.

If you move print("CREATE") above the call to rx.Observer#onNext, the sequence will always be CREATE -> MAP -> SUBSCRIBE.

With your current code, MAP always comes before SUBSCRIBE. CREATE comes last if everything runs on one thread; otherwise its position is undefined because of thread switching.
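Both cases can be modeled without RxJava, as a sketch under stated assumptions: three single-thread executors (hypothetical names createWorker, mapWorker, subscribeWorker) stand in for the subscribeOn worker and the two observeOn workers of a multi-threaded scheduler. Each hop is an execute() call, so MAP and SUBSCRIBE are linked by data, while CREATE is logged either before the hand-off (fixed order) or after it (racing with the other two).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Stdlib-only model of the subscribeOn/observeOn hops, NOT RxJava: three
// single-thread executors stand in for the three scheduler workers.
class HopModel {
    static List<String> run(boolean createFirst) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService createWorker = Executors.newSingleThreadExecutor();    // like subscribeOn
        ExecutorService mapWorker = Executors.newSingleThreadExecutor();       // like the first observeOn
        ExecutorService subscribeWorker = Executors.newSingleThreadExecutor(); // like the second observeOn
        CountDownLatch done = new CountDownLatch(2); // SUBSCRIBE logged + create task finished

        createWorker.execute(() -> {
            if (createFirst) {
                events.add("CREATE");                  // logged before the value is handed off
            }
            int value = 1;                             // "onNext(1)": pass the value to the map worker
            mapWorker.execute(() -> {
                events.add("MAP");
                int mapped = value * 2;
                subscribeWorker.execute(() -> {        // needs map's result, so it runs after MAP
                    events.add("SUBSCRIBE " + mapped);
                    done.countDown();
                });
            });
            if (!createFirst) {
                events.add("CREATE");                  // races with MAP and SUBSCRIBE
            }
            done.countDown();
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        createWorker.shutdown();
        mapWorker.shutdown();
        subscribeWorker.shutdown();
        synchronized (events) {
            return new ArrayList<>(events);
        }
    }

    public static void main(String[] args) {
        System.out.println("CREATE after onNext:  " + run(false)); // CREATE position varies
        System.out.println("CREATE before onNext: " + run(true));  // always CREATE, MAP, SUBSCRIBE 2
    }
}
```

run(true) always yields [CREATE, MAP, SUBSCRIBE 2]; run(false) always has MAP before SUBSCRIBE 2, but CREATE can land anywhere in the list.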

Update based on the comments:

Why is the SUBSCRIBE thread created before the MAP thread?

Each operator wraps the observable it is applied to in a new observable and returns it.

When you call subscribe(), rx.Observable.OnSubscribe#call is invoked on the last-created observable.

The process then walks back up the chain, toward the source:

rx.internal.operators.OnSubscribeMap#call
rx.internal.operators.OperatorObserveOn#call
rx.internal.operators.OnSubscribeMap#call
rx.internal.operators.OperatorSubscribeOn#call
...
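The wrapping and back-walking can be sketched with a hypothetical Stage class (not an RxJava type): each operator returns a new Stage pointing at its upstream, and subscribe() on the outermost one recurses toward the source, recording the order of the calls. The sketch deliberately ignores threads; in the real chain, subscribeOn continues the walk on its own worker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of operator wrapping, NOT RxJava classes: each operator
// holds a reference to the observable it was applied to.
final class Stage {
    final String name;
    final Stage upstream; // null for the source built by create()

    Stage(String name, Stage upstream) {
        this.name = name;
        this.upstream = upstream;
    }

    // Applying an operator wraps this stage in a new one and returns it.
    Stage then(String operatorName) {
        return new Stage(operatorName, this);
    }

    // subscribe() starts at the outermost wrapper and walks back to the source.
    void subscribe(List<String> callOrder) {
        callOrder.add(name);
        if (upstream != null) {
            upstream.subscribe(callOrder);
        }
    }
}

class WrapDemo {
    static List<String> subscribeOrder() {
        Stage chain = new Stage("create", null)
                .then("subscribeOn")
                .then("observeOn#1")
                .then("map")
                .then("observeOn#2");
        List<String> callOrder = new ArrayList<>();
        chain.subscribe(callOrder);
        return callOrder;
    }

    public static void main(String[] args) {
        System.out.println(subscribeOrder());
    }
}
```

For the question's chain, subscribeOrder() returns [observeOn#2, map, observeOn#1, subscribeOn, create], the reverse of the order in which the operators were declared.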

If you look at OperatorObserveOn (code trimmed):

public final class OperatorObserveOn<T> implements Operator<T, T> {

    public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
    }

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
        }
    }

    /** Observe through individual queue per observer. */
    static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
        public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
            this.child = child;
            this.recursiveScheduler = scheduler.createWorker();
            this.delayError = delayError;
            this.on = NotificationLite.instance();
            int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
            // this formula calculates the 75% of the bufferSize, rounded up to the next integer
            this.limit = calculatedSize - (calculatedSize >> 2);
            if (UnsafeAccess.isUnsafeAvailable()) {
                queue = new SpscArrayQueue<Object>(calculatedSize);
            } else {
                queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
            }
            // signal that this is an async operator capable of receiving this many
            request(calculatedSize);
        }
    }
}

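You can see that call() creates a new worker: the ObserveOnSubscriber constructor calls scheduler.createWorker() (except for the immediate and trampoline schedulers, where call() simply returns the child).

So each observeOn creates a new worker to run the operators below it, and because subscription walks from the subscriber back toward the source, these workers are created in reverse order.

You can easily see this with Schedulers.newThread():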

CREATE (3) -> MAP (2) -> SUBSCRIBE (1)
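The reverse numbering can be reproduced with plain java.util.concurrent, as a sketch: workers are created in subscription order (bottom-up), each backed by one eagerly started thread whose name comes from a shared counter, and the value is then passed top-down through them. The WorkerOrderModel class and the Worker-N names are hypothetical; they only mimic the RxNewThreadScheduler-N numbering in the output. prestartAllCoreThreads() makes each thread, and therefore its number, exist as soon as its worker does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stdlib-only model, NOT RxJava: threads are numbered in creation order.
class WorkerOrderModel {
    // One worker = one single-thread pool whose thread is started immediately.
    static ThreadPoolExecutor newWorker(ThreadFactory factory) {
        ThreadPoolExecutor worker = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
        worker.prestartAllCoreThreads(); // creates the thread now, taking the next number
        return worker;
    }

    static Map<String, String> run() {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory numbered = r -> new Thread(r, "Worker-" + counter.incrementAndGet());

        // Subscription walks from the subscriber toward the source,
        // so the workers are created bottom-up:
        ThreadPoolExecutor subscribeWorker = newWorker(numbered); // second observeOn -> Worker-1
        ThreadPoolExecutor mapWorker = newWorker(numbered);       // first observeOn  -> Worker-2
        ThreadPoolExecutor createWorker = newWorker(numbered);    // subscribeOn      -> Worker-3

        Map<String, String> threadOf = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(1);

        // Emission then flows top-down: CREATE -> MAP -> SUBSCRIBE.
        createWorker.execute(() -> {
            threadOf.put("CREATE", Thread.currentThread().getName());
            mapWorker.execute(() -> {
                threadOf.put("MAP", Thread.currentThread().getName());
                subscribeWorker.execute(() -> {
                    threadOf.put("SUBSCRIBE", Thread.currentThread().getName());
                    done.countDown();
                });
            });
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        createWorker.shutdown();
        mapWorker.shutdown();
        subscribeWorker.shutdown();
        return threadOf;
    }

    public static void main(String[] args) {
        Map<String, String> threadOf = run();
        for (String stage : new String[] {"CREATE", "MAP", "SUBSCRIBE"}) {
            System.out.println(stage + " " + threadOf.get(stage));
        }
    }
}
```

run() maps CREATE to Worker-3, MAP to Worker-2 and SUBSCRIBE to Worker-1, the same pattern as CREATE (3) -> MAP (2) -> SUBSCRIBE (1).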

Upvotes: 1

Yaroslav Stavnichiy

Reputation: 21456

The immediate and trampoline schedulers use the current (single) thread, so the order of execution is strictly defined.

All other schedulers are multi-threaded: you schedule three tasks on three different threads.

MAP will always come before SUBSCRIBE, because SUBSCRIBE is only scheduled after MAP completes (the result of map() is passed to the subscriber).

Apart from that, there is no guarantee about the order in which the tasks get serialized by your print function: synchronized only ensures that one thread prints at a time, not which thread goes first.
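To illustrate the last point, here is a stdlib-only sketch (SyncOrderDemo is a hypothetical name) with three independent threads calling a synchronized print method, released together by a CountDownLatch. Unlike MAP and SUBSCRIBE in the real chain, these tasks have no data dependency, so the lock keeps every entry whole but leaves their order to the thread scheduler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Stdlib-only demo: synchronized gives mutual exclusion, not ordering.
class SyncOrderDemo {
    private final List<String> lines = new ArrayList<>();

    // Like the question's print(): one thread at a time, one whole entry at a time.
    synchronized void print(String s) {
        lines.add(s + " " + Thread.currentThread().getName());
    }

    synchronized List<String> snapshot() {
        return new ArrayList<>(lines);
    }

    static List<String> run() {
        SyncOrderDemo out = new SyncOrderDemo();
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();
        for (String task : new String[] {"CREATE", "MAP", "SUBSCRIBE"}) {
            Thread t = new Thread(() -> {
                try {
                    start.await(); // release all three threads at roughly the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                out.print(task);
            }, task + "-thread");
            threads.add(t);
            t.start();
        }
        start.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return out.snapshot();
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            System.out.println(run()); // entries are always whole; their order is not fixed
        }
    }
}
```

Every run contains the three entries intact; repeated runs may list them in different orders, although on a given machine one order may dominate.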

Upvotes: 0
