alecf
alecf

Reputation: 603

How to join streams based on a key

This is for redux-observable but I think the general pattern is pretty generic to rxjs

I have a stream of events (from redux-observable, these are redux actions) and I'm specifically looking to pair up two differnt types of events for the same "resource" - "resource set active" and "resource loaded" - and emit a new event when these events "match up". The problem is these can come in in any order, for any resources, and can be fired multiple times. You might set something active before it is loaded, or load something before it is set active, and other resources might get loaded or set active in between.

What I want is a stream of "this resource, which is now loaded, is now active" - which also means that once a resource is loaded, it can be considered forever loaded.

If these events were not keyed by a resource id, then it would be very simple:

First I would split them up by type:

const setActive$ = action$.filter(a => a.type == 'set_active');
const load = action$.filter(a => a.type == 'loaded');

In a simple case where there is no keying, I'd say something like:

const readyevent$ = setActive$.withLatestFrom(loaded$)

then readyevent$ is just a stream of set_active events where there has been at least one loaded event.

But my problem is that the set_active and loaded events are each keyed by a resource id, and for reasons beyond my control, the property to identify the resource is different in the two events. So this becomes something like:

const setActive$ = action$.filter(a => a.type === 'set_active').groupBy(a => a.activeResourceId);
const loaded$ = action$.filter(a => a.type === 'loaded').groupBy(a => a.id);

but from this point I can't really figure out how to then "re-join " these two streams-of-grouped-observables on the same key, so that I can emit a stream of withLatestFrom actions.

Upvotes: 2

Views: 760

Answers (2)

bygrace
bygrace

Reputation: 5988

I believe this does what you are describing:

const action$ = Rx.Observable.from([
  { activeResourceId: 1, type: 'set_active' },
  { id: 2, type: 'loaded' },
  { id: 1, type: 'random' },
  { id: 1, type: 'loaded' },
  { activeResourceId: 2, type: 'set_active' }
]).zip(
  Rx.Observable.interval(500),
  (x) => x
).do((x) => { console.log('action', x); }).share();

const setActiveAction$ = action$.filter(a => a.type === 'set_active')
  .map(a => a.activeResourceId)
  .distinct();
const allActive$ = setActiveAction$.scan((acc, curr) => [...acc, curr], []);
  
const loadedAction$ = action$.filter(a => a.type === 'loaded')
  .map(a => a.id)
  .distinct();
const allLoaded$ = loadedAction$.scan((acc, curr) => [...acc, curr], []);
  
Rx.Observable.merge(
  setActiveAction$
    .withLatestFrom(allLoaded$)
    .filter(([activeId, loaded]) => loaded.includes(activeId)),
  loadedAction$
    .withLatestFrom(allActive$)
    .filter(([loadedId, active]) => active.includes(loadedId))
).map(([id]) => id)
 .distinct()
 .subscribe((id) => { console.log(`${id} is loaded and active`); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.min.js"></script>

The basic approach is to create a distinct stream for each action type and join it with the distinct aggregate of the other. Then merge the two streams. This will emit the value when there are matching setActive and loaded events. The distinct() on the end of the merge makes it so that you will only get notified once. If you want a notification on each setActive action after the initial one then just remove that operator.

Upvotes: 1

Richard Matsen
Richard Matsen

Reputation: 23473

groupBy looks somewhat complicated to do this with, there's a key value in there but you get an Observable of Observables - so maybe a little hard to get right.

I would map the id to a common property and then use scan to combine. I use this pattern for grouping in my app.

The accumulator in the scan is an object, which is used as an associative array - each property is an id and the property value is an array of actions accumulated so far.

After the scan, we convert to an observable stream of arrays of matching actions - sort of like withLatestFrom but some arrays may not yet be complete.

The next step is to filter for those arrays we consider complete.
Since you say

where there has been at least one loaded event

I'm going to assume that the presence of two or more actions, with at least one is type 'loaded' - but it's a bit tricky to tell from your question if that is sufficient.

Finally, reset that id in the accumulator as presumably it may occur again later in the stream.

const setActive$ = action$.filter(a => a.type === 'set_active')
  .map(a => { return { id: a.activeResourceId, action: a } });

const loaded$ = action$.filter(a => a.type === 'loaded')
  .map(a => { return { id: a.id, action: a } });

const accumulator = {};
const readyevent$: Observable<action[]> = 
  Observable.merge(setActive$, loaded$)
  .scan((acc, curr) => {
    acc[curr.id] = acc[curr.id] || [];
    acc[curr.id].push(curr.action)
  }, accumulator)
  .mergeMap((grouped: {}) => Observable.from(
    Object.keys(grouped).map(key => grouped[key])
  ))
  .filter((actions: action[]) => {
    return actions.length > 1 && actions.some(a => a.type === 'loaded')
   })
  .do(actions => {
    const id = actions.find(a => a.type === 'loaded').id;
    accumulator[id] = [];
  });

Upvotes: 0

Related Questions