glortho
glortho

Reputation: 13198

RxJS: How to combine multiple nested observables with buffer

Warning: RxJS newb here.

Here is my challenge:

  1. When an onUnlink$ observable emits...
  2. Immediately start capturing values from an onAdd$ observable, for a maximum of 1 second (I'll call this partition onAddBuffer$).
  3. Query a database (creating a doc$ observable) to fetch a model we'll use to match against one of the onAdd$ values
  4. If one of the values from the onAddBuffer$ observable matches the doc$ value, do not emit
  5. If none of the values from the onAddBuffer$ observable matches the doc$ value, or if the onAddBuffer$ observable never emits, emit the doc$ value

This was my best guess:

// for starters, concatMap doesn't seem right -- I want a whole new stream
const docsToRemove$ = onUnlink$.concatMap( unlinkValue => {

  const doc$ = Rx.Observable.fromPromise( db.File.findOne({ unlinkValue }) )

  const onAddBuffer$ = onAdd$
    .buffer( doc$ ) // capture events while fetching from db -- not sure about this
    .takeUntil( Rx.Observable.timer(1000) );

  // if there is a match, emit nothing. otherwise wait 1 second and emit doc
  return doc$.switchMap( doc =>
    Rx.Observable.race( 
      onAddBuffer$.single( added => doc.attr === added.attr ).mapTo( Rx.Observable.empty() ),
      Rx.Observable.timer( 1000 ).mapTo( doc )
    )
  );
});

docsToRemove$.subscribe( doc => {
  // should only ever be invoked (with doc -- the doc$ value) 1 second
  // after `onUnlink$` emits, when there are no matching `onAdd$`
  // values within that 1 second window.
})

This always emits EmptyObservable. Maybe it's because single appears to emit undefined when there is no match, and I'm expecting it not to emit at all when there is no match? The same thing happens with find.

If I change single to filter, nothing ever emits.

FYI: This is a rename scenario with file system events -- if an add event follows within 1 second of an unlink event and the emitted file hashes match, do nothing because it's a rename. Otherwise it's a true unlink and it should emit the database doc to be removed.

Upvotes: 4

Views: 1207

Answers (1)

martin
martin

Reputation: 96979

This is my guess how you could do this:

onUnlink$.concatMap(unlinkValue => {
  const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })).share();
  const bufferDuration$ = Rx.Observable.race(Rx.Observable.timer(1000), doc$);
  const onAddBuffer$ = onAdd$.buffer(bufferDuration$);

  return Observable.forkJoin(onAddBuffer$, doc$)
    .map(([buffer, docResponse]) => { /* whatever logic you need here */ });
});

The single() operator is a little tricky because it emits the item that matches the predicate function only after the source Observable completes (or emits an error when there're two items or no matching items).

The race() is tricky as well. If one of the source Observables completes and doesn't emit any value race() will just complete and not emit anything. I reported this some time ago and this is the correct behavior, see https://github.com/ReactiveX/rxjs/issues/2641.
I guess this is what went wrong in your code.

Also note that .mapTo(Rx.Observable.empty()) will map each value into an instance of Observable. If you wanted to ignore all values you can use filter(() => false) or the ignoreElements() operator.

Upvotes: 3

Related Questions