Reputation: 100020
Okay, so I would like to avoid recursion with observables, using a combination of external and internal events instead of recalling the same method/function.
Right now I have this:
/**
 * Dequeue pipeline: runs this.init(), acquires the lock, removes one line,
 * releases the lock and finally emits the removed line.
 *
 * @param {Object} [opts] - optional settings; defaults to {}.
 * @param {number} [opts.noOfLines] - read into `noOfLines` (falls back to
 *   opts.count, then 1).
 * @param {boolean} [opts.isConnect] - any value other than `false` makes the
 *   function publish and connect the chain before returning it.
 * @returns the `$dequeue` observable chain (the published/connected one when
 *   isConnect is enabled).
 */
Queue.prototype.deq = function (opts) {
// Fall back to an empty object so the property reads below cannot throw.
opts = opts || {};
// NOTE(review): noOfLines is computed here but never referenced again in this function.
const noOfLines = opts.noOfLines || opts.count || 1;
// Connecting is the default; only an explicit `isConnect: false` turns it off.
const isConnect = opts.isConnect !== false;
let $dequeue = this.init()
// Step 1: once init() emits, try to acquire the lock.
.flatMap(() => {
return acquireLock(this)
.flatMap(obj => {
if(obj.error){
// if there is an error acquiring the lock we
// retry after 100 ms, which means using recursion
// because we call "this.deq()" again
// NOTE(review): this.deq(opts) returns the result of a whole pipeline
// (the value mapped from obj.l, or whatever the catch branch yields), and
// that value is then handed to the removeOneLine/releaseLock steps below
// as `obj` - confirm this is intended.
return Rx.Observable.timer(100)
.flatMap(() => this.deq(opts));
}
else{
// Lock acquired: map the helper observable's emission to the lock result
// so that `obj` (and its id) flows to the next step.
return makeGenericObservable()
.map(() => obj);
}
})
})
// Step 2: remove one line and pair it with the id read from the lock result.
.flatMap(obj => {
return removeOneLine(this)
.map(l => ({l: l, id: obj.id}))
})
// Step 3: release the lock with that id, then emit only the removed line.
.flatMap(obj => {
return releaseLock(this, obj.id)
.map(() => obj.l)
})
// Any error in the chain is logged and replaced by releaseLock(this);
// note that no lock id is passed on this path.
.catch(e => {
console.error(e.stack || e);
return releaseLock(this);
});
if (isConnect) {
// publish() + connect() subscribes the chain to its source right away,
// without waiting for an external subscriber.
$dequeue = $dequeue.publish();
$dequeue.connect();
}
return $dequeue;
};
Instead of the above, which uses recursion (hopefully correctly), I'd like to use a more evented approach. If an error is passed back from the acquireLock function, I would like to retry every 100 ms and stop as soon as it succeeds. I am not sure how to do this, and it's difficult for me to test it... Is this about right?
/**
 * Sketch of a non-recursive deq: when the first acquireLock(this) reports an
 * error, an interval ticking every 100 ms runs until a second
 * acquireLock(this) emits a result without an error. The remaining parts of
 * the function are elided with "..." placeholders.
 *
 * @param {Object} [opts] - options; their handling is in the elided part.
 * @returns the `$dequeue` observable chain.
 */
Queue.prototype.deq = function (opts) {
// ....
let $dequeue = this.init()
.flatMap(() => {
return acquireLock(this)
.flatMap(obj => {
if(obj.error){
// First attempt failed: tick every 100 ms until the notifier below emits.
// NOTE(review): what this branch sends downstream is the interval's own
// values, not the lock result that passed the filter - confirm intended.
// NOTE(review): acquireLock(this) is called once here to build the notifier;
// whether the lock is re-attempted on each tick depends on acquireLock - verify.
return Rx.Observable.interval(100)
.takeUntil(
acquireLock(this)
// This inner `obj` shadows the outer one; only error-free results pass.
.filter(obj => !obj.error)
)
}
else{
// this is just an "empty" observable
// which immediately fires onNext()
return makeGenericObservable()
.map(() => obj);
}
})
})
// ...
return $dequeue;
};
Is there a way to make that more concise? I also would like to only retry 5 times or so. My primary question is - How can I also create a count alongside the interval, so that every 100 ms I retry, until either the lock is acquired or a count reaches 5?
I need something like this:
.takeUntil(this or that)
perhaps I can simply chain the takeUntils, like so:
return Rx.Observable.interval(100)
.takeUntil(
acquireLock(this)
.filter(obj => !obj.error)
)
.takeUntil(++count < 5);
I could do this:
return Rx.Observable.interval(100)
.takeUntil(
acquireLock(this)
.filter(obj => !obj.error)
)
.takeUntil( Rx.Observable.timer(500));
But probably looking for something a little less kludgy
But I don't know where to (store / keep track of) the count
variable...
Also looking to make this more concise and possibly check it for correctness.
I have to say, if this stuff works, it's very powerful coding constructs.
Upvotes: 3
Views: 642
Reputation: 2839
There are two operators that can help you: retry and retryWhen. Both resubscribe to the source observable and thus retry the failed operation.
Check this example, where we have an observable that fails on the first `count`
subscriptions:
/**
 * Demo helper: builds an observable that fails on its first `count`
 * subscriptions and succeeds on every subscription after that.
 *
 * `count` lives in the closure, so all subscriptions to the same returned
 * observable share (and decrement) the same counter.
 *
 * @param {number} count - how many subscriptions should fail before success.
 * @returns the observable created through Rx.Observable.create.
 */
let getObs = (count) => {
  return Rx.Observable.create((observer) => {
    console.log('Subscription count = ', count);
    if (!count) {
      // No failures left: emit the value and complete.
      observer.next("SUCCESS");
      observer.complete();
      return () => {};
    }
    // Use up one failure and signal an error to this subscriber.
    count--;
    observer.error("ERROR");
    return () => {};
  });
};
// Plain subscribe: the first subscription fails, so the error handler logs "ERROR".
getObs(2).subscribe(console.log, console.log);
// retry(2): the two failing subscriptions are retried and the third one emits "SUCCESS".
getObs(2).retry(2).subscribe(console.log, console.log);
// Three failures but only two retries: after the retries are used up the error is propagated.
getObs(3).retry(2).subscribe(console.log, console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
As you can see:
- with `retry` we can, well, retry it several times and get a success response;
- otherwise `retry` will give up and propagate the error along the chain.

What you actually need is `retryWhen`,
because `retry` tries to perform the operation again without delay.
/**
 * Demo helper: returns an observable whose first `count` subscriptions
 * error with "ERROR"; later subscriptions emit "SUCCESS" and complete.
 *
 * The counter is captured by the closure and decremented on every failing
 * subscription, so resubscribing (e.g. through a retry operator) makes progress.
 *
 * @param {number} count - number of subscriptions that should fail.
 * @returns the observable created through Rx.Observable.create.
 */
let getObs = (count) => Rx.Observable.create((subscriber) => {
  const teardown = () => {};
  if (count) {
    // Still in the failing phase: consume one failure.
    count--;
    subscriber.error("ERROR");
    return teardown;
  }
  subscriber.next("SUCCESS");
  subscriber.complete();
  return teardown;
});
// Each error is delayed by 100 ms before it triggers a resubscription;
// after two failures the third subscription emits "SUCCESS".
getObs(2).retryWhen(errors => errors.delay(100))
.subscribe(console.log, console.log);
// There is no retry limit here: all four failures are retried (100 ms apart)
// and the fifth subscription succeeds.
getObs(4).retryWhen(errors => errors.delay(100))
.subscribe(console.log, console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
It's easy to add a delay with `retryWhen`,
but forcing it to fail after a number of attempts is harder:
/**
 * Demo helper: creates an observable that reports "ERROR" for the first
 * `count` subscriptions and emits "SUCCESS" (then completes) afterwards.
 *
 * `count` is closed over, so every subscription to the same observable
 * decrements the same counter.
 *
 * @param {number} count - how many subscriptions fail before one succeeds.
 * @returns the observable created through Rx.Observable.create.
 */
let getObs = (count) => {
  // Subscription logic, named so the create() call reads as a single step.
  const produce = (sink) => {
    if (count) {
      count--;
      sink.error("ERROR");
    } else {
      sink.next("SUCCESS");
      sink.complete();
    }
    return () => {};
  };
  return Rx.Observable.create(produce);
};
// Notifier used by both demos below: every error is delayed by 100 ms and
// counted down from 2. While the counter is non-zero the error becomes a
// retry signal; once it has reached zero the error is rethrown, which ends
// the retrying and propagates the error to the subscriber.
const retryTwiceWithDelay = (errors) =>
  errors.delay(100).scan((remaining, err) => {
    if (remaining) {
      return remaining - 1;
    }
    throw err;
  }, 2);

// Two failures fit within the two allowed retries, so this one ends with "SUCCESS".
getObs(2)
  .retryWhen(retryTwiceWithDelay)
  .subscribe(console.log, console.log);

// Four failures exceed the limit: the third error is rethrown and logged.
getObs(4)
  .retryWhen(retryTwiceWithDelay)
  .subscribe(console.log, console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
And finally, both retry operators expect an error to be thrown, so you need to do this when acquiring the lock:
.flatMap(() => {
return acquireLock(this)
.switchMap(obj => {
if(obj.error) {
return Rx.Observable.throw(obj.error);
} else {
Rx.Observable.of(obj);
}
})
.retryWhen(...)
})
Upvotes: 1