pkt

Reputation: 1838

Rx - Split Observable based on contents (group by until changed)

Let me give some context for the problem first. The goal is to use Rx to drive a simple search screen with paging.

On the input side, the user can add various filtering criteria, and also change the currently displayed page (in case of larger result sets). For simplicity's sake, this can be modelled with a stream of {filter, page} pairs.

On the output side, the result data needs to be displayed in a table, and the number of pages needs to be updated. This would mean a stream of {data, count} pairs.

The logic connecting the two is the following: whenever the filter parameters change, send a 'count' query to the server to get the page count. Whenever the filter parameters OR page index change, send a 'select' query to the server to get the result data. Wait until both are available before updating the screen.
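To make the rule concrete, here is a minimal sketch in plain JavaScript (no Rx) that lists which queries would be issued for a given sequence of inputs. The helper name `planQueries` and the `'count …'` / `'select …'` strings are made up for illustration; only the `{filter, page}` shape comes from the description above.

```javascript
// Hypothetical helper: lists the queries the rule above would issue
// for a sequence of {filter, page} inputs.
function planQueries(inputs) {
  var queries = [];
  var previous = null;
  inputs.forEach(function (input) {
    var filterChanged = previous === null || input.filter !== previous.filter;
    var pageChanged = previous === null || input.page !== previous.page;
    // A 'count' query is only needed when the filter changes.
    if (filterChanged) {
      queries.push('count ' + input.filter);
    }
    // A 'select' query is needed when either the filter or the page changes.
    if (filterChanged || pageChanged) {
      queries.push('select ' + input.filter + ' ' + input.page);
    }
    previous = input;
  });
  return queries;
}

var plan = planQueries([
  { filter: 's1', page: 'p1' },
  { filter: 's1', page: 'p2' },
  { filter: 's2', page: 'p1' },
  { filter: 's1', page: 'p1' }
]);
console.log(plan);
```

Note that the last input goes back to s1, and a 'count' query is expected again. That repeated-key case is what makes this harder than a plain grouping.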

Here's a plunk that simulates this in a minimal fashion. s1/s2/s3 indicate filter parameters, p1/p2/p3 are page indexes, #s1/#s2/#s3 are 'count' responses for a filter, and something like 's2 p3' is a 'select' response for a given filter and page combination. You can see various failed approaches, commented out.

http://plnkr.co/xurmUO

It seems like the solution should be using something like groupBy or window, but neither quite produces the desired outcome. With groupBy, what you can do is create groups from the input stream, with the input items being assigned to a group based on the filter parameter. Then, you only trigger a 'count' query when a new group is created, and trigger 'select' queries for each individual item in the group.

The problem with this is that groups last until the end of the source stream, so if you repeat a filter parameter that occurred previously, a new group will not be created. Consequently, no 'count' queries will happen, either. The groupByUntil operator allows specifying an expiration duration, which almost solves everything. Then you can have the creation of a new group (due to a different filter parameter in the input) trigger the expiration of the current group. Like so:

inputs
  .groupByUntil(
    function(input) { return input.filter; },
    null,
    function(inputGroup) { 
      var filter = inputGroup.key;
      return inputs.firstOrDefault(function (input) {
        return input.filter !== filter;
      });
    }
  );

Unfortunately, it seems like this is triggered later than it should be, depending on the order the operators execute in. As a result, the group boundaries are off by one, and you end up with an extra 'count' call. I also tried using the window operator, which has a similar problem: an extra item makes it into the window before it closes.

What I would like is a variant of groupBy that only has one active group at a time, making the previous one expire when an item with a new key is received. Can you build this by combining the existing operators somehow? Or do you have to go back to Observable.create, and do this with the low-level operations?
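The grouping I have in mind can be sketched on a plain array rather than an observable, just to pin down the semantics. The function name `splitWhenKeyChanges` is invented for this illustration and is not an Rx operator.

```javascript
// Array-based illustration only: consecutive items with the same key share a
// group, and a key change always starts a new group, even for a key seen before.
function splitWhenKeyChanges(items, keySelector) {
  var groups = [];
  var current = null;
  items.forEach(function (item) {
    var key = keySelector(item);
    if (current === null || current.key !== key) {
      current = { key: key, items: [] };
      groups.push(current);
    }
    current.items.push(item);
  });
  return groups;
}

var groups = splitWhenKeyChanges(
  [
    { filter: 's1', page: 'p1' },
    { filter: 's1', page: 'p2' },
    { filter: 's2', page: 'p1' },
    { filter: 's1', page: 'p3' }
  ],
  function (input) { return input.filter; }
);
var groupKeys = groups.map(function (g) { return g.key; });
console.log(groupKeys); // three groups: s1, s2, then s1 again
```

With a regular groupBy, the last item would be added to the first s1 group instead of opening a third one.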

EDIT: For reference, here's my solution using a custom operator, done with Observable.create.

Rx.Observable.prototype.splitBy = function(keySelector) {
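  var source = this;
  return Rx.Observable.create(function(observer) {
    // Keep the state per subscription, so that multiple subscribers
    // do not share (and corrupt) each other's current group
    var currentGroup = null;
    var lastKey = null;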
    return source.subscribe(
      function(value) {
        var key = keySelector(value);
        // Create a new group if the key changed (or this is the first value)
        if (currentGroup === null || key !== lastKey) {
          // Close previous group
          if (currentGroup !== null) {
            currentGroup.onCompleted();
          }
          lastKey = key;
          // Emit current group
          currentGroup = new Rx.Subject();
          currentGroup.key = key;
          observer.onNext(currentGroup);
        }
        currentGroup.onNext(value);
      },
      // Forward errors/completion to current group and the main stream
      function(error) {
        if (currentGroup !== null) {
          currentGroup.onError(error);
        }
        observer.onError(error);
      },
      function() {
        if (currentGroup !== null) {
          currentGroup.onCompleted();
        }
        observer.onCompleted();
      });
  });
};

Upvotes: 4

Views: 2410

Answers (1)

Brandon

Reputation: 39182

The groupBy family of operators does not work for the reasons you have already discovered. You want flatMapLatest:

var count = inputs.flatMapLatest(function (input) {
    // some code that returns an
    // Rx observable that will eventually
    // return the count
    return rxQueryForCount(input.filter);
});
count.subscribe(...);

You'd write something similar to return the page of data.

Edit: if you want to synchronise the results of your two queries, you can do something like this:

var count = inputs
    .distinctUntilChanged(null, function (a, b) {
        // however you decide if your filters have changed...
        return a.filter === b.filter;
    })
    .flatMapLatest(function (input) {
        // some code that returns an
        // Rx observable that will eventually
        // return the count
        return rxQueryForCount(input.filter)
            .startWith(undefined);
    });
var data = inputs
    .flatMapLatest(function (input) {
        return rxQueryForData(input.filter, input.index)
            .startWith(undefined);
    });
var results = Rx.Observable
    .combineLatest(count, data, function (c, d) {
        if (c === undefined || d === undefined) {
            return undefined;
        }

        return { count: c, data: d };
    });

results emits undefined while either query is re-running, and emits a value only once both queries have up-to-date results. You can use that in your UI logic, or just add a where clause to filter out the undefined values.
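To see how the undefined placeholder gates the output, here is a rough simulation in plain JavaScript, without Rx. The `simulateResults` function and the event list are invented for illustration; they only mimic the way combineLatest keeps the latest value from each source, using the s1/p1/#s1 notation from the question.

```javascript
// Plain-JavaScript stand-in for combineLatest: remembers the latest value from
// each source and produces a result once both have emitted at least once.
function simulateResults(events) {
  var latest = {};
  var results = [];
  events.forEach(function (event) {
    latest[event.source] = event.value;
    if (!('count' in latest) || !('data' in latest)) {
      return; // nothing is produced until both sources have emitted
    }
    var c = latest.count;
    var d = latest.data;
    results.push(c === undefined || d === undefined
      ? undefined
      : { count: c, data: d });
  });
  return results;
}

var emitted = simulateResults([
  // Filter change: both queries restart with the undefined placeholder...
  { source: 'count', value: undefined },
  { source: 'data', value: undefined },
  // ...then the responses arrive.
  { source: 'count', value: '#s1' },
  { source: 'data', value: 's1 p1' },
  // Page change only: just the data query restarts.
  { source: 'data', value: undefined },
  { source: 'data', value: 's1 p2' }
]);

// Equivalent of adding a where clause that drops the undefined values.
var visible = emitted.filter(function (r) { return r !== undefined; });
console.log(visible);
```

In this run only two complete results survive the filter: one after the initial filter change and one after the page change. The count from the first query is reused for the second result because the filter did not change.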

Upvotes: 4
