embarq

Reputation: 335

Combining/merging observables

Suppose I'm developing a chat app. I have an observable threads$ that emits an array of threads every n seconds, an observable offline$ that notifies when a thread goes offline, and an observable online$ that notifies when a thread comes online:

enum ConnectionStatus { Offline = 0, Online }

interface Thread {
    id: string;
    status: ConnectionStatus
}

const threads$ = Observable
    .interval(n)
    .switchMap(() => Observable.create((observer: Observer<Array<Thread>>) =>
        getThreads((threads: Array<Thread>) => observer.next(threads))));

const online$ = Observable.create((observer: Observer<Thread>) =>
    onOnline((threadId: string) => observer.next({
        id: threadId,
        status: ConnectionStatus.Online
    })));

const offline$ = Observable.create((observer: Observer<Thread>) =>
    onOffline((threadId: string) => observer.next({
        id: threadId,
        status: ConnectionStatus.Offline
    })));

I want to combine these streams according to this rule: threads$ should emit an array every n seconds, but whenever online$ or offline$ emits, I want to grab the latest value (Array<Thread>) of threads$, map it by changing the status of one thread, and emit the mapped collection immediately.
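To make the "map it by changing the status of one thread" step concrete, here is a minimal sketch of that mapping as a pure function, independent of which Rx operator ends up calling it. The name applyStatus and the sample data are made up for illustration; they are not part of RxJS or of the app.

```typescript
enum ConnectionStatus { Offline = 0, Online }

interface Thread {
    id: string;
    status: ConnectionStatus;
}

// Hypothetical helper (not an RxJS API): returns a new array in which the
// thread matching update.id carries update.status. Other threads are reused
// as-is, and the input array is not mutated.
function applyStatus(threads: Array<Thread>, update: Thread): Array<Thread> {
    return threads.map(t =>
        t.id === update.id ? { ...t, status: update.status } : t);
}

// Usage with made-up data: thread "b" comes online.
const latest: Array<Thread> = [
    { id: "a", status: ConnectionStatus.Online },
    { id: "b", status: ConnectionStatus.Offline },
];
const patched = applyStatus(latest, { id: "b", status: ConnectionStatus.Online });
```

Keeping this step free of side effects means the array last emitted by threads$ stays untouched, whichever combining operator is used.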

I've lost track of Rx's combineLatest, mergeMap, zip and similar operators, so I would appreciate it if someone could help me implement this combination in a more Rx-idiomatic way.

Upvotes: 3

Views: 521

Answers (2)

martin

Reputation: 96979

I think you could make it like this using multicast():

const stop$ = Observable.merge(online$, offline$);
threads$
    .multicast(new Subject(), obs => Observable.merge(obs, obs.takeUntil(stop$).takeLast(1).map(...)))
    .subscribe(...);

I obviously didn't test it, but maybe it'll push you in the right direction.

Upvotes: 1

Jon G Stødle

Reputation: 3904

This should emit an Array<Thread> every time threads$ emits, and immediately whenever online$ or offline$ emits.

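const threadUpdate$ = Observable.merge(
    threads$,
    Observable.merge(online$, offline$)
        .withLatestFrom(threads$,
            // map() must return every element, so copy the matching thread
            // with its new status and pass the others through unchanged.
            (thread, threads) => threads.map(t =>
                t.id === thread.id
                    ? { ...t, status: thread.status }
                    : t)));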

Note that threads$ keeps emitting on its own schedule, so it may emit at almost the same time as the combined online$/offline$ stream.
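To illustrate how the two sources interleave, here is a rough plain-array model of that merge (not RxJS code). It assumes the status projector does not mutate the snapshot, and the ThreadEvent type and emissions function are hypothetical names used only for this sketch. Each status change patches the most recent threads$ snapshot, and the next snapshot passes through unpatched.

```typescript
enum Status { Offline = 0, Online }

interface ChatThread {
    id: string;
    status: Status;
}

// Hypothetical event type: a full snapshot (as from threads$) or a single
// status change (as from online$ / offline$).
type ThreadEvent =
    | { kind: "snapshot"; threads: Array<ChatThread> }
    | { kind: "status"; id: string; status: Status };

// Returns what a subscriber would receive after each event, in arrival order.
function emissions(events: Array<ThreadEvent>): Array<Array<ChatThread>> {
    let latestSnapshot: Array<ChatThread> = [];
    const out: Array<Array<ChatThread>> = [];
    for (const e of events) {
        if (e.kind === "snapshot") {
            // Snapshots are forwarded unchanged and become the new baseline.
            latestSnapshot = e.threads;
            out.push(e.threads);
        } else {
            // A status change patches the latest snapshot only; earlier
            // patches are not carried over.
            out.push(latestSnapshot.map(t =>
                t.id === e.id ? { ...t, status: e.status } : t));
        }
    }
    return out;
}

// Made-up timeline: both threads start offline, "a" then "b" come online,
// and a stale snapshot arrives afterwards.
const offlinePair = (): Array<ChatThread> => [
    { id: "a", status: Status.Offline },
    { id: "b", status: Status.Offline },
];
const timeline = emissions([
    { kind: "snapshot", threads: offlinePair() },
    { kind: "status", id: "a", status: Status.Online },
    { kind: "status", id: "b", status: Status.Online },
    { kind: "snapshot", threads: offlinePair() },
]);
```

In this model a status change is visible only until the next snapshot arrives, so a snapshot fetched just before the change can briefly show the old status again.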

Upvotes: 0
