Reputation: 1944
I have an observable that emits a values (it is emitting arrays) periodically.
How can I make it so that if there was no value for 2 seconds, a constant value (for example an empty array) should be emitted?
Upvotes: 2
Views: 1334
Reputation: 14750
One way to do this would be to use switchMap
to create an observable that emits 2 items:
If switchMap
receives another emission from the source, it will create a new inner observable without ever emitting the empty array.
const obs$ = source$.pipe(
switchMap(source =>
concat(
of(source),
of([]).pipe(delay(2000))
)
)
);
Here's a StackBlitz sample.
You could even pull this logic out into its own custom operator:
const obs$ = source$.pipe(
onDormant(2000, [])
);
/*
| Emits the provided value when no emissions
| occur after the specified time duration.
*/
function onDormant<T, V extends T>(delayMs: number, value: V) {
return switchMap((source: T) => concat(
of(source),
timer(delayMs).pipe(mapTo(value as T))
));
}
Upvotes: 4
Reputation: 2165
It would depend on how you are getting these arrays. But you can always update your observable to emit every 2 seconds since last emission using setInterval
. This would ensure a maximum window of 2 seconds per message (but it can be much less)
For example:
const observable = new Observable(function subscribe(subscriber) {
var interval;
function resetInterval() {
clearInterval(interval);
interval = setInterval(() => subscriber.next([]), 2_000);
}
resetInterval();
const subject = webSocket("ws://localhost:8081");
subject.subscribe(
msg => { subscriber.next(msg); resetInterval(); }
err => subscriber.error
() => console.log('complete') // Called when connection is closed (for whatever reason).
);
});
Upvotes: 1