Kenny

Reputation: 408

How to create an observable in RxJS that waits for two observables in sequence, acts, and then waits for the same sequence again

I'm struggling with a specific scenario for which I was hoping to use RxJS:

There are two observables that must fire in sequence for a value to be registered:

  1. An observable representing a user pressing a button

  2. An observable representing a device returning its next value

After the user presses the button, the stream should wait until the device returns its next value, and that value should then be captured and processed. After that, it should wait for the same sequence of events again.

I found that it works once if I use concat and complete the observables upon receiving the button action and the device's value.

However, after the observables complete I can't use them anymore for subsequent actions.
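To make the limitation concrete, here is a minimal sketch of the concat behaviour described above. The `user` and `device` Subjects are stand-ins for the real button and device streams, and the `log` array exists only to record what happens; both are assumptions for illustration.

```typescript
import { Subject, concat } from 'rxjs';

// Hypothetical stand-ins for the button and device streams.
const user = new Subject<void>();
const device = new Subject<number>();
const log: string[] = [];

concat(user, device).subscribe({
  next: value => log.push(`next ${value}`),
  complete: () => log.push('complete'),
});

user.complete();   // button "pressed": concat moves on to the device stream
device.next(5);    // first device value is delivered
device.complete(); // the concatenated stream completes here
device.next(6);    // no effect: a completed Subject emits nothing further

console.log(log); // ['next 5', 'complete']
```

Once both Subjects have completed, there is nothing left to re-arm, which is why this approach only works for the first press.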

I also found that I could use zip, which triggers once both observables have emitted via next(), but then the order isn't guaranteed: instead of returning the next device value after the button was pressed, it emits a value that the device produced in the meantime.
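The zip problem can be reproduced with a short sketch. As before, `user` and `device` are hypothetical Subjects standing in for the real streams, and `paired` only collects the results. zip pairs emissions by index and buffers whichever side is ahead, so a press is matched with the oldest unpaired device value rather than the next one.

```typescript
import { Subject, zip } from 'rxjs';

// Hypothetical stand-ins for the button and device streams.
const user = new Subject<void>();
const device = new Subject<number>();
const paired: number[] = [];

zip(user, device).subscribe(([, value]) => paired.push(value));

device.next(1); // emitted before any press, buffered by zip
device.next(2); // also buffered
user.next();    // press: zip pairs it with the buffered 1
device.next(3); // the value that was actually wanted; stays unpaired for now

console.log(paired); // [1]
```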

What would be the best-practice solution for this case?

import { Observable, Subject, forkJoin } from 'rxjs';
import { concat, repeat, switchMap } from 'rxjs/operators';

let user: Subject<any> = new Subject<any>();
let device: Subject<number> = new Subject<number>();

user.pipe(concat(device)).subscribe(() => {
    console.log("both called");
});


setInterval(() => {
    setTimeout(() => {
        user.complete();
        console.log("user action executed");
    }, 2000);

    setTimeout(() => {
        device.next(new Date().getSeconds());
        console.log("device value sent");
        device.complete();
    }, 5000);
}, 10000);

Upvotes: 0

Views: 685

Answers (2)

kos

Reputation: 5364

exhaustMap lets you listen to another stream while ignoring subsequent events from the source stream until that inner stream completes:

const { fromEvent, timer, Subject } = rxjs;
const { exhaustMap, take } = rxjs.operators;

// mock device$ will output every 3 seconds
const device$ = new Subject();
timer(0, 3000).subscribe(device$);
device$.subscribe(console.log); // output device ticking

// listen to button clicks
const btn = document.getElementById('btn');
const click$ = fromEvent(btn, 'click');
click$.pipe(
  // when a button is clicked -- we start listening
  // for the next event on the device$
  // until then we don't react to click$
  exhaustMap(() => device$.pipe(take(1)))
).subscribe(value => {
  console.log('Clicked at ' + value);
});
<script src="https://unpkg.com/[email protected]/bundles/rxjs.umd.min.js"></script>

<button id="btn">Clicker</button>

Here's an illustration of what's going on:

[Image: exhaustMap illustration]

Play with the exhaustMap operator here.
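For the setup in the question, the same idea might look like the sketch below. It assumes the button and the device are both exposed as Subjects named `user` and `device`, as in the question's code; the `captured` array is only there to show which values get through. Neither Subject ever completes, so the pipeline can re-arm indefinitely.

```typescript
import { Subject } from 'rxjs';
import { exhaustMap, take } from 'rxjs/operators';

// Stand-ins for the question's button and device streams.
const user = new Subject<void>();
const device = new Subject<number>();
const captured: number[] = [];

user.pipe(
  // On a press, wait for exactly one device value, then re-arm.
  exhaustMap(() => device.pipe(take(1)))
).subscribe(value => captured.push(value));

device.next(1); // ignored: no press yet
user.next();    // press: start waiting for the device
user.next();    // ignored: still waiting
device.next(2); // captured
device.next(3); // ignored: no press since the last capture
user.next();    // press again
device.next(4); // captured

console.log(captured); // [2, 4]
```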

Hope this helps

Upvotes: 1

Adrian Brand

Reputation: 21658

You can use switchMap to map each click to another observable:

const { of, fromEvent } = rxjs;
const { switchMap, delay } = rxjs.operators;

fromEvent(document.getElementById('btn'), 'click').pipe(
    switchMap(e => of('some value').pipe(delay(1000)))
  )
  .subscribe(val => { console.log(val) });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>
<button id="btn">Click</button>
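Adapted to the question, the inner observable would presumably be the device stream limited to one value rather than a delayed constant. The sketch below assumes `user` and `device` Subjects like those in the question; `captured` is a hypothetical helper for inspecting the result. With switchMap, a second press while waiting cancels the pending wait and starts a new one instead of being ignored; for a hot device Subject the captured value ends up the same either way.

```typescript
import { Subject } from 'rxjs';
import { switchMap, take } from 'rxjs/operators';

// Stand-ins for the question's button and device streams.
const user = new Subject<void>();
const device = new Subject<number>();
const captured: number[] = [];

user.pipe(
  // Each press (re)starts the wait for the next single device value.
  switchMap(() => device.pipe(take(1)))
).subscribe(value => captured.push(value));

user.next();    // press: start waiting
user.next();    // press again: cancels the first wait, starts a new one
device.next(7); // captured
device.next(8); // ignored: no press since the last capture

console.log(captured); // [7]
```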

Upvotes: 0
