tir38

Reputation: 10451

RxJava2's delay(rx.functions.Func1) is not emitting items in order

I am using this signature of delay:

public final <U> Observable<T> delay(Func1<? super T,? extends Observable<U>> itemDelay)


I am using Func1 to return an Observable which acts as a sort of "trigger". My goal is to delay items until an outside async operation completes. Once that operation completes I want to emit all items that have been delayed and all future items in order.

Here is some sample code that shows what I'm trying to do:

import java.util.concurrent.atomic.AtomicBoolean;

import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.ReplaySubject;

public class Example {

    private ReplaySubject<Object> delayTrigger = ReplaySubject.create(); // (1)

    public void main() {
        System.out.println("============ MAIN ============");
        SourceThread sourceThread = new SourceThread();
        sourceThread.start();

        sourceThread.stream
                .compose(doOnFirst(integer -> startAsyncOperation())) // (2)
                .delay(integer -> delayTrigger) // (3)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe((Integer integer)
                        -> System.out.println("onNext: " + integer));
    }

    private void startAsyncOperation() {
        System.out.println(">>>>>>> long async operation started");
        SomeOtherThread someOtherThread = new SomeOtherThread();
        someOtherThread.start();
    }

    private void onAsyncOperationComplete() {
        System.out.println("<<<<<<< long async operation completed");
        delayTrigger.onNext(new Object()); // (4)
    }

    /**
     * From https://stackoverflow.com/a/32366794
     */
    private <T> ObservableTransformer<T, T> doOnFirst(Consumer<? super T> consumer) {
        return observableTransformer -> Observable.defer(() -> {
            final AtomicBoolean first = new AtomicBoolean(true);
            return observableTransformer.doOnNext(t -> {
                if (first.compareAndSet(true, false)) {
                    consumer.accept(t);
                }
            });
        });
    }

    /**
     * A thread that simulates a time-delayed source.
     * This is not really part of the problem;
     * we just need a time-delayed source on another thread.
     */
    private final class SourceThread extends Thread {
        private ReplaySubject<Integer> stream = ReplaySubject.create();

        @Override
        public void run() {
            super.run();
            for (int i = 0; i < 100; i++) {
                stream.onNext(i);
                System.out.println("Source emits item: " + i);
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private final class SomeOtherThread extends Thread {
        @Override
        public void run() {
            super.run();
            try {
                Thread.sleep(1000);
                onAsyncOperationComplete();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

At (1) I create a ReplaySubject that will act as my trigger, at (2) I start the async operation, at (3) I delay until the trigger emits something; finally at (4) I put something into the trigger stream when async operation completes.

This works fine for the most part, except when the async operation completes, the stream returned from delay is out of order.

I/System.out: Source emits item: 46
I/System.out: Source emits item: 47
I/System.out: <<<<<<< long async operation completed
I/System.out: Source emits item: 48
I/System.out: onNext: 0
I/System.out: onNext: 48 <---- problem here!!!
I/System.out: onNext: 1
I/System.out: onNext: 2
I/System.out: onNext: 3

Item 48 is emitted from delay before items 1-47, and item 49 is emitted out of order as well. This continues until items 1-47 have all been emitted, after which the stream is in order again. That still leaves a large section of out-of-order items. Is there a way I can fix this? Am I using delay correctly? Is this a bug in delay?

For reference this is just a sample. In my "real" problem I have no way to re-order the emitted items once they get out of order (i.e. they aren't nicely numbered).

Upvotes: 2

Views: 305

Answers (1)

akarnokd

Reputation: 70007

That delay operator has no ordering guarantees because the inner source for item #1 may signal later than another inner source for item #2 in general. Any async signal may throw off the ordering, even if coming from a source such as a terminated ReplaySubject.
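
To illustrate the point about per-item inner sources, here is a small plain-Java model. It is not RxJava code and not how delay is implemented; PerItemDelayModel and emissionOrder are hypothetical names, and the model simply assumes each item is released when its own inner source signals, independently of every other item.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical model (not RxJava): every item is released by its own independent signal. */
final class PerItemDelayModel {

    /**
     * Returns the item indices in the order in which they would be released,
     * assuming item i arrives at arrivalMs[i] and its inner source signals delayMs[i] later.
     */
    static List<Integer> emissionOrder(long[] arrivalMs, long[] delayMs) {
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < arrivalMs.length; i++) {
            order.add(i);
        }
        // Sort by the moment each item's own signal fires; ties keep source order (stable sort).
        order.sort(Comparator.comparingLong(i -> arrivalMs[i] + delayMs[i]));
        return order;
    }
}
```

With arrivals {0, 20, 40} and delays {100, 10, 10}, item 0 is released last even though it arrived first. In the question all inner sources are the same subject, so the real interleaving depends on thread timing rather than on fixed numbers like these.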

I assume you want to prefetch the main source but not let it through before the external signal, right? In this case, you can use concatArrayEager where the first source's completion triggers the emission of the prefetched second source:

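// Completing this subject is the external "go" signal; it never emits items itself.
PublishSubject<Integer> delayer = PublishSubject.create();

// Both sources are subscribed to right away; items from the second source are
// buffered until the first completes, then emitted in their original order.
Observable.concatArrayEager(
    delayer,
    sourceThread.stream
)
.subscribe(integer -> System.out.println("onNext: " + integer));

// somewhere async
delayer.onComplete();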
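
For readers who want to see the intended semantics without RxJava, the following is a minimal plain-Java sketch of "buffer everything until an external signal, then flush in order and pass later items straight through". GatedBuffer and its methods are hypothetical names introduced here for illustration; this is not how concatArrayEager is implemented, only a model of the ordering behaviour it gives in this setup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper (not part of RxJava): holds items back until open() is called. */
final class GatedBuffer<T> {
    private final Consumer<? super T> downstream;
    private final List<T> pending = new ArrayList<>();
    private boolean open = false;

    GatedBuffer(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    /** Called by the source for every item, from any thread. */
    synchronized void onNext(T item) {
        if (open) {
            downstream.accept(item);   // gate already open: pass through
        } else {
            pending.add(item);         // gate closed: remember in arrival order
        }
    }

    /** Called once by the async operation when it has finished. */
    synchronized void open() {
        if (open) {
            return;
        }
        // Flush under the same lock that onNext takes, so no new item
        // can overtake the buffered ones while the flush is running.
        for (T item : pending) {
            downstream.accept(item);
        }
        pending.clear();
        open = true;
    }
}
```

The key design choice is that the flush and the pass-through path share one lock, which is what keeps a late item such as 48 from jumping ahead of 1-47. The trade-off is that the downstream consumer runs while the lock is held, so a slow consumer blocks the source thread.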

Upvotes: 1
