Reputation: 384
I have a question about RxJS in the context of NestJS CQRS sagas.
Suppose two events are published one after the other: one sets a value and the other unsets it. When the event that sets the value arrives, I need to wait 3 seconds and perform some action only if the unsetting event has not been published in the meantime.
Here is some code for starters:
valuePersisted = (events$: Observable<any>): Observable<ICommand> => {
  return events$
    .pipe(
      ofType(ValueHasBeenSet),
      map(event => {
        return new SomeCommand();
      }),
    );
}
I need to listen for the ValueHasBeenUnset event somehow and cancel the stream if that event is received within that time.
EDIT
I just realized that the ValueHasBeenSet and ValueHasBeenUnset events can set and unset different kinds of values, and the code should distinguish between them. For example, both events have a property called type whose value can be 'blue' | 'yellow'. Is there a way to apply the logic per event type while keeping only the two generic events ValueHasBeenSet and ValueHasBeenUnset?
Upvotes: 1
Views: 208
Reputation: 6424
Consider implementing it in the following way:
return events$.pipe(
  ofType(ValueHasBeenSet), // <- listen for the ValueHasBeenSet event
  switchMap(() =>
    // <- use switchMap to start a 3-second timer; a newer ValueHasBeenSet restarts it
    timer(3000).pipe(
      takeUntil(events$.pipe(ofType(ValueHasBeenUnset))), // <- unsubscribe from the timer on a ValueHasBeenUnset event
      map(() => new SomeCommand()), // <- map to SomeCommand once 3 seconds have passed
    ),
  ),
);
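Regarding the EDIT in the question: one possible way to keep the logic separate per type is to group the set events by their type property and only let an unset event of the same type cancel the timer. The following is a rough sketch under some assumptions, not a definitive implementation. The event and command classes are hypothetical stand-ins (only the type property comes from the question), the function name valuePersistedPerType, the delayMs and scheduler parameters, and the demo helper are made up for illustration, and plain instanceof filters are used so that the snippet runs with RxJS alone. Inside an actual NestJS saga, ofType from @nestjs/cqrs should be able to take their place. The sketch also assumes events$ is hot (multicast), since it is subscribed to more than once.

```typescript
import { Observable, SchedulerLike, Subject, VirtualTimeScheduler, asyncScheduler, timer } from 'rxjs';
import { filter, groupBy, map, mergeMap, switchMap, takeUntil } from 'rxjs/operators';

type ValueType = 'blue' | 'yellow';

// Hypothetical event and command shapes; only the `type` property is taken from the question.
class ValueHasBeenSet {
  readonly type: ValueType;
  constructor(type: ValueType) { this.type = type; }
}
class ValueHasBeenUnset {
  readonly type: ValueType;
  constructor(type: ValueType) { this.type = type; }
}
class SomeCommand {
  readonly type: ValueType;
  constructor(type: ValueType) { this.type = type; }
}

// Emits a SomeCommand for a given type once `delayMs` has passed after a
// ValueHasBeenSet of that type, unless a ValueHasBeenUnset of the same type arrives first.
// Assumes `events$` is hot (for example a Subject), because it is subscribed to several times.
function valuePersistedPerType(
  events$: Observable<unknown>,
  delayMs = 3000,
  scheduler: SchedulerLike = asyncScheduler,
): Observable<SomeCommand> {
  const sets$ = events$.pipe(
    filter((e): e is ValueHasBeenSet => e instanceof ValueHasBeenSet),
  );
  const unsets$ = events$.pipe(
    filter((e): e is ValueHasBeenUnset => e instanceof ValueHasBeenUnset),
  );

  return sets$.pipe(
    groupBy(e => e.type), // one inner stream per type ('blue', 'yellow')
    mergeMap(group$ =>
      group$.pipe(
        // switchMap restarts the timer only for set events of the same type
        switchMap(set =>
          timer(delayMs, scheduler).pipe(
            // only an unset event of the same type cancels this timer
            takeUntil(unsets$.pipe(filter(u => u.type === set.type))),
            map(() => new SomeCommand(set.type)),
          ),
        ),
      ),
    ),
  );
}

// Demo in virtual time, so no real 3-second wait is needed.
function demo(): string[] {
  const scheduler = new VirtualTimeScheduler();
  const events$ = new Subject<unknown>();
  const fired: string[] = [];
  valuePersistedPerType(events$, 3000, scheduler).subscribe(cmd => fired.push(cmd.type));

  scheduler.schedule(() => events$.next(new ValueHasBeenSet('blue')), 0);
  scheduler.schedule(() => events$.next(new ValueHasBeenSet('yellow')), 500);
  scheduler.schedule(() => events$.next(new ValueHasBeenUnset('blue')), 1000); // cancels 'blue' only
  scheduler.flush();
  return fired;
}
```

In the demo, the 'blue' timer is cancelled by the matching unset event, while the 'yellow' timer is left alone and produces its command. Using mergeMap over the groups (rather than a single switchMap over all set events) is what keeps a 'yellow' set event from restarting a pending 'blue' timer.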
Upvotes: 2