Nikita Fedyashev

Reputation: 18993

Race condition in RxJS?

I'm working on an RxJS component whose role is to periodically pull some data from the database and expose it to clients through a BehaviorSubject.

// "Producer"
const source2 = Observable.create((observer: Observer<Map<number,any>>) => {
  this.fooService.loadFoos().then((val) => {
    observer.next(val)
    observer.complete()
  })
})

const subj = new BehaviorSubject(new Map())

interval(5000).pipe(
  switchMap(() => source2),
).subscribe(subj)


// "Client"
return subj.pipe(
  first(),        // get only the latest data, a Map with number keys for foos
  map((f: Map<number,any>) => {
    if (!f.has(fooId)) {
      throw new Error('foo not found')
    }
    return f
  }),
).toPromise()

It works well except in rare cases: when the interval kicks in, somewhere around the switchMap, the value f arrives as an empty map. I reproduced this under a stress test with ApacheBench (ab).

Why could this happen? Is there anything wrong with the code above? Is it because of switchMap's canceling effect?

That's how often it fails:

Concurrency Level:      100
Time taken for tests:   6.620 seconds
Complete requests:      5000
Failed requests:        1230

Upvotes: 1

Views: 1664

Answers (1)

BizzyBob

Reputation: 14740

This isn't a definitive answer, but it was getting way too long for a comment :-)


Your BehaviorSubject starts with a default value of new Map(), so any client that subscribes before the first load completes (that is, before the interval first fires at 5000 ms and the first promise resolves) will receive an empty map.

If you need the replay functionality, but not the default value, you could instead use a ReplaySubject:

const subj = new ReplaySubject<Map<number,any>>(1);
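To make the difference concrete, here is a minimal sketch comparing the two subjects. The Foos type, the sample data, and the seen log are made up for illustration, and the manual next() calls only stand in for the interval/switchMap pipeline delivering its first result.

```typescript
import { BehaviorSubject, ReplaySubject } from 'rxjs';
import { first } from 'rxjs/operators';

// Hypothetical payload type, standing in for the question's Map<number, any>.
type Foos = Map<number, string>;

const behavior = new BehaviorSubject<Foos>(new Map());
const replay = new ReplaySubject<Foos>(1);

const seen: string[] = [];

// Clients that arrive before the first load has finished.
behavior.pipe(first()).subscribe(m => seen.push(`behavior early: size ${m.size}`));
replay.pipe(first()).subscribe(m => seen.push(`replay early: size ${m.size}`));

// The first load completes (simulated by calling next() by hand).
const loaded: Foos = new Map([[1, 'foo']]);
behavior.next(loaded);
replay.next(loaded);

// Clients that arrive after the first load.
behavior.pipe(first()).subscribe(m => seen.push(`behavior late: size ${m.size}`));
replay.pipe(first()).subscribe(m => seen.push(`replay late: size ${m.size}`));

console.log(seen);
```

In this sketch the early BehaviorSubject client is answered immediately with the empty default map, while the early ReplaySubject client simply waits until real data arrives.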

"Is it because of switchMap's canceling effect?"

No. switchMap wouldn't emit an empty map on its own. When it "switches", it unsubscribes from the previous inner observable, drops its emissions, and only propagates emissions from the new one.
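A small synchronous sketch of that switching behavior, using hand-driven Subjects instead of the interval and the database call (trigger, firstInner, secondInner and out are hypothetical names for illustration only):

```typescript
import { Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const trigger = new Subject<number>();       // stands in for interval(5000)
const firstInner = new Subject<string>();    // stands in for the first load
const secondInner = new Subject<string>();   // stands in for the second load
const out: string[] = [];

trigger
  .pipe(switchMap(i => (i === 0 ? firstInner : secondInner)))
  .subscribe(v => out.push(v));

trigger.next(0);           // subscribe to the first inner observable
trigger.next(1);           // switch: the first inner is unsubscribed
firstInner.next('stale');  // dropped, nobody is listening any more
secondInner.next('fresh'); // propagated downstream

console.log(out);
```

The stale value is dropped rather than replaced by some empty placeholder, so the operator itself has no way of producing a blank map.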

Is it possible that loadFoos() ever returns an empty map?

Notes:

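Your "producer" function is close to from(), but note that from(this.fooService.loadFoos()) would call loadFoos() only once and hand that same result to every subscription. To keep one call per subscription, as your Observable.create version does, wrap the call in defer():

const source2 = defer(() => this.fooService.loadFoos());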
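One thing worth verifying when replacing a hand-written producer: from() wraps a promise that already exists, while defer() creates a new one per subscription. The sketch below counts invocations of a hypothetical loadFoos stand-in (it is not the fooService from the question) to show the difference.

```typescript
import { defer, from } from 'rxjs';

// Hypothetical loader that only counts how often it is invoked.
let calls = 0;
const loadFoos = (): Promise<number> => Promise.resolve(++calls);

// from() receives an already-created promise: loadFoos() runs right here, once.
const eager = from(loadFoos());
// defer() postpones the call until someone subscribes.
const lazy = defer(() => loadFoos());

eager.subscribe();
eager.subscribe();
const callsAfterEager = calls; // both subscriptions share the single call

lazy.subscribe();
lazy.subscribe();
const callsAfterLazy = calls;  // each subscription triggered its own call

console.log({ callsAfterEager, callsAfterLazy });
```

For a polling pipeline this matters: with the eager variant every tick would re-deliver the first result instead of reloading.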

If you're not calling subject.next anywhere, you could get rid of the subject altogether and simply do:

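const subj = interval(5000).pipe(
  // emits once immediately so the first load starts without waiting 5 s;
  // the emitted value is only a trigger and is discarded by switchMapTo
  startWith(new Map()),
  switchMapTo(source2),
  // caches the latest result and replays it to late subscribers
  shareReplay(1)
);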
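As a rough illustration of how shareReplay(1) can serve clients without a subject, the sketch below swaps interval for a manually triggered Subject (ticks) and the database call for of(...). Both are hypothetical stand-ins chosen so the run is synchronous; foos$, version and out are likewise made-up names.

```typescript
import { Subject, of } from 'rxjs';
import { first, shareReplay, switchMap } from 'rxjs/operators';

const ticks = new Subject<void>(); // stands in for the polling interval
let version = 0;                   // lets each simulated load return new data

const foos$ = ticks.pipe(
  // stands in for the database call; each tick yields a map keyed by version
  switchMap(() => of(new Map<number, string>([[++version, 'foo']]))),
  shareReplay(1),
);

const out: string[] = [];
const keysOf = (m: Map<number, string>) => Array.from(m.keys()).join(',');

// An early client waits for the first real result instead of getting a default.
foos$.pipe(first()).subscribe(m => out.push(`early: ${keysOf(m)}`));

ticks.next(); // first load
foos$.pipe(first()).subscribe(m => out.push(`late: ${keysOf(m)}`));

ticks.next(); // second load replaces the cached value
foos$.pipe(first()).subscribe(m => out.push(`later: ${keysOf(m)}`));

console.log(out);
```

Each client takes exactly one value with first(), as in the question, and always receives the most recently loaded map.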

Upvotes: 1
