softshipper
softshipper

Reputation: 34071

How does the mergeAll works?

I am trying to figure out, how mergeAll works and created examples:

const clicks = Rx.Observable.interval(4000).map(()=> "first");
const higherOrder = clicks.map((ev) => Rx.Observable.interval(1000).map(() => "inner").take(10));
const firstOrder = higherOrder.mergeAll();
firstOrder.subscribe(x => console.log(x));

the output it always inner and first never outputted. After calling mergeAll() the clicks observable is no more relevant?

On more example:

const input = document.getElementById("window");

const clicks = Rx.Observable.fromEvent(input, 'keyup').map(() => "Hello");
const interval = Rx.Observable.interval(4000);
const result = clicks.window(interval)
    .map(win => {
        return win.take(1);
    })
    .mergeAll(); // flatten the Observable-of-Observables
result.subscribe(x => console.log("Result " + x));

on subscribe, I've got the result from outer observable "Result Hello" not the inner observable. What kind of role plays mergeAll in this case?
Why the win variable is an instance observable not Hello?

Upvotes: 2

Views: 4888

Answers (2)

Mehul Solanki
Mehul Solanki

Reputation: 21

you should use switchMap.

firstOrder.switchMap(x => console.log(x)).subscribe( value => console.log(value));

or

result.switchMap(x => console.log("Result " + x)).subscribe( value => console.log(value));

switch expects a stream of Observables, when it get an Observable pushed onto it’s input stream it unsubscribes from any previous Observables and subscribes to the new one and then emits any values from that Observable onto it’s output stream.

Upvotes: 1

Ingo Bürk
Ingo Bürk

Reputation: 20033

After calling mergeAll() the clicks observable is no more relevant?

Correct. You map each individual click to a stream of "inner" events. mergeAll simply merges those streams together. The click event lives in this resulting stream only very faintly as the point in time where a specific merged stream starts. It becomes a bit more clear this way:

const clicks$ = Rx.Observable.interval(1000);
const higherOrder$ = clicks$.map(click => Rx.Observable.interval(500)
  .map(counter => `${click}–${counter}`)
);
higherOrder$.mergeAll().subscribe(console.log);

The documentation and its marble diagram might also help you understand:

enter image description here

Upvotes: 7

Related Questions