Evgeny Fedorenko
Evgeny Fedorenko

Reputation: 384

Question about 2 rxjs observables and a timer

I have a question about rxjs in the context of nestjs CQRS sagas.

Suppose I have a scenario where there is two events being published one after another one. One sets the value and the other one unsets it. I need to be able to listen to the event which sets the value for 3 seconds and perform some action if another event has not been published in the meantime.

Here is some code for starters:

valuePersisted = (events$: Observable<any>): Observable<ICommand> => {
return events$
  .pipe(
    ofType(ValueHasBeenSet),
    map(event => {
      return new SomeCommand();
    }),
  );
}

I need to listen to ValueHasBeenUnset event somehow and cancel out of the stream in case this event was received within some time.

EDIT

I just realized that events ValueHasBeenSet and ValueHasBeenUnset can have different value types to be set and unset and code should distinguish that. For example both events have a property called type and its value can be 'blue' | 'yellow'. Is there a way to preserve the logic per event type keeping only two generic events ValueHasBeenSet and ValueHasBeenUnset?

Upvotes: 1

Views: 208

Answers (1)

Rafi Henig
Rafi Henig

Reputation: 6424

Consider implementing it in the following way:

 return events$
  .pipe(
    ofType(ValueHasBeenSet), // <- listen for ValueHasBeenSet event
    switchMap(x => {  // <- use switchMap to return a timer 
      return timer(3000).pipe(
        takeUntil(events$.pipe(ofType(ValueHasBeenUnset))), // <- unsubscribe from the timer on ValueHasBeenUnset event
        map(event => {
          return new SomeCommand(); // <- map to SomeCommand once 3 seconds have passed
        })
      )
    })

Upvotes: 2

Related Questions