kharandziuk

Reputation: 12900

buffered withLatestFrom

I need something similar to withLatestFrom that behaves as in the diagram below:

---------A-----------------B--
-1-2-3------4------5-6-7-8----

--------[A,[123]]---------------[B,[45678]]

Is there any way to implement such behavior with RxJS?

Upvotes: 1

Views: 247

Answers (2)

Enigmativity

Reputation: 117175

I apologise that I'm not a JavaScript coder, but this works in .NET. Hopefully you can translate it.

var xs = new Subject<string>();
var ys = new Subject<string>();

var qs =
    xs.Publish(pxs =>
        ys.Buffer(() => pxs)
            .Zip(pxs, (numbers, letter) => new { letter, numbers }));

The .Publish(pxs => ...) operator takes a single observable, subscribes to it only once, and shares that subscription within the lambda. This prevents multiple subscriptions to the source and synchronizes the production of values from pxs within the lambda.

The ys.Buffer(() => pxs) call takes all values of ys and groups them into a series of lists, closing each list at the point when pxs produces a value.

Finally the .Zip(...) takes the values from pxs and pairs them up with the lists produced by .Buffer(...).

qs.Subscribe(q => Console.WriteLine(q));

ys.OnNext("1");
ys.OnNext("2");
ys.OnNext("3");
xs.OnNext("A");
ys.OnNext("4");
ys.OnNext("5");
ys.OnNext("6");
ys.OnNext("7");
ys.OnNext("8");
xs.OnNext("B");

ys.OnCompleted();
xs.OnCompleted();

It produces:

[screenshot of the console output; image not preserved]


Here's the translated JavaScript version:

var qs = xs.publish(
    pxs =>
      ys.buffer(() => pxs).zip(pxs, (numbers, letter) => [ letter, numbers ])
);
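
For readers who want to check the expected pairing without an Rx dependency, here is a minimal plain-JavaScript sketch of the same buffer-then-pair behaviour. The createSubject and bufferedWithLatestFrom helpers are hypothetical names made up for this illustration, not RxJS APIs, and the sketch ignores completion and error handling.

```javascript
// Minimal stand-in for a hot observable (hypothetical helper, not an RxJS API).
function createSubject() {
  const listeners = [];
  return {
    subscribe(fn) { listeners.push(fn); },
    next(value) { listeners.forEach(fn => fn(value)); },
  };
}

// Collect everything `source` emits and release it, paired with the trigger
// value, each time `trigger` emits. The buffer restarts after every release.
function bufferedWithLatestFrom(trigger, source) {
  const out = createSubject();
  let buffer = [];
  source.subscribe(value => { buffer.push(value); });
  trigger.subscribe(letter => {
    const numbers = buffer;
    buffer = [];                 // start a fresh buffer for the next window
    out.next([letter, numbers]);
  });
  return out;
}

// Replay the sequence from the question's diagram.
const xs = createSubject();      // letters
const ys = createSubject();      // numbers
const results = [];
bufferedWithLatestFrom(xs, ys).subscribe(pair => results.push(pair));

[1, 2, 3].forEach(n => ys.next(n));
xs.next("A");
[4, 5, 6, 7, 8].forEach(n => ys.next(n));
xs.next("B");

console.log(JSON.stringify(results));
// prints [["A",[1,2,3]],["B",[4,5,6,7,8]]]
```

The Rx version gets the same effect declaratively: the buffer plays the role of `buffer`, and handing the letter out together with the list plays the role of Zip.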

Upvotes: 2

user3743222

Reputation: 18665

This seems to work. It is based on the sample operator, though there are other ways to do the same thing.

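var ta_message = document.getElementById('ta_message');
var ta_result = document.getElementById('ta_result');
// ta_source and ta_sampler are used below but never declared in this snippet;
// the element ids here are assumed to match the variable names.
var ta_source = document.getElementById('ta_source');
var ta_sampler = document.getElementById('ta_sampler');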

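// Returns a logger that appends "<who_> emits <value>" to the element `who`.
function emits(who, who_) {
  return function (x) {
    who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n");
  };
}

// Accumulator for scan: appends el to arr (mutating it) and returns arr.
function pushValue(arr, el) { arr.push(el); return arr; }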

var sourceCounter = 0;
var samplerCounter = 0;
var source$ = Rx.Observable
  .fromEvent(document.getElementById('source'), 'click')
  .map(function(){return sourceCounter++;})
  .do(emits(ta_source, 'source'))
  .share();
var sampler$ = Rx.Observable
  .fromEvent(document.getElementById('sampler'), 'click')
  .map(function(){return String.fromCharCode(65+samplerCounter++);})
  .do(emits(ta_sampler, 'sampler'))
  .share();

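// Each sampler emission restarts the accumulation: flatMapLatest drops the
// previous inner scan and subscribes to a fresh one seeded with [].
var bufferedSource$ = sampler$
  .startWith('0')
  .flatMapLatest(function (x) {
    console.log("x", x);
    return source$.scan(pushValue, []);
  })
  .do(emits(ta_sampler, 'bufferedSource$'));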

// Take the latest accumulated array whenever the sampler fires, then
// prefix it with the sampler's value.
var sampledSource$ = bufferedSource$
  .sample(sampler$)
  .do(emits(ta_result, 'sampledSource'))
  .withLatestFrom(sampler$,
        function (bufferedValues, samplerValue) {
          return "" + samplerValue + bufferedValues;
        })
  .do(emits(ta_result, 'result'));

//sampler$.connect();
sampledSource$.subscribe(function(){});
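
To see what the pipeline above is meant to emit, the accumulate, sample and reset steps can be traced by hand over a fixed list of clicks. This is only a simplified model in plain JavaScript, not the RxJS operators themselves: the timeline format and the traceSampling name are made up for the illustration, and it assumes that a sampler click with nothing accumulated since the previous one produces no output.

```javascript
// Hypothetical timeline entries: { from: "source" | "sampler", value }.
function traceSampling(timeline) {
  let accumulated = [];          // plays the role of the scan accumulator
  const emitted = [];
  for (const event of timeline) {
    if (event.from === "source") {
      accumulated.push(event.value);
    } else {
      // Assumption: nothing is emitted when no source value has arrived
      // since the previous sampler click.
      if (accumulated.length > 0) {
        // Same string concatenation as the result selector in the answer.
        emitted.push("" + event.value + accumulated);
      }
      accumulated = [];          // the next window starts from an empty array
    }
  }
  return emitted;
}

// Three source clicks, sampler "A", two source clicks, sampler "B".
const traced = traceSampling([
  { from: "source", value: 0 },
  { from: "source", value: 1 },
  { from: "source", value: 2 },
  { from: "sampler", value: "A" },
  { from: "source", value: 3 },
  { from: "source", value: 4 },
  { from: "sampler", value: "B" },
]);

console.log(traced);
// prints [ 'A0,1,2', 'B3,4' ]
```

Because the selector concatenates a string with an array, the buffered values come out comma-joined; return [samplerValue, bufferedValues] instead if you want the pair from the question's diagram.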

Upvotes: 1
