Reputation: 408
I'm struggling with a specific scenario for which I was hoping to use RxJS :
There are two observables which should trigger in sequence for a value to be registered :
An observable representing a user pressing a button
An observable representing a device returning it's next value
After the user presses the button, it should wait until the device returns it's next value, then this value should be captured and processed. After that, it should again wait for the same sequence of events.
I found that it works once if I use concat, and I complete the observables upon receiving the button action and the device's value.
However, after the observables complete I can't use them anymore for subsequent actions.
I also found that I could use zip
, which would trigger after both observables return their next() , but then the order isn't guaranteed ( it won't return the next value after the button was pressed, it will emit the value that was emitted in the meantime).
What would be the best-practice solution for this case ?
import { Observable, Subject, forkJoin } from 'rxjs';
import { concat, repeat, switchMap } from 'rxjs/operators';
let user: Subject<any> = new Subject<any>();
let device: Subject<number> = new Subject<number>();
user.pipe(concat(device)).subscribe(() => {
console.log("both called");
});
setInterval(() => {
setTimeout(() => {
user.complete();
console.log("user action executed");
}, 2000);
setTimeout(() => {
device.next(new Date().getSeconds());
console.log("device value sent");
device.complete();
}, 5000);
}, 10000);
Upvotes: 0
Views: 685
Reputation: 5364
exhaustMap
will let you listen to another stream while ignoring consequent events from the source stream:
const { fromEvent, timer, Subject } = rxjs;
const { exhaustMap, take } = rxjs.operators;
// mock device$ will output every 3 seconds
const device$ = new Subject();
timer(0, 3000).subscribe(device$);
device$.subscribe(console.log); // output device ticking
// listen to button clicks
const btn = document.getElementById('btn');
const click$ = fromEvent(btn, 'click');
click$.pipe(
// when a button is clicked -- we start listening
// for the next event on the device$
// until then we don't react to click$
exhaustMap(() => device$.pipe(take(1)))
).subscribe(value => {
console.log('Clicked at ' + value);
});
<script src="https://unpkg.com/[email protected]/bundles/rxjs.umd.min.js"></script>
<button id="btn">Clicker</button>
Heres an illustration of what's going on
Play with the exhaustMap
operator here.
Hope this helps
Upvotes: 1
Reputation: 21658
You can switchMap and return another observable.
const { of, fromEvent } = rxjs;
const { switchMap, delay } = rxjs.operators;
fromEvent(document.getElementById('btn'), 'click').pipe(
switchMap(e => of('some value').pipe(delay(1000)))
)
.subscribe(val => { console.log(val) });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>
<button id="btn">Click</button>
Upvotes: 0