Reputation: 2224
I have a stream that emits at 1 Hz. Once in a while there is a gap of several seconds between emitted items, say 10 seconds. I want to create an observable that subscribes to the source and, every time the gap between items is too long (e.g. 5 s), emits an item of another type. When the source starts emitting normal values again, the observable should go back to emitting the source's items.
-O-O-O-O-O----------O-O-O-O---|---> source
-O-O-O-O-O----X-----O-O-O-O---|---> observable
I thought I could use timeoutWith(delay, of(X))
in this case, but that would unsubscribe from the source, losing the rest of the stream.
When I use switchMap(O => of(O).timeoutWith(delay, of(x)))
to have a disposable stream of Os, it does not time out, as the inner observable hasn't been created yet.
Any ideas?
FINAL SOLUTION
This is the solution I ended up with; it does what I need:
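import { concat, of, timer } from 'rxjs';
import { distinctUntilChanged, mapTo, startWith, switchMap } from 'rxjs/operators';

this.sensorChanged
  .pipe(
    mapTo(SensorEvent.SIGNAL_FOUND),
    // seed the stream so the timeout also runs before the first sensor event
    startWith(PositioningEvent.SIGNAL_UNAVAILABLE),
    // re-emit each value, then SIGNAL_LOST if nothing new arrives within 5 s
    switchMap(x => concat(of(x), timer(5000).pipe(mapTo(PositioningEvent.SIGNAL_LOST)))),
    distinctUntilChanged()
  )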
The missing link was startWith(): without it, switchMap emits nothing, not even the timeout value, until the source has emitted its first item.
Upvotes: 1
Views: 456
Reputation: 691635
Not tested, but this should do the trick:
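import { concat, of, timer } from 'rxjs';
import { mapTo, switchMap } from 'rxjs/operators';
const result$ = source$.pipe(
  // re-emit each item, then emit x if no new item arrives within 5 s
  switchMap(o => concat(of(o), timer(5000).pipe(mapTo(x))))
);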
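To make the expected timeline concrete, here is a rough, dependency-free sketch that models what the switchMap/concat/timer pipeline should emit for a list of timestamped source items. It is not RxJS code: `Timed` and `withGapMarkers` are hypothetical names invented for this illustration, and the handling of exact ties (a source item arriving at the same instant the timer would fire) is an assumption of the model.

```typescript
// Hypothetical, dependency-free model (not RxJS itself) of the timeline
// produced by switchMap(o => concat(of(o), timer(timeoutMs).pipe(mapTo(marker)))).
type Timed<V> = { at: number; value: V };

function withGapMarkers<T, M>(
  source: Timed<T>[], // source emissions, sorted by time in ms
  timeoutMs: number,
  marker: M
): Timed<T | M>[] {
  const out: Timed<T | M>[] = [];
  source.forEach((item, i) => {
    out.push(item); // concat(of(o), ...) re-emits the item immediately
    const markerAt = item.at + timeoutMs; // when this item's timer would fire
    const next = source[i + 1];
    // switchMap drops the pending timer as soon as the next item arrives,
    // so the marker only appears if nothing arrives before markerAt.
    // Exact ties (next.at === markerAt) are treated as "item wins" here;
    // that ordering is an assumption of this model.
    if (next === undefined || next.at > markerAt) {
      out.push({ at: markerAt, value: marker });
    }
  });
  return out;
}

// Three items at 1 Hz, a 10 s gap, then two more items.
const timeline = withGapMarkers(
  [0, 1000, 2000, 12000, 13000].map(at => ({ at, value: "O" })),
  5000,
  "X"
);
console.log(timeline.map(e => `${e.at}:${e.value}`).join(" "));
// prints "0:O 1000:O 2000:O 7000:X 12000:O 13000:O 18000:X"
```

Note the trailing X at 18000: as far as I understand switchMap, the last inner timer keeps running even after the source completes, so the result emits one more X five seconds after the final item before completing. That differs slightly from the marble diagram in the question.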
Upvotes: 1