Denis Loh
Denis Loh

Reputation: 2224

RxJS: Emit value when delay between source emits too long

I have a stream which emits at 1Hz. Once in a while, there is a delay between the emitted items of some seconds, let's say 10 seconds. I want to create an observable which subscribes to the source, and everytime the delay between the items is too long (e.g. 5s), it shall emit an item of another type. However, when the source emits normal values again, it should emit the source.

-O-O-O-O-O----------O-O-O-O---|---> source
-O-O-O-O-O----X-----O-O-O-O---|---> observable

I thought, that I could use timeoutWith(delay,of(X)) in this case, but this would unsubscribe from the source, loosing the rest of the stream.

When I use switchMap(O => of(O).timeoutWith(delay, of(x)) to have a disposable stream of Os, it does not timeout as the inner observable hasn't been created yet.

Any ideas?

FINAL SOLUTION

This is the solution, which in the end is what I need:

this.sensorChanged
    .pipe(
      mapTo(SensorEvent.SIGNAL_FOUND),
      startWith(PositioningEvent.SIGNAL_UNAVAILABLE),
      switchMap(x => concat(of(x), timer(5000).pipe(mapTo(PositioningEvent.SIGNAL_LOST)))),
      distinctUntilChanged()
    )

The missing link was the startWith() which prevented the switchMap from emission.

Upvotes: 1

Views: 456

Answers (1)

JB Nizet
JB Nizet

Reputation: 691635

Not tested, but this should do the trick:

const result$ = source$.pipe(
  switchMap(o => concat(of(o), timer(5000).pipe(mapTo(x))))
);

Upvotes: 1

Related Questions