Woods Geroge

Reputation: 101

How to buffer the latest value until another sequence emits a value in RxJS?

I am trying to use RxJS in my project. I have the following sequences. What I expect is that the 1st sequence is only handled after a value arrives in the other sequence, and that only the latest value from the 1st sequence is kept until then. Any suggestions?

s1$ |a---b----c-

s2$ |------o----

expected result:

s3$ |------b--c-
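One way to pin down the expected behaviour, independent of any particular operator, is a small plain-JavaScript model of the marble diagrams above. This is only a sketch: `latestAfterTrigger`, the `{ t, v }` event shape, and the tick positions read off the diagrams are hypothetical names and values chosen for illustration, not part of RxJS.

```javascript
// Hypothetical reference model (not an RxJS API).
// sourceEvents: array of { t, v } sorted by time t.
// triggerTime: the time at which the other sequence emits its first value.
function latestAfterTrigger(sourceEvents, triggerTime) {
  const output = [];
  let latest;            // most recent source value seen before the trigger
  let hasLatest = false;

  for (const { t, v } of sourceEvents) {
    if (t < triggerTime) {
      // Before the trigger: remember only the newest value, emit nothing.
      latest = v;
      hasLatest = true;
    } else {
      // At or after the trigger: release the buffered value once,
      // then pass every later value straight through.
      if (hasLatest) {
        output.push({ t: triggerTime, v: latest });
        hasLatest = false;
      }
      output.push({ t, v });
    }
  }

  // The trigger fired after the last source event: still release the buffer.
  if (hasLatest && triggerTime !== Infinity) {
    output.push({ t: triggerTime, v: latest });
  }
  return output;
}

// Marble positions from the diagrams: a at 0, b at 4, c at 9; o at 6.
const s1 = [{ t: 0, v: 'a' }, { t: 4, v: 'b' }, { t: 9, v: 'c' }];
const s3 = latestAfterTrigger(s1, 6);
console.log(s3.map(e => `${e.t}:${e.v}`).join(' ')); // 6:b 9:c
```

The value `a` never appears in the result because `b` replaced it before the trigger arrived.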

Upvotes: 7

Views: 1079

Answers (3)

Gabriel Vergnaud

Reputation: 21

I guess I would do this using a ReplaySubject:

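// Caches the most recent value from one$ and replays it to late subscribers
const subject$ = new Rx.ReplaySubject(1)

const one$ = Rx.Observable.interval(1000)
const two$ = Rx.Observable.interval(2500)

one$.subscribe(subject$)

// On the first value from two$, switch to the subject: it replays the
// cached value immediately, then forwards everything that follows
const three$ = two$
  .take(1)
  .flatMap(() => subject$)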

// one$   |----0---1---2---3---4---
// two$   |----------0---------1---
// three$ |----------1-2---3---4---
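The property this relies on is that a ReplaySubject with a buffer size of 1 hands its most recent value to any late subscriber and then keeps forwarding new ones. The toy class below imitates only that property in plain JavaScript so it can be run without any library; `LatestReplay` and its methods are hypothetical stand-ins, not the RxJS implementation.

```javascript
// Toy stand-in for ReplaySubject(1); hypothetical, for illustration only.
class LatestReplay {
  constructor() {
    this.hasValue = false;
    this.value = undefined;
    this.listeners = [];
  }

  // Store the newest value and forward it to current subscribers.
  next(value) {
    this.hasValue = true;
    this.value = value;
    this.listeners.forEach(fn => fn(value));
  }

  // A late subscriber first receives the cached value (if any),
  // then every value pushed afterwards.
  subscribe(fn) {
    if (this.hasValue) fn(this.value);
    this.listeners.push(fn);
  }
}

const latest = new LatestReplay();
const received = [];

latest.next(0);
latest.next(1);
// This is the point where two$ would emit and flatMap would subscribe.
latest.subscribe(v => received.push(v));
latest.next(2);
latest.next(3);

console.log(received); // [1, 2, 3]
```

The value 0 is dropped because 1 overwrote it in the cache before anything subscribed.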

Upvotes: 2

martin

Reputation: 96999

I'd combine sample(), which already does most of what you need, with skipUntil().

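// Assumes RxJS 5, where 'rxjs/Rx' also patches all operators onto Observable
import { Observable, Subject, Scheduler } from 'rxjs/Rx';

const start = Scheduler.async.now();
const trigger = new Subject();

// Emits 0, 1, 2, ... once per second; share() so both branches use one timer
const source = Observable
    .timer(0, 1000)
    .share();

// sample(trigger).take(1): the latest source value, once, when the trigger fires
// skipUntil(trigger): every source value emitted after the trigger
Observable.merge(source.sample(trigger).take(1), source.skipUntil(trigger))
    .subscribe(val => console.log(Scheduler.async.now() - start, val));

setTimeout(() => {
    trigger.next();
}, 2500);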

This outputs the numbers starting with 2, which is the latest value at the moment the trigger fires.

source  0-----1-----2-----3-----4
trigger ---------------X---------
output  ---------------2--3-----4

Console output with timestamps:

2535 2
3021 3
4024 4
5028 5
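To see why merging the two branches gives that output, the split can be modelled on plain arrays of timestamped events. This is a rough sketch under stated assumptions: `sampleOnce` and `skipUntilTime` are hypothetical helpers that only mimic `sample(trigger).take(1)` and `skipUntil(trigger)` for a trigger that fires exactly once, and a source value arriving at exactly the trigger time is arbitrarily treated as "before".

```javascript
// Hypothetical models of the two branches; not RxJS code.

// Like source.sample(trigger).take(1): the latest value seen up to the trigger.
function sampleOnce(events, triggerTime) {
  const before = events.filter(e => e.t <= triggerTime);
  return before.length
    ? [{ t: triggerTime, v: before[before.length - 1].v }]
    : [];
}

// Like source.skipUntil(trigger): only values emitted after the trigger.
function skipUntilTime(events, triggerTime) {
  return events.filter(e => e.t > triggerTime);
}

// timer(0, 1000) emits 0 at 0 ms, 1 at 1000 ms, and so on.
const source = [0, 1, 2, 3, 4].map(v => ({ t: v * 1000, v }));
const triggerTime = 2500;

// Merging is just concatenation here because the first branch
// never emits later than the second one.
const merged = sampleOnce(source, triggerTime)
  .concat(skipUntilTime(source, triggerTime));

console.log(merged.map(e => `${e.t} ${e.v}`).join(', '));
// 2500 2, 3000 3, 4000 4
```

The real console timestamps above differ by a few milliseconds because of timer jitter.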

Alternatively, you could use switchMap() with a ReplaySubject, but it's probably less obvious than the previous example and it requires two Subjects.

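// Assumes RxJS 5, where 'rxjs/Rx' also patches all operators onto Observable
import { Observable, Subject, ReplaySubject, Scheduler } from 'rxjs/Rx';

const start = Scheduler.async.now();
const trigger = new Subject();

const source = Observable
    .timer(0, 1000)
    .share();

// Keeps only the latest source value and replays it to late subscribers
const replayedSubject = new ReplaySubject(1);
source.subscribe(replayedSubject);

// When the trigger fires, subscribe to the replayed stream:
// the cached value arrives first, followed by all later values
trigger
    .switchMap(() => replayedSubject)
    .subscribe(val => console.log(Scheduler.async.now() - start, val));

setTimeout(() => {
    trigger.next();
}, 2500);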

The output is exactly the same.

Upvotes: 3

Julia Passynkova

Reputation: 17909

last() combined with takeUntil() will work.

Here is an example:

let one$ = Rx.Observable.interval(1000);
let two$ = Rx.Observable.timer(5000, 1000).mapTo('stop');

one$
  .takeUntil(two$)
  .last()
  .subscribe(
    x => console.log(x),
    err => console.error(err),
    () => console.log('done')
  );

Upvotes: -1
