Kevin Krumwiede
Kevin Krumwiede

Reputation: 10308

Does disposing a Disposable guarantee no future calls?

Does disposing the Disposable returned from Observable#subscribe(Consumer) guarantee that the Consumer will receive no further calls to accept? If not, how can I obtain such a guarantee?

If it makes a difference, the Observable is observed and the Disposable is disposed in the same single threaded Scheduler.

Upvotes: 0

Views: 197

Answers (1)

akarnokd
akarnokd

Reputation: 70017

Does disposing the Disposable returned from Observable#subscribe(Consumer) guarantee that the Consumer will receive no further calls to accept?

There is no such hard guarantee. RxJava 2 does its best effort to stop an emission upon an asynchronous cancellation, but it is possible the thread issuing the cancellation is delayed ever so slightly that from an outside point of view, one or more items still slipped through.

If not, how can I obtain such a guarantee?

You can make things more eager and mutually exclusive but in the asynchronous world it is often difficult to tell what happened first or during an action.

If it makes a difference, the Observable is observed and the Disposable is disposed in the same single threaded Scheduler.

If the cancellation happens in-sequence, such as using take(2), a synchronously connected upstream will stop sending items: range(1, 5).take(2) will never try to emit 3, 4, 5. However,

range(1, 5)
.map(v -> v + 1)
.observeOn(io())
.take(2)
.subscribe()

may run through 1..5 and perform the mapping before take even sees the value 2.

Depending on what type of flow you have, sending a task to a scheduler to dispose a running sequence on that scheduler may never execute as it is blocked by the task currently emitting, thus the following will not stop the sequence:

var single = Schedulers.single();

var dispose = single.scheduleDirect(() -> {
    while (!Thread.currentThread().isInterrupted()) {
        System.out.println("Processing...");
    }
});

// this will not execute
single.scheduleDirect(() -> dispose.dispose());

// but this will stop the processing
dispose.dispose();

Thread.sleep(1000);

Upvotes: 2

Related Questions