Reputation: 13198
Warning: RxJS newb here.
Here is my challenge:
1. When the onUnlink$ observable emits...
2. Start capturing values from the onAdd$ observable, for a maximum of 1 second (I'll call this partition onAddBuffer$).
3. Query a database (creating a doc$ observable) to fetch a model we'll use to match against one of the onAdd$ values.
4. If one of the values from the onAddBuffer$ observable matches the doc$ value, do not emit.
5. If none of the values from the onAddBuffer$ observable matches the doc$ value, or if the onAddBuffer$ observable never emits, emit the doc$ value.
This was my best guess:
// for starters, concatMap doesn't seem right -- I want a whole new stream
const docsToRemove$ = onUnlink$.concatMap( unlinkValue => {
  const doc$ = Rx.Observable.fromPromise( db.File.findOne({ unlinkValue }) )
  const onAddBuffer$ = onAdd$
    .buffer( doc$ ) // capture events while fetching from db -- not sure about this
    .takeUntil( Rx.Observable.timer(1000) );
  // if there is a match, emit nothing. otherwise wait 1 second and emit doc
  return doc$.switchMap( doc =>
    Rx.Observable.race(
      onAddBuffer$.single( added => doc.attr === added.attr ).mapTo( Rx.Observable.empty() ),
      Rx.Observable.timer( 1000 ).mapTo( doc )
    )
  );
});
docsToRemove$.subscribe( doc => {
  // should only ever be invoked (with doc -- the doc$ value) 1 second
  // after `onUnlink$` emits, when there are no matching `onAdd$`
  // values within that 1 second window.
})
This always emits EmptyObservable. Maybe it's because single appears to emit undefined when there is no match, and I'm expecting it not to emit at all when there is no match? The same thing happens with find.
If I change single to filter, nothing ever emits.
FYI: This is a rename scenario with file system events -- if an add event follows within 1 second of an unlink event and the emitted file hashes match, do nothing because it's a rename. Otherwise it's a true unlink and it should emit the database doc to be removed.
Upvotes: 4
Views: 1207
Reputation: 96979
This is my guess at how you could do this:
onUnlink$.concatMap(unlinkValue => {
  // share() so the database query runs once even though doc$ is subscribed twice
  const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })).share();
  // close the buffer when the doc arrives or after 1 second, whichever comes first
  const bufferDuration$ = Rx.Observable.race(Rx.Observable.timer(1000), doc$);
  const onAddBuffer$ = onAdd$.buffer(bufferDuration$);
  return Rx.Observable.forkJoin(onAddBuffer$, doc$)
    .map(([buffer, docResponse]) => { /* whatever logic you need here */ });
});
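As a rough sketch of what that map callback might contain: the helper below returns true only when none of the buffered add events matches the fetched doc. The name isTrueUnlink is hypothetical, and comparing on attr is an assumption borrowed from the question's code. It is plain JavaScript, so it can be checked without RxJS.

```javascript
// Hypothetical helper: decides whether an unlink is a real removal.
// `buffer` is the array emitted by onAddBuffer$; `doc` is the value from doc$.
// Matching on `attr` is an assumption taken from the question's code.
function isTrueUnlink(buffer, doc) {
  if (!doc) return false; // nothing found in the database, so nothing to remove
  return !buffer.some(added => added.attr === doc.attr);
}

// Assumed usage inside the concatMap callback (RxJS 5 style, not executed here):
//   return Rx.Observable.forkJoin(onAddBuffer$, doc$)
//     .filter(([buffer, doc]) => isTrueUnlink(buffer, doc))
//     .map(([buffer, doc]) => doc);

console.log(isTrueUnlink([], { attr: 'abc' }));                // true
console.log(isTrueUnlink([{ attr: 'abc' }], { attr: 'abc' })); // false
```

Filtering first and mapping afterwards means a rename produces no emission at all, so the subscriber only ever sees docs that should be removed.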
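The single() operator is a little tricky because it emits the item that matches the predicate function only after the source Observable completes. It emits an error when more than one item matches or when the source is empty; when the source emits items but none of them match, it emits undefined on completion, which is what you're seeing.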
The race() operator is tricky as well. If one of the source Observables completes without emitting any value, race() will just complete and not emit anything. I reported this some time ago and this is the correct behavior, see https://github.com/ReactiveX/rxjs/issues/2641.
I guess this is what went wrong in your code.
Also note that .mapTo(Rx.Observable.empty()) will map each value into an instance of Observable. If you wanted to ignore all values you can use filter(() => false) or the ignoreElements() operator.
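The distinction can be illustrated with plain arrays, used here only as an analogy and not as RxJS itself: Array.prototype.map stands in for mapTo, Array.prototype.filter for filter, and EMPTY is a hypothetical placeholder object rather than Rx.Observable.empty(). Mapping replaces each value with something else; it never removes it.

```javascript
// Analogy only: arrays stand in for Observables, EMPTY for Rx.Observable.empty().
const EMPTY = { kind: 'empty-observable' };
const values = [1, 2, 3];

// Like mapTo(EMPTY): every value is replaced, so three items still come out.
const mapped = values.map(() => EMPTY);

// Like filter(() => false): every value is dropped, so nothing comes out.
const filtered = values.filter(() => false);

console.log(mapped.length);   // 3
console.log(filtered.length); // 0
```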
Upvotes: 3