Reputation: 18665
I need to create a new instance operator on streams with the following characteristics
Signature
Rx.Observable.prototype.scan_with_reset(seed$, accumulator)
where :
Arguments
accumulator (Function)
: An accumulator function to be invoked on each element.
seed$ (Observable)
: An observable whose values will be used to restart the accumulator function. The accumulator function has the signature function accumulator_fn(accumulator_state, source_value). Each value emitted by seed$ should reset accumulator_state to that seed value, and the seed value itself should be emitted.
Returns
(Observable) : An observable sequence which results from the comonadic bind operation (whatever that means, I am copying the RxJS documentation here). Unlike the normal scan operator, when the accumulator function is 'restarted' by a value emitted by the seed$ observable, that seed value is emitted, and the next value emitted by the scan_with_reset operator will be accumulator_fn(seed, source_value).
Example of use :
var seed$ = Rx.Observable.fromEvent(document, 'keydown')
    .map(function (ev) { return ev.keyCode; })
    .startWith(0);
var result$ = counter$.scan_with_reset(seed$,
    function accumulator_fn(acc, counter) { return acc + counter; });
The following diagram should explain the expected results in more detail:
seed : 0---------13--------27------------
counter : -1--5--2----6---2-----4---1---3---
result : 0-1--6--8-13-19--21-27-31--32--35-
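To make the expected semantics of the diagram concrete, here is a minimal plain-JavaScript model of it (no RxJS involved). It is only a sketch: the function name scanWithResetModel, the event shape { tick, from, value } and the tick numbers (rough column positions in the diagram) are assumptions made for illustration.

```javascript
// Plain-JavaScript model of the expected behaviour (not an RxJS operator).
// `scanWithResetModel` and the event shape are hypothetical names.
function scanWithResetModel(events, accumulator) {
  var state;         // current accumulator state
  var emitted = [];  // values the operator is expected to emit, in order
  events
    .slice()
    .sort(function (a, b) { return a.tick - b.tick; })
    .forEach(function (ev) {
      if (ev.from === 'seed') {
        state = ev.value;                      // reset to the seed value
      } else {
        state = accumulator(state, ev.value);  // ordinary scan step
      }
      emitted.push(state);                     // seed values are emitted too
    });
  return emitted;
}

// Events transcribed from the diagram; ticks are approximate column positions.
var events = [
  { tick: 0, from: 'seed', value: 0 },
  { tick: 10, from: 'seed', value: 13 },
  { tick: 20, from: 'seed', value: 27 },
  { tick: 1, from: 'counter', value: 1 },
  { tick: 4, from: 'counter', value: 5 },
  { tick: 7, from: 'counter', value: 2 },
  { tick: 12, from: 'counter', value: 6 },
  { tick: 16, from: 'counter', value: 2 },
  { tick: 22, from: 'counter', value: 4 },
  { tick: 26, from: 'counter', value: 1 },
  { tick: 30, from: 'counter', value: 3 }
];

var result = scanWithResetModel(events, function (acc, counter) {
  return acc + counter;
});
console.log(result.join(' ')); // 0 1 6 8 13 19 21 27 31 32 35
```

The output matches the result line of the diagram, so any real operator can be checked against this model.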
My initial attempt was to modify accumulator_fn so that seed$ updates a variable that would be in the scope of accumulator_fn, letting me detect changes inside the function itself.
I pursue two goals here:
I had a look at the scan source code: https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/scan.js but I am not sure where to go from there.
Does anybody have any experience in creating RxJS stream operators? What are the conventions to follow and the traps to avoid? Are there any examples of custom-made operators that I could look at? How would you go about implementing this particular one?
[UPDATE] : Some test code for the accepted answer
var seed$ = Rx.Observable.fromEvent(document, 'keydown')
    .map(function (ev) { return ev.keyCode; })
    .startWith(0);
var counter$ = Rx.Observable.fromEvent(document, 'mousemove')
    .map(function (ev) { return 1; });
var result$ = counter$.scanWithReset(seed$,
    function accumulator_fn(acc, counter) { return acc + counter; });
var s = function (x) { console.log("value: ", x); };
var disposable = result$.subscribe(s);
Moving the mouse should increase the displayed value by 1, and pressing a key should restart the counter with the key code of the key pressed.
Upvotes: 3
Views: 2023
Reputation: 18663
In general, the easiest way to create an operator is either to use the Observable.create method, which essentially defines how your Observable should behave when it is subscribed to, or to wrap an existing set of operators, à la share.
When performance becomes a concern there are other considerations (Observable.create is not terribly efficient at scale), and you could look into creating a custom Observable, as is done for map.
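To illustrate what "defines how your Observable should behave when it is subscribed to" means, here is a toy, self-contained sketch of the pattern. It is deliberately not RxJS code: createObservable, fromArray and mapOperator are hypothetical stand-ins, and error handling (onError) is left out for brevity.

```javascript
// Toy stand-in for the Observable.create pattern (NOT the RxJS implementation).
// All function names here are hypothetical.
function createObservable(subscribeFn) {
  return {
    // Subscribing runs subscribeFn, which may return a cleanup function.
    subscribe: function (observer) {
      var cleanup = subscribeFn(observer);
      return { dispose: function () { if (cleanup) { cleanup(); } } };
    }
  };
}

// A source that pushes each array element to the observer, then completes.
function fromArray(items) {
  return createObservable(function (observer) {
    items.forEach(function (item) { observer.onNext(item); });
    observer.onCompleted();
  });
}

// A custom operator: subscribe to the source and forward transformed values.
function mapOperator(source, fn) {
  return createObservable(function (observer) {
    var subscription = source.subscribe({
      onNext: function (x) { observer.onNext(fn(x)); },
      onCompleted: function () { observer.onCompleted(); }
    });
    // Disposing the outer subscription disposes the inner one.
    return function () { subscription.dispose(); };
  });
}

var seen = [];
mapOperator(fromArray([1, 2, 3]), function (x) { return x * 10; })
  .subscribe({
    onNext: function (x) { seen.push(x); },
    onCompleted: function () { seen.push('done'); }
  });
console.log(seen); // [ 10, 20, 30, 'done' ]
```

The shape is the same one a real operator built on Observable.create follows: take the source, subscribe to it inside the subscribe function, push values to the downstream observer, and return something that cleans up the inner subscription.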
For your case I would recommend the former for now. I would think of your problem as several independent streams that we want to flatten into a single stream, where each new stream starts when a reset is triggered. That sounds an awful lot like flatMap to me:
Rx.Observable.prototype.scanWithReset = function ($reset, accum, seed) {
    var source = this;
    // Creates a new Observable
    return Rx.Observable.create(function (observer) {
        // We will be reusing this source, so we want to make sure it is shared
        var p = source.publish();
        var r = $reset
            // Make sure the seed is added first
            .startWith(seed)
            // This will switch to a new sequence with the associated value
            // every time $reset fires
            .flatMapLatest(function (resetValue) {
                // Perform the scan on the shared source with the latest value
                return p.scan(accum, resetValue);
            });
        // Make sure everything gets cleaned up
        return new Rx.CompositeDisposable(
            r.subscribe(observer),
            // We are ready to start receiving from our source
            p.connect());
    });
};
Upvotes: 3