Reputation: 18993
I'm currently working on an RxJS component whose role is to periodically pull some data from the database and make it available to clients as a BehaviorSubject observable.
// "Producer"
const source2 = Observable.create((observer: Observer<Map<number,any>>) => {
  this.fooService.loadFoos().then((val) => {
    observer.next(val)
    observer.complete()
  })
})
const subj = new BehaviorSubject(new Map())
interval(5000).pipe(
  switchMap(() => source2),
).subscribe(subj)
// "Client"
return subj.pipe(
  first(), // get only the latest data, a Map with number keys for foos
  map((f: Map<number,any>) => {
    if (!f.has(fooId)) {
      throw new Error('foo not found')
    }
    return f
  }),
).toPromise()
It works well except in rare cases: when the interval fires, somewhere around switchMap the value f turns out to be an empty map. I reproduced this under a stress test with ApacheBench.
Why could this be? Is there anything wrong with the code above? Is it because of switchMap's canceling effect?
This is how often it fails:
Concurrency Level: 100
Time taken for tests: 6.620 seconds
Complete requests: 5000
Failed requests: 1230
Upvotes: 1
Views: 1664
Reputation: 14740
This isn't a definitive answer, but it was getting way too long for a comment :-)
Your BehaviorSubject starts with a default value of new Map(), so any subscription that happens before the interval fires for the first time (5000 ms) and the first promise resolves will receive an empty map.
If you need the replay functionality but not the default value, you could use a ReplaySubject instead:
const subj = new ReplaySubject<Map<number,any>>(1);
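To make the difference concrete, here is a minimal sketch (not your actual service; the `Foos` type and the `loaded` map are made up for illustration) showing what a `first()` client sees when it subscribes before any data has been loaded:

```typescript
import { BehaviorSubject, ReplaySubject } from 'rxjs';
import { first } from 'rxjs/operators';

// Hypothetical shape of the cached data, for illustration only.
type Foos = Map<number, string>;

const behavior = new BehaviorSubject<Foos>(new Map<number, string>());
const replay = new ReplaySubject<Foos>(1);

const seen: string[] = [];

// Both clients subscribe before the first load has finished.
behavior.pipe(first()).subscribe(m => seen.push(`behavior:${m.size}`));
replay.pipe(first()).subscribe(m => seen.push(`replay:${m.size}`));

// Simulate the first load completing.
const loaded: Foos = new Map<number, string>([[1, 'foo']]);
behavior.next(loaded);
replay.next(loaded);

console.log(seen); // [ 'behavior:0', 'replay:1' ]
```

The BehaviorSubject client is handed the empty default right away, while the ReplaySubject client simply waits until real data arrives.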
"Is it because of switchMap's canceling effect?"
No. switchMap wouldn't just emit an empty map. When it "switches", it drops emissions from the previous inner observable and only propagates emissions from the new one.
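A small synchronous sketch of that switching behavior, using plain Subjects as stand-ins for the interval and the inner loads (all names here are invented for the demo):

```typescript
import { Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const trigger = new Subject<number>();
const firstInner = new Subject<string>();
const secondInner = new Subject<string>();

const out: string[] = [];

trigger
  .pipe(switchMap(i => (i === 0 ? firstInner : secondInner)))
  .subscribe(v => out.push(v));

trigger.next(0);
firstInner.next('a');      // forwarded
trigger.next(1);           // switch: firstInner is unsubscribed
firstInner.next('stale');  // dropped, nothing is emitted in its place
secondInner.next('b');     // forwarded

console.log(out); // [ 'a', 'b' ]
```

Nothing "empty" is ever produced by the switch itself; values from the old inner observable are just ignored.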
Is it possible that loadFoos() ever returns an empty map?
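Your "producer" function is basically what defer() does with a promise-returning factory: it calls loadFoos() on each subscription and emits the resolved value. So you could just use:
const source2 = defer(() => this.fooService.loadFoos());
(A plain from(this.fooService.loadFoos()) would call loadFoos() only once, up front, so every interval tick would get the same resolved map.)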
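One thing worth double-checking when simplifying the producer: wrapping an already-created promise is not the same as calling the loader on every subscription. The sketch below counts calls to a hypothetical `loadFoos()` stand-in (not your real service) to show the difference between `from(promise)` and `defer(() => promise)`:

```typescript
import { defer, from } from 'rxjs';

let calls = 0;

// Hypothetical stand-in for fooService.loadFoos(); counts invocations.
function loadFoos(): Promise<Map<number, string>> {
  calls++;
  return Promise.resolve(new Map<number, string>([[1, 'foo']]));
}

// from(): loadFoos() runs right here, exactly once.
const eager = from(loadFoos());
const callsAfterEagerCreation = calls;
eager.subscribe();
eager.subscribe();
const callsAfterEagerSubs = calls;

// defer(): loadFoos() runs once per subscription.
const lazy = defer(() => loadFoos());
const callsAfterLazyCreation = calls;
lazy.subscribe();
lazy.subscribe();
const callsAfterLazySubs = calls;

console.log(callsAfterEagerCreation, callsAfterEagerSubs); // 1 1
console.log(callsAfterLazyCreation, callsAfterLazySubs);   // 1 3
```

Since switchMap re-subscribes to the inner observable on every tick, the lazy variant is the one that actually reloads the data.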
If you're not calling subject.next anywhere, you could get rid of the subject altogether and simply do:
const subj = interval(5000).pipe(
  startWith(new Map()), // value is ignored; it only triggers an immediate first load
  switchMapTo(source2),
  shareReplay(1)
);
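Here is a rough, synchronous sketch of how the shareReplay(1) approach behaves for clients. To keep it deterministic it swaps interval(5000) for a manually driven `tick` Subject and the database call for a made-up synchronous `load()`; both are assumptions for the demo, not part of your code:

```typescript
import { Subject, of } from 'rxjs';
import { first, shareReplay, switchMap } from 'rxjs/operators';

const tick = new Subject<void>(); // stand-in for interval(5000)
let version = 0;

// Hypothetical loader: each call returns a fresh map keyed by a version number.
const load = () => of(new Map<number, string>([[++version, 'foo']]));

const foos$ = tick.pipe(switchMap(load), shareReplay(1));

const early: number[] = [];
const late: number[] = [];

// Early client: subscribes before any load, so it waits for real data.
foos$.pipe(first()).subscribe(m => early.push(...Array.from(m.keys())));

tick.next(); // first load
tick.next(); // second load replaces the cached value

// Late client: immediately gets the most recent cached map.
foos$.pipe(first()).subscribe(m => late.push(...Array.from(m.keys())));

console.log(early, late); // [ 1 ] [ 2 ]
```

Neither client ever sees an empty placeholder map, because there is no default value in the pipeline.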
Upvotes: 1