user3743222
user3743222

Reputation: 18665

RXjs : How to create an operator on streams : scan operator where the accumulator state can be reset through an observable

I need to create a new instance operator on streams with the following characteristics

My initial attempt to do this was to modify the accumulator_fn to have the seed$ modify a variable that would in the scope of accumulator_fn so I can detect changes in the function itself.

I pursue two goals here:

I had a look at scan source code : https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/scan.js but I am not sure where to go from there.

Does anybody has any experience in creating Rxjs stream operators? What are the conventions to follow and traps to avoid? Are there any examples of custom-made operators that I could look at? How would you go about implementing this particular one?

[UPDATE] : Some test code for the accepted answer

 var seed$ = Rx.Observable.fromEvent(document, 'keydown')
                          .map(function(ev){return ev.keyCode})
                          .startWith(0);
 var counter$ = Rx.Observable.fromEvent(document, 'mousemove')
                             .map(function(ev){return 1});
 var result$ = counter$.scanWithReset(seed$,
                                      function accumulator_fn (acc, counter) {return acc+counter});
 var s = function (x) {console.log("value: ", x)};
 var disposable = result$.subscribe(s)

Moving the mouse should show a value increase by 1, and pressing a key should restart the counter with the value of the key pressed.

Upvotes: 3

Views: 2023

Answers (1)

paulpdaniels
paulpdaniels

Reputation: 18663

As a general case when creating operators it is generally easiest to use the Observable.create method which essentially defines how your Observable should behave when it is subscribed to or just wrap an existing set of operators ala share.

When you get more into performance there are some other considerations (Observable.create is not terribly efficient at scale) and you could look into creating a custom Observable like map.

For your case I would recommend the former for right now. I would think of your problem really as several independent streams that we would like to flatten into a single stream. Each new stream will start when reset is triggered. This is really sounding an awful lot like flatMap to me:

Rx.Observable.prototype.scanWithReset = function ($reset, accum, seed) {
    var source = this;
    //Creates a new Observable
    return Rx.Observable.create(function (observer) {
        //We will be reusing this source so we want to make sure it is shared
        var p = source.publish();


        var r = $reset
            //Make sure the seed is added first
            .startWith(seed)
            //This will switch to a new sequence with the associated value
            //every time $reset fires
            .flatMapLatest(function (resetValue) {
              //Perform the scan with the latest value
              return source.scan(accum, resetValue);
        });

        //Make sure every thing gets cleaned up
        return new Rx.CompositeDisposable(
            r.subscribe(observer),
            //We are ready to start receiving from our source
            p.connect());
    });
}

Upvotes: 3

Related Questions