Reputation: 335
Suppose I'm developing a chat app. I have an observable threads$ that emits an array of threads every n seconds, an observable offline$ that notifies when a thread goes offline, and an observable online$ that notifies when a thread comes online:
enum ConnectionStatus { Offline = 0, Online }
interface Thread {
  id: string;
  status: ConnectionStatus
}
const threads$ = Observable
  .interval(n)
  .switchMap(() => Observable.create((observer: Observer<Array<Thread>>) =>
    getThreads((threads: Array<Thread>) => observer.next(threads))));
const online$ = Observable.create((observer: Observer<Thread>) =>
  onOnline((threadId: string) => observer.next({
    id: threadId,
    status: ConnectionStatus.Online
  })));
const offline$ = Observable.create((observer: Observer<Thread>) =>
  onOffline((threadId: string) => observer.next({
    id: threadId,
    status: ConnectionStatus.Offline
  })));
I want to combine these streams according to this rule: threads$ should emit an array every n seconds, but whenever online$ or offline$ emits, I want to grab the latest value (Array<Thread>) of threads$, map it by changing the status of one thread, and emit the mapped collection immediately.
I've lost track of Rx's combineLatest, mergeMap, zip and similar operators, so I would appreciate it if someone could help me implement the combining in this case (in a more Rx way).
Upvotes: 3
Views: 521
Reputation: 96979
I think you could do it like this using multicast():
const stop$ = Observable.merge(online$, offline$);
threads$
.multicast(new Subject(), obs => Observable.merge(obs, obs.takeUntil(stop$).takeLast(1).map(...)))
.subscribe(...);
I obviously didn't test it, but maybe it'll point you in the right direction.
Upvotes: 1
Reputation: 3904
This should emit an Array<Thread> every time threads$ emits, and immediately whenever online$ or offline$ emits.
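const threadUpdate$ = Observable.merge(
  threads$,
  Observable.merge(online$, offline$)
    .withLatestFrom(threads$,
      // Return a new array with the matching thread's status replaced;
      // the map callback must return each element, or the result is all undefined.
      (thread, threads) => threads.map(t =>
        t.id === thread.id ? { ...t, status: thread.status } : t)));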
Note that threads$ will continue to emit, and might even emit at potentially the same time as the combined online$/offline$ stream.
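To make the timing concrete, here is a small dependency-free TypeScript sketch that models what the merged stream would emit for a given sequence of events. It does not use RxJS: simulate, applyStatus, and TimelineEvent are hypothetical names introduced only for illustration, and the sketch assumes that both uses of threads$ see the same polled snapshots. It mirrors two properties of withLatestFrom: a status event that arrives before the first poll produces nothing, and every status event is applied to the latest polled array.

```typescript
enum ConnectionStatus { Offline = 0, Online }

interface Thread {
  id: string;
  status: ConnectionStatus;
}

// Hypothetical event model: either a poll result or a single status change.
type TimelineEvent =
  | { kind: "poll"; threads: Thread[] }
  | { kind: "status"; update: Thread };

// Returns a new array in which the matching thread has the new status.
// The input array and its elements are left untouched.
function applyStatus(threads: Thread[], update: Thread): Thread[] {
  return threads.map(t =>
    t.id === update.id ? { ...t, status: update.status } : t);
}

// Models merge(threads$, statusEvents.withLatestFrom(threads$, applyStatus)).
function simulate(events: TimelineEvent[]): Thread[][] {
  const emitted: Thread[][] = [];
  let latest: Thread[] | undefined; // last polled snapshot, if any

  for (const event of events) {
    if (event.kind === "poll") {
      latest = event.threads;
      emitted.push(event.threads);
    } else if (latest !== undefined) {
      // Patch the latest polled snapshot and emit it immediately.
      emitted.push(applyStatus(latest, event.update));
    }
    // A status event before the first poll is dropped.
  }
  return emitted;
}

// Usage: one poll followed by two status changes.
const timeline: TimelineEvent[] = [
  { kind: "poll", threads: [
    { id: "a", status: ConnectionStatus.Offline },
    { id: "b", status: ConnectionStatus.Offline },
  ] },
  { kind: "status", update: { id: "a", status: ConnectionStatus.Online } },
  { kind: "status", update: { id: "b", status: ConnectionStatus.Online } },
];
const emissions = simulate(timeline);
console.log(JSON.stringify(emissions));
```

Because each status event is applied to the latest polled array rather than to the previously patched one, the third emission in this model shows b online but a offline again. If status changes need to persist between polls, that has to be handled separately; otherwise the next poll is what brings the list back in sync.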
Upvotes: 0