Alexander Mills
Alexander Mills

Reputation: 100020

Avoid recursion with RxJS5 observables

Okay, so I would like to avoid recursion with observables, using a combination of external and internal events instead of recalling the same method/function.

Right now I have this:

Queue.prototype.deq = function (opts) {

    opts = opts || {};

    const noOfLines = opts.noOfLines || opts.count || 1;
    const isConnect = opts.isConnect !== false;

    let $dequeue = this.init()
        .flatMap(() => {
            return acquireLock(this)
                .flatMap(obj => {
                    if(obj.error){

                    // if there is an error acquiring the lock we
                    // retry after 100 ms, which means using recursion
                    // because we call "this.deq()" again

                        return Rx.Observable.timer(100)
                            .flatMap(() => this.deq(opts));
                    }
                    else{
                        return makeGenericObservable()
                          .map(() => obj);
                    }
                })

        })
        .flatMap(obj => {
            return removeOneLine(this)
                .map(l => ({l: l, id: obj.id}))
        })
        .flatMap(obj => {
            return releaseLock(this, obj.id)
                .map(() => obj.l)
        })
        .catch(e => {
            console.error(e.stack || e);
            return releaseLock(this);
        });

    if (isConnect) {
        $dequeue = $dequeue.publish();
        $dequeue.connect();
    }

    return $dequeue;

};

instead of the above, which uses recursion (hopefully correctly), I'd like to use a more evented approach. If there is an error passed back from the acquireLock function, I would like to retry every 100ms, as soon as it succeeds I want to stop, I am not sure how to do this, and it's difficult for me to test it....is this about right?

Queue.prototype.deq = function (opts) {

    // ....

    let $dequeue = this.init()
        .flatMap(() => {
            return acquireLock(this)
                .flatMap(obj => {
                    if(obj.error){
                        return Rx.Observable.interval(100)
                            .takeUntil(
                                acquireLock(this)
                                .filter(obj => !obj.error)
                            )
                    }
                    else{

                        // this is just an "empty" observable
                        // which immediately fires onNext()

                        return makeGenericObservable()
                              .map(() => obj);
                    }
                })

        })

     // ...

    return $dequeue;

};

Is there a way to make that more concise? I also would like to only retry 5 times or so. My primary question is - How can I also create a count alongside the interval, so that every 100 ms I retry, until either the lock is acquired or a count reaches 5?

I need something like this:

.takeUntil(this or that)

perhaps I can simply chain the takeUntils, like so:

                   return Rx.Observable.interval(100)
                    .takeUntil(
                        acquireLock(this)
                        .filter(obj => !obj.error)
                    )
                    .takeUntil(++count < 5);

I could do this:

                return Rx.Observable.interval(100)
                    .takeUntil(
                        acquireLock(this)
                        .filter(obj => !obj.error)
                    )
                    .takeUntil( Rx.Observable.timer(500));

But probably looking for something a little less kludgy

But I don't know where to (store / keep track of) the count variable...

Also looking to make this more concise and possibly check it for correctness.

I have to say, if this stuff works, it's very powerful coding constructs.

Upvotes: 3

Views: 642

Answers (1)

Sergey Sokolov
Sergey Sokolov

Reputation: 2839

There are two operators that can help you: retry and retryWhen. The both resubscribe on source observable an thus retry the failed operation.

Check this example where we have an observable that fails on first count subscriptions:

let getObs = (count) => {
  return Rx.Observable.create((subs) => {
    console.log('Subscription count = ', count);

    if(count) {
      count--;
      subs.error("ERROR");
    } else {
      subs.next("SUCCESS");
      subs.complete();
    }
  
    return () => {};
  });
};

getObs(2).subscribe(console.log, console.log);
getObs(2).retry(2).subscribe(console.log, console.log);
getObs(3).retry(2).subscribe(console.log, console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

As you can see:

  • If we call it as it is it will fail
  • With retry we can, well, retry it several times and пуе success response
  • If observable fails too many tomes retry will give up and propagate error along the chain.

What you actually need is retryWhen because retry tries to perform operation again without delay.

let getObs = (count) => {
  return Rx.Observable.create((subs) => {
    if(count) {
      count--;
      subs.error("ERROR");
    } else {
      subs.next("SUCCESS");
      subs.complete();
    }
  
    return () => {};
  });
};

getObs(2).retryWhen(errors => errors.delay(100))
  .subscribe(console.log, console.log);
getObs(4).retryWhen(errors => errors.delay(100))
  .subscribe(console.log, console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

It's easy to add a delay with retryWhen but force it to fail after number of attempts is harder:

let getObs = (count) => {
  return Rx.Observable.create((subs) => {
    if(count) {
      count--;
      subs.error("ERROR");
    } else {
      subs.next("SUCCESS");
      subs.complete();
    }
  
    return () => {};
  });
};

getObs(2)
  .retryWhen(errors => {
    return errors.delay(100).scan((errorCount, err) => {
      if(!errorCount) {
        throw err;
      }
      return --errorCount;
    }, 2);
  })
  .subscribe(console.log, console.log);

getObs(4)
  .retryWhen(errors => {
    return errors.delay(100).scan((errorCount, err) => {
      if(!errorCount) {
        throw err;
      }
      return --errorCount;
    }, 2);
  })
  .subscribe(console.log, console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

And finally, both retries expect error to be thrown so you need to do this when acquiring lock:

    .flatMap(() => {
        return acquireLock(this)
            .switchMap(obj => {
              if(obj.error) {
                return Rx.Observable.throw(obj.error);
              } else {
                Rx.Observable.of(obj);
              }
            })
            .retryWhen(...)
    })

Upvotes: 1

Related Questions