Reputation: 12900
I need something similar to withLatestFrom that behaves as in the diagram below:
---------A-----------------B--
-1-2-3------4------5-6-7-8----
--------[A,[123]]---------------[B,[45678]]
Is there any way to implement such behavior with RxJS?
Upvotes: 1
Views: 247
Reputation: 117175
I apologise that I'm not a JavaScript coder, but this works in .NET. Hopefully you can translate it.
var xs = new Subject<string>();
var ys = new Subject<string>();
var qs =
    xs.Publish(pxs =>
        ys.Buffer(() => pxs)
            .Zip(pxs, (numbers, letter) => new { letter, numbers }));
The .Publish(pxs => ...) operator takes a single observable, subscribes to it only once and shares that subscription within the lambda. It prevents multiple subscriptions to the source and synchronizes the production of the values from pxs within the lambda.
The ys.Buffer(() => pxs) call takes all values of ys and turns them into a series of lists, broken up at the points where pxs produces values.
Finally, the .Zip(...) call takes the values from pxs and pairs them up with the lists produced by .Buffer(...).
qs.Subscribe(q => Console.WriteLine(q));
ys.OnNext("1");
ys.OnNext("2");
ys.OnNext("3");
xs.OnNext("A");
ys.OnNext("4");
ys.OnNext("5");
ys.OnNext("6");
ys.OnNext("7");
ys.OnNext("8");
xs.OnNext("B");
ys.OnCompleted();
xs.OnCompleted();
It produces A paired with [1, 2, 3], then B paired with [4, 5, 6, 7, 8], matching the diagram in the question.
Here's the translated JavaScript version:
var qs = xs.publish(
  pxs =>
    ys.buffer(() => pxs).zip(pxs, (numbers, letter) => [ letter, numbers ])
);
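As a rough, library-free illustration of what the buffer-then-zip pipeline computes, the sketch below replays a merged timeline of events in plain JavaScript. The function name bufferUntilTrigger and the { from, value } event shape are made up for this example and are not part of RxJS or Rx.NET; it models only the pairing logic, not subscriptions, sharing or timing.

```javascript
// Hypothetical helper, not an Rx API: replays an ordered list of events and
// pairs each trigger value with the source values seen since the last trigger.
function bufferUntilTrigger(events) {
  const results = [];
  let buffer = [];
  for (const event of events) {
    if (event.from === "source") {
      buffer.push(event.value);            // collect until the next trigger
    } else if (event.from === "trigger") {
      results.push([event.value, buffer]); // emit [trigger, buffered values]
      buffer = [];                         // start a fresh buffer
    }
  }
  return results;
}

// The timeline from the question: 1 2 3 A 4 5 6 7 8 B
const timeline = [
  { from: "source", value: 1 },
  { from: "source", value: 2 },
  { from: "source", value: 3 },
  { from: "trigger", value: "A" },
  { from: "source", value: 4 },
  { from: "source", value: 5 },
  { from: "source", value: 6 },
  { from: "source", value: 7 },
  { from: "source", value: 8 },
  { from: "trigger", value: "B" },
];

console.log(JSON.stringify(bufferUntilTrigger(timeline)));
// prints [["A",[1,2,3]],["B",[4,5,6,7,8]]]
```

Values that arrive after the last trigger are never emitted in this sketch, which is the same as in the Rx version, where Zip has no further letter to pair them with.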
Upvotes: 2
Reputation: 18665
This seems to work. It is based on the sample operator, but there are other ways to do the same thing.
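var ta_message = document.getElementById('ta_message');
var ta_result = document.getElementById('ta_result');
// ta_source and ta_sampler are used below but were never declared;
// the element IDs here are assumed to match the variable names.
var ta_source = document.getElementById('ta_source');
var ta_sampler = document.getElementById('ta_sampler');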
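// Returns a logger that appends each emitted value to the given element.
function emits(who, who_) {
  return function (x) {
    who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n");
  };
}
// Accumulator for scan: appends el to arr and returns arr.
function pushValue(arr, el) {
  arr.push(el);
  return arr;
}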
var sourceCounter = 0;
var samplerCounter = 0;
var source$ = Rx.Observable
  .fromEvent(document.getElementById('source'), 'click')
  .map(function () { return sourceCounter++; })
  .do(emits(ta_source, 'source'))
  .share();
var sampler$ = Rx.Observable
  .fromEvent(document.getElementById('sampler'), 'click')
  .map(function () { return String.fromCharCode(65 + samplerCounter++); })
  .do(emits(ta_sampler, 'sampler'))
  .share();
// Restart the accumulation with an empty array each time the sampler emits.
var bufferedSource$ = sampler$
  .startWith('0')
  .flatMapLatest(function (x) {
    console.log("x", x);
    return source$.scan(pushValue, []);
  })
  .do(emits(ta_sampler, 'bufferedSource$'));
// Take the latest accumulated array on each sampler emission and combine it
// with the sampler value.
var sampledSource$ = bufferedSource$
  .sample(sampler$)
  .do(emits(ta_result, 'sampledSource'))
  .withLatestFrom(sampler$,
    function (bufferedValues, samplerValue) {
      return "" + samplerValue + bufferedValues;
    })
  .do(emits(ta_result, 'result'));
//sampler$.connect();
sampledSource$.subscribe(function () {});
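The accumulate-and-reset idea behind this pipeline can be modelled without Rx. The sketch below is a simplified plain-JavaScript model, not the RxJS operators themselves: createCollector, onSource and onSampler are hypothetical names introduced for illustration, and the sketch assumes that a sampler emission with no new source values produces nothing (represented here by null).

```javascript
// Hypothetical model of the scan / flatMapLatest / sample combination.
function createCollector() {
  let values = [];      // plays the role of scan(pushValue, [])
  let hasValue = false; // assumed: sampling emits only if something new arrived
  return {
    onSource(x) {
      values.push(x);
      hasValue = true;
    },
    onSampler(letter) {
      // Same string concatenation as in the withLatestFrom selector.
      const result = hasValue ? "" + letter + values : null;
      values = [];      // the accumulation restarts with an empty array
      hasValue = false;
      return result;
    },
  };
}

const collector = createCollector();
[0, 1, 2].forEach(function (x) { collector.onSource(x); });
console.log(collector.onSampler("A")); // prints A0,1,2
[3, 4].forEach(function (x) { collector.onSource(x); });
console.log(collector.onSampler("B")); // prints B3,4
```

Returning an array pair such as [letter, values] instead of a concatenated string would be closer to the [A,[123]] shape in the question.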
Upvotes: 1