Alexander Mills
Alexander Mills

Reputation: 100130

Cancel an Observable from the producer side, not consumer side

Cancelling from the consumer side, might be called using takeUntil, but that's not necessarily very dynamic. In this case, though, I am looking to cancel an Observable from the producer side of the equation, in the same way you might wish to cancel a Promise inside a promise chain (which is not very possible with the native utility).

Say I have this Observable being returned from a method. (This Queue library is a simple persistent queue that read/writes to a text file, we need to lock read/writes so nothing gets corrupted).

Queue.prototype.readUnique = function () {

    var ret = null;
    var lockAcquired = false;

    return this.obsEnqueue
        .flatMap(() => acquireLock(this))
        .flatMap(() => {
            lockAcquired = true;
            return removeOneLine(this)
        })
        .flatMap(val => {
            ret = val;   // this is not very good
            return releaseLock(this);
        })
        .map(() => {
            return JSON.parse(ret);
        })
        .catch(e => {
            if (lockAcquired) {
                return releaseLock(this);
            }
            else {
                return genericObservable();
            }
        });

};

I have 2 different questions -

  1. If I cannot acquire the lock, how can I "cancel" the observable, to just send back an empty Observable with no result(s)? Would I really have to do if/else logic in each return call to decide whether the current chain is cancelled and if so, return an empty Observable? By empty, I mean an Observable that simple fires onNext/onComplete without any possibility for errors and without any values for onNext. Technically, I don't think that's an empty Observable, so I am looking for what that is really called, if it exists.

  2. If you look at this particular sequence of code:

    .flatMap(() => acquireLock(this))
    .flatMap(() => {
        lockAcquired = true;
        return removeOneLine(this)
    })
    .flatMap(val => {
        ret = val;
        return releaseLock(this);
    })
    .map(() => {
        return JSON.parse(ret);
    })
    

what I am doing is storing a reference to ret at the top of the method and then referencing it again a step later. What I am looking for is a way to pass the value fired from removeOneLine() to JSON.parse(), without having to set some state outside the chain (which is simply inelegant).

Upvotes: 5

Views: 478

Answers (2)

Olaf Horstmann
Olaf Horstmann

Reputation: 16892

1) It depends on how your method acquireLock works - but I am assuming that it throws an error if it cannot acquire the lock, in that case you could create your stream with a catch and set the fallback stream to an empty one:

return Rx.Observable.catch(
        removeLine$,
        Rx.Observable.empty()
    );

2) To spare the stateful external variable you could simply chain a mapTo:

let removeLine$ = acquireLock(this)
    .flatMap(() => this.obsEnqueue
        .flatMap(() => removeOneLine(this))
        .flatMap(val => releaseLock(this).mapTo(val))
        .map(val => JSON.parse(val))
        .catch(() => releaseLock(this))
    );

Upvotes: 3

Asti
Asti

Reputation: 12687

According to your definition of cancel, it is to prevent an observable from sending a value downstream. To prevent an observable from pushing a value, you can use filter:

It can be as simple as:

observable.filter(_ => lockAcquired)

This will only send a notification downstream if lockAcquired is true.

Upvotes: 3

Related Questions