sennett
sennett

Reputation: 8424

How can I filter a stream only when it changes?

New to Reactive Programming. I have one stream, a scroll stream, bound to a domNode, and then some other streams subscribing through a filter:

var element = document.getElementById('scrollableElement');
var sourceStream = Rx.Observable.fromEvent(element, 'scroll').map(function(e){
  return e.srcElement.scrollTop;
});

var inTheOneHundreds = sourceStream
  .filter(function (x, idx, obs) {
    return x >= 100 && x < 200;
  });

var inTheTwoHundreds = sourceStream
  .filter(function (x, idx, obs) {
    return x >= 200 && x < 300;
  });

inTheOneHundreds.subscribe(function(value){
  console.log('one hundreds ' + value);
});

inTheTwoHundreds.subscribe(function(value){
  console.log('two hundreds ' + value);
});

This outputs like:

one hundreds 193
one hundreds 196
one hundreds 199
two hundreds 201
two hundreds 204

You can see this here: http://jsbin.com/zedazapato/edit?js,console,output

I want these new streams to output when the hundreds change (from true to false), rather than outputting repeatedly:

one hundreds 199
two hundreds 201
one hundreds 170
two hundreds 270
one hundreds 103
two hundreds 200
one hundreds 156

I have tried using Observable.distinctUntilChanged but it does not seem to behave like I expect (it seems to output the same thing): http://jsbin.com/gibefagiri/1/edit?js,console,output

Where am I going wrong?

Upvotes: 1

Views: 183

Answers (1)

Tamas Hegedus
Tamas Hegedus

Reputation: 29906

You have multiple options.

This will make streams from a predicate, emitting an item when the predicate becomes true:

var element = document.getElementById('scrollableElement');
var sourceStream = Rx.Observable.fromEvent(element, 'scroll').map(function(e){
  return e.srcElement.scrollTop;
});

function whenBecomesTrue(stream, selector) {
  return stream.distinctUntilChanged(selector).filter(selector);
}

var inTheOneHundreds = whenBecomesTrue(sourceStream, function (x, idx, obs) {
  return x >= 100 && x < 200;
});

var inTheTwoHundreds = whenBecomesTrue(sourceStream, function (x, idx, obs) {
  return x >= 200 && x < 300;
});

inTheOneHundreds.subscribe(function(value){
  console.log('one hundreds ' + value);
});

inTheTwoHundreds.subscribe(function(value){
  console.log('two hundreds ' + value);
});

Or you can first emit page changes:

var element = document.getElementById('scrollableElement');
var sourceStream = Rx.Observable.fromEvent(element, 'scroll').map(function(e){
  return e.srcElement.scrollTop;
});

function pageOf(x) {
  return Math.floor(x / 100);
}

var pageChanges = sourceStream.distinctUntilChanged(pageOf);

var inTheOneHundreds = pageChanges.filter(function (x, idx, obs) {
  return pageOf(x) === 1;
});

var inTheTwoHundreds = pageChanges.filter(function (x, idx, obs) {
  return pageOf(x) === 2;
});

inTheOneHundreds.subscribe(function(value){
  console.log('one hundreds ' + value);
});

inTheTwoHundreds.subscribe(function(value){
  console.log('two hundreds ' + value);
});

The problem with your approach using distinctUntilChanged is it actually distincts by the original scrollTop value (which always differs from the previous value), not on the boolean value indicating if the number is in the given range.

Upvotes: 1

Related Questions