gargantuan

Reputation: 8944

Can I manipulate an observable with multiple events from another observable?

Let's say I have an array of validation messages that map to input fields. Let's assume that we get these validation messages from a server.

I also have a stream of change events that map to input fields.

Whenever I get a change event for a given field, I want to:

  1. See if there's a corresponding validation message for that field
  2. Remove the validation message

Here's an example which hopefully illustrates why the usual suspects (combineLatest, withLatestFrom) don't get me over the line.

[Image: validating filtering stream]

The issue with combineLatest is that the fourth change event will filter the second validation array.

The issue with withLatestFrom is that the third change event will undo the work of the second change event.
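
To make the target behaviour concrete, here is a plain-JavaScript model of it with no RxJS involved. The event log, the `type`/`field`/`key` property names and the `applyEvent` function are assumptions made up for illustration: a new validation array resets the state, and a change event removes the matching message from whatever is currently left.

```javascript
// Model of the desired behaviour as a fold over a time-ordered event log.
// The event shapes and property names are assumptions for illustration.
function applyEvent(messages, event) {
  if (event.type === 'validation') {
    // A new validation array replaces whatever was there before.
    return event.messages;
  }
  // A change event removes the message for that field, if there is one.
  return messages.filter(function (msg) { return msg.key !== event.field; });
}

const log = [
  { type: 'validation', messages: [{ key: 'a' }, { key: 'b' }, { key: 'c' }] },
  { type: 'change', field: 'a' },
  { type: 'change', field: 'b' },
  { type: 'validation', messages: [{ key: 'd' }, { key: 'e' }] },
  { type: 'change', field: 'a' }
];

// Record the state after every event, as the output stream would emit it.
const states = [];
log.reduce(function (messages, event) {
  const next = applyEvent(messages, event);
  states.push(next.map(function (msg) { return msg.key; }));
  return next;
}, []);

console.log(JSON.stringify(states));
// [["a","b","c"],["b","c"],["c"],["d","e"],["d","e"]]
```

The second change keeps the effect of the first (unlike withLatestFrom), and the last change to `a` leaves the second validation array alone because that array has no message for `a`.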

Does anyone have any tips? Am I barking up the wrong tree? Have I oversimplified the problem? Is there an RxJS method that's perfect for this?

Upvotes: 0

Views: 1055

Answers (3)

hellosmithy

Reputation: 279

I've adapted the above jsfiddle to show the solution I came up with.

jsfiddle: http://jsfiddle.net/hellosmithy/7kcoud2c/2/

Essentially, this version uses the window operator to chunk up the changeEvent$ stream: every new event from the validation$ stream opens a fresh window, so the set of changed keys is reset whenever a new validation array arrives.

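var filteredValidation$ = validation$
    .merge(
        changeEvent$
            // Start a new window of change events each time validation$ emits.
            .window(validation$)
            // Within the current window, collect the changed keys into a Set.
            .flatMapLatest(changeWindow => {
                return changeWindow.scan((acc, next) => acc.add(next), new Set());
            })
            // Drop every error whose key has changed in this window.
            .withLatestFrom(validation$, (filterSet, errs) => {
                return errs.filter(err => !filterSet.has(err.key));
            })
    )
    .distinctUntilChanged();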
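
The following is a rough plain-JavaScript model of what the windowed pipeline is meant to compute, not the RxJS code itself. The `runWindowed` name and the event shapes are assumptions for illustration; the `key` field on each error follows the `err.key` used above.

```javascript
// Plain-JavaScript model of the windowed approach; no RxJS is used here.
// Event shapes and the function name are assumptions for illustration.
function runWindowed(log) {
  let latestErrs = [];      // most recent validation array
  let changed = new Set();  // keys changed in the current window
  const out = [];
  log.forEach(function (event) {
    if (event.type === 'validation') {
      latestErrs = event.errs;
      changed = new Set();  // a new validation opens a fresh window
    } else {
      changed.add(event.key); // the scan step: accumulate changed keys
    }
    // Keep only the errors whose key has not changed in this window.
    out.push(latestErrs.filter(function (err) { return !changed.has(err.key); }));
  });
  return out;
}

const result = runWindowed([
  { type: 'validation', errs: [{ key: 'x' }, { key: 'y' }, { key: 'z' }] },
  { type: 'change', key: 'x' },
  { type: 'change', key: 'a' },
  { type: 'validation', errs: [{ key: 'a' }, { key: 'b' }] },
  { type: 'change', key: 'x' },
  { type: 'change', key: 'a' }
]).map(function (errs) {
  return errs.map(function (err) { return err.key; });
});

console.log(JSON.stringify(result));
// [["x","y","z"],["y","z"],["y","z"],["a","b"],["a","b"],["b"]]
```

Because the Set is rebuilt at each validation, the earlier change to `a` does not affect the second validation array; only the later one does.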

Upvotes: 1

user3743222

Reputation: 18665

Here is another answer using a different technique. The idea is similar in that scan remembers the previous value, but here each source emits a reducing function, and scan applies that function to the previous value to produce the current one. Which function gets applied therefore depends on which source emitted.

jsfiddle: http://jsfiddle.net/guc1tm4m/1

var ta_validation = document.getElementById('ta_validation');
var ta_change = document.getElementById('ta_change');
var ta_result = document.getElementById('ta_result');

function emits ( who, who_ ) {return function ( x ) {
 who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n");
};}

function isIn (changeEvent, prev){
  return !!prev[0][changeEvent];
}

function removeChangeEvent(changeEvent, prev) {
  delete prev[0][changeEvent];
  return prev;
}

var validation$ = Rx.Observable
  .fromEvent(document.getElementById('validation'), 'click')
  .map(function (_, index) {
    var values = [
      [{a:'req', b:'req', c:'req'}],
      [{d:'req', e:'req'}],
      [{x:'req', y:'req', z:'req'}]
    ];
    return values[index % values.length];
  })
  .do(emits(ta_validation, 'validation'))
  .share();

var changeEvent$ = Rx.Observable
  .fromEvent(document.getElementById('change'), 'click')
  .map(function (_, index) {
    // Cycle through a fixed list of field names, one per click.
    var values = ['x', 'a', 'b', 'z'];
    return values[index % values.length];
  })
  .do(emits(ta_change, 'change'))
  .share();

var filteredValidation$ = Rx.Observable
  .merge(validation$.map(function (x) {return function (  ) {return x;}}),
         changeEvent$.map(function ( changeEvent ) {
           return function ( prev ) {
             return isIn(changeEvent, prev) ? removeChangeEvent(changeEvent, prev) : prev;
           };
         }))
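  // The seed uses the same [{...}] shape as the validation values, so isIn
  // does not throw if a change event arrives before the first validation.
  .scan(function ( prev, f ) {
          return f(prev);
        }, [{}])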
  .do(emits(ta_result, 'result'));

validation$.subscribe(function(){    });
changeEvent$.subscribe(function(){    });
filteredValidation$.subscribe(function(){    });
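
The core of this technique can be sketched without RxJS: each event is mapped to a function from the previous state to the next state, and a single fold applies those functions in order. The names `fromValidation`, `fromChange` and `snapshots` are made up for illustration, the state is a plain object rather than the one-element array used above, and this variant copies the state instead of mutating it.

```javascript
// Each event becomes a function (previous state) -> next state, and one
// fold applies them in order, which is the role scan plays in the pipeline.
// Helper names and the plain-object state are assumptions for illustration.
function fromValidation(errs) {
  return function () { return errs; };      // ignore the previous state
}

function fromChange(key) {
  return function (prev) {
    if (!(key in prev)) { return prev; }    // nothing to remove
    const next = Object.assign({}, prev);   // copy instead of mutating
    delete next[key];
    return next;
  };
}

// The merged stream, written out as a time-ordered array of updates.
const updates = [
  fromValidation({ a: 'req', b: 'req', c: 'req' }),
  fromChange('x'),
  fromChange('a'),
  fromValidation({ d: 'req', e: 'req' }),
  fromChange('e')
];

const snapshots = [];
updates.reduce(function (prev, update) {
  const next = update(prev);
  snapshots.push(Object.keys(next));
  return next;
}, {});

console.log(JSON.stringify(snapshots));
// [["a","b","c"],["a","b","c"],["b","c"],["d","e"],["d"]]
```

Copying the state is a design choice here: it keeps earlier emissions intact if a subscriber holds on to them, at the cost of one shallow copy per removal.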

Upvotes: 1

user3743222

Reputation: 18665

It seems that every time you get a validation message, you restart the processing of the change events. In that case, you could try something like:

var filteredValidation$ = validation$.flatMapLatest(function ( aObjs ) {
  return changeEvent$.scan(function ( acc, changeEvent ) {
    return isIn(changeEvent, acc) ? removeChangeEvent(changeEvent, acc) : acc;
  }, aObjs);
});

You have to write isIn and removeChangeEvent yourself, according to the particular shape of your change events and validation messages, but that's pretty easy.

  • Every validation message starts a new stream.
  • That stream carries its state in the scan accumulator acc, and that state is always the latest filtered validation object.
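
As one possible shape for isIn and removeChangeEvent, here is a minimal sketch. It assumes each validation message is an object like `{ key: 'fieldName', message: '...' }` and that a change event is just the field name; both shapes, and the sample data, are assumptions rather than anything given in the question.

```javascript
// Hypothetical helpers, assuming each validation message looks like
// { key: 'fieldName', message: '...' } and a change event is a field name.
function isIn(changeEvent, messages) {
  return messages.some(function (msg) { return msg.key === changeEvent; });
}

function removeChangeEvent(changeEvent, messages) {
  // Return a new array so that earlier emissions are not modified.
  return messages.filter(function (msg) { return msg.key !== changeEvent; });
}

const initial = [
  { key: 'email', message: 'required' },
  { key: 'name', message: 'too short' }
];

// The same reducer shape as the scan callback, folded over two changes.
const afterChanges = ['name', 'phone'].reduce(function (acc, changeEvent) {
  return isIn(changeEvent, acc) ? removeChangeEvent(changeEvent, acc) : acc;
}, initial);

console.log(JSON.stringify(afterChanges));
// [{"key":"email","message":"required"}]
```

One thing to check when wiring this into the stream: as far as I know, scan in RxJS 4 does not emit its seed, so the unfiltered validation array may not appear until the first change event. Appending something like startWith(aObjs) to the inner stream is one way to emit it up front.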

I haven't tested, but hopefully it works.

BTW: I LOVE YOUR CHART. It is a perfect illustration of the processing you want to achieve.

Upvotes: 1
