rollcona

Reputation: 163

Repeating/Resetting an observable

I am using RxJS to create a "channel number" selector for a remote control on a smart TV. The idea is that as the user enters the numbers they appear on the screen, and once the user has finished entering them, the user is taken to that channel.

I use two observables to achieve this:

  1. A "progress" stream that listens to all number input and, via the scan operator, emits the concatenated number string as each number is entered.

  2. A "completed" stream that, after n milliseconds with no number being entered, emits the final numeric string as completed. E.g. 1-2-3 -> "123".

Here is the code that I use to try and solve this:

channelNumber:

module.exports = function (numberKeys, source, scheduler) {
    return function (completedDelay) {
        var toNumericString = function (name) {
                return numberKeys.indexOf(name).toString();
            },
            concat = function (current, numeric) {
                return current.length === 3 ? current : current + numeric;
            },
            live = createPress(source, scheduler)(numberKeys)
                .map(toNumericString)
                .scan(concat, '')
                .distinctUntilChanged(),

            completed = live.flatMapLatest(function (value) {
                return Rx.Observable.timer(completedDelay, scheduler).map(value);
            }),
            progress = live.takeUntil(completed).repeat();

        return {
            progress: progress,
            completed: completed
        };
    };
};

createPress:

module.exports = function (source, scheduler) {
    return function (keyName, throttle) {
        return source
            .filter(H.isKeyDownOf(keyName))
            .map(H.toKeyName);
    };
};

createSource:

module.exports = function (provider) {
    var createStream = function (event) {
        var filter = function (e) {
                return provider.hasCode(e.keyCode);
            },
            map = function (e) {
                return {
                    type: event,
                    name: provider.getName(e.keyCode),
                    code: e.keyCode
                };
            };
        return Rx.Observable.fromEvent(document, event)
            .filter(filter)
            .map(map);
    };

    return Rx.Observable.merge(createStream('keyup'), createStream('keydown'));
};

Interestingly, the above code works as expected under test conditions (mocking source and scheduler using Rx.TestScheduler). But in production, when the scheduler is not passed at all and source is the result of createPress (above), the progress stream emits only until completed fires, and then never again. It's as if the repeat is completely ignored or redundant. I have no idea why.

Am I missing something here?

Upvotes: 1

Views: 2176

Answers (1)

Dorus

Reputation: 7546

You can use Window. In this case, I would suggest WindowWithTime. You can also do more interesting things like use Window(windowBoundaries) and then pass the source with Debounce as boundary.

source
  .windowWithTime(1500)
  .flatMap(ob => ob.reduce((acc, cur) => acc + cur, ""))

Also, since our windows are closed observables, we can use Reduce to accumulate the values from the window and concat our number.
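
To illustrate what fixed-time windowing followed by Reduce produces, here is a rough plain-JavaScript model that does not use Rx at all. The `fixedWindows` function and the `{ time, key }` record shape are hypothetical names made up for this sketch, and unlike a real time-based window it simply skips windows in which nothing was pressed.

```javascript
// Plain-JavaScript model of fixed-time windowing followed by reduce.
// No Rx here: `presses` is a hypothetical list of { time, key } records,
// with `time` in milliseconds since subscription.
function fixedWindows(presses, windowMs) {
  const buckets = new Map();
  for (const { time, key } of presses) {
    const index = Math.floor(time / windowMs); // which window the press falls into
    if (!buckets.has(index)) buckets.set(index, []);
    buckets.get(index).push(key);
  }
  // Each closed window is reduced to one concatenated string.
  return [...buckets.keys()]
    .sort((a, b) => a - b)
    .map(i => buckets.get(i).reduce((acc, cur) => acc + cur, ""));
}

const presses = [
  { time: 100, key: "1" },
  { time: 600, key: "2" },
  { time: 1600, key: "3" }, // lands in the second 1500 ms window
];
console.log(fixedWindows(presses, 1500)); // [ '12', '3' ]
```

Note how the third key press is cut off from the first two purely because of where the window boundary happens to fall.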


Now, this variant closes each window after 1.5 seconds. Instead, we want to wait x seconds after the last keypress. Naively, we could do source.window(source.debounce(1000)), but then we subscribe to our source twice, which we want to avoid for two reasons. First, we do not know whether subscribing has any side effects; second, we do not know in which order the subscriptions will receive events. The latter isn't a problem here, since debounce already adds a delay after the last keypress, but it is still something to consider.
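
As a rough model of the "wait x seconds after the last keypress" behaviour, the plain-JavaScript sketch below groups key presses by quiet period instead of by fixed window. It does not use Rx; `quietPeriodGroups` and the `{ time, key }` record shape are hypothetical names for illustration, and the input is assumed to be ordered by time.

```javascript
// Plain-JavaScript model of "close the window after a quiet period".
// `sample` is a hypothetical, time-ordered list of { time, key } records.
function quietPeriodGroups(presses, quietMs) {
  const groups = [];
  let current = "";
  let lastTime = null;
  for (const { time, key } of presses) {
    // A gap of at least quietMs since the previous press closes the group.
    if (lastTime !== null && time - lastTime >= quietMs) {
      groups.push(current);
      current = "";
    }
    current += key;
    lastTime = time;
  }
  // The final group closes once the quiet period after the last press elapses.
  if (lastTime !== null) groups.push(current);
  return groups;
}

const sample = [
  { time: 0, key: "1" },
  { time: 400, key: "2" },
  { time: 900, key: "3" },  // each gap is under 1000 ms, so still the same entry
  { time: 2500, key: "7" }, // 1600 ms of silence closed the first entry
];
console.log(quietPeriodGroups(sample, 1000)); // [ '123', '7' ]
```

Here the grouping depends only on the pauses between key presses, so a channel number is never split by an arbitrary boundary.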

The solution is to publish our source. To keep the publish inside the sequence, we wrap it in Observable.create.

Rx.Observable.create(observer => {
    var ob = source.publish();
    return new Rx.CompositeDisposable(
      ob.window(ob.debounce(1000))
        .subscribe(observer),
      ob.connect());
}).flatMap(ob => ob.reduce((acc, cur) => acc + cur, ""))

Edit: Or use publish like this:

source.publish(ob => ob.window(ob.debounce(1000)))
    .flatMap(ob => ob.reduce((acc, cur) => acc + cur, ""))

Upvotes: 1
