Reputation: 34071
I have the following code snippet:
const source$ = Rx.Observable.from([1,2,3,4])
  .filter(x => x % 2)
  .map(x => x * x)
  .share();
source$.subscribe(x => console.log(`Stream 1 ${x}`));
source$.subscribe(x => console.log(`Stream 2 ${x}`));
But I am expecting shared results like:
"Stream 1 1"
"Stream 2 1"
"Stream 1 9"
"Stream 2 9"
Why doesn't the result get shared?
Upvotes: 0
Views: 38
Reputation: 96891
This is because you're using a cold Observable (http://reactivex.io/documentation/observable.html), and share() is essentially a shorthand for publish().refCount().
When you subscribe for the first time, the refCount() operator subscribes to its source Observable, which is the Observable.from() chain. This all happens synchronously, so the source emits all of its values to the first subscriber and then emits complete, which makes refCount() unsubscribe from the source because there are no other Observers at that point.
Then you subscribe with the second Observer and this all happens again, so each Observer gets its own full run of values instead of sharing one.
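To make the sequence above concrete, here is a minimal sketch in plain JavaScript. It is a simplified model and not how RxJS is actually implemented: coldFrom and share are hypothetical stand-ins written only for this illustration, and the source is reduced to the two values that survive the filter and map steps.

```javascript
// Simplified model only: coldFrom and share are hypothetical stand-ins, not RxJS code.

// A cold source: each subscribe() call replays every value synchronously, then completes.
function coldFrom(values) {
  return {
    subscribe(observer) {
      values.forEach(v => observer.next(v));
      observer.complete();
    },
  };
}

// share-like wrapper: connects to the source when the first observer arrives
// and forgets its observers once the source completes.
function share(source) {
  let observers = [];
  return {
    subscribe(next) {
      observers.push(next);
      if (observers.length === 1) {
        source.subscribe({
          next: v => observers.forEach(o => o(v)),
          complete: () => { observers = []; },
        });
      }
    },
  };
}

const sharedLog = [];
const shared$ = share(coldFrom([1, 9]));
shared$.subscribe(x => sharedLog.push(`Stream 1 ${x}`));
shared$.subscribe(x => sharedLog.push(`Stream 2 ${x}`));
console.log(sharedLog);
```

In this model the first subscribe call runs the source to completion before the second subscribe call is even reached, so the second Observer triggers a fresh run instead of joining the first one.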
If you want to achieve your expected result, you can use just publish() to turn the source into a Connectable Observable and then call connect() manually once both Observers are subscribed.
const source$ = Rx.Observable.from([1,2,3,4])
  .filter(x => x % 2)
  .map(x => x * x)
  .publish();
source$.subscribe(x => console.log(`Stream 1 ${x}`));
source$.subscribe(x => console.log(`Stream 2 ${x}`));
source$.connect();
See live demo: https://jsbin.com/waraqi/2/edit?js,console
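For intuition about why the manual connect() call changes the ordering, the following is a rough sketch in plain JavaScript. It is not the RxJS implementation: coldSource and publishLike are hypothetical helpers made up for this example, and the source again holds only the values 1 and 9.

```javascript
// Simplified model only: coldSource and publishLike are hypothetical, not RxJS code.

// A cold source that replays all values synchronously on every subscribe() call.
function coldSource(values) {
  return {
    subscribe(observer) {
      values.forEach(v => observer.next(v));
      observer.complete();
    },
  };
}

// publish-like wrapper: only collects observers; nothing is emitted until connect().
function publishLike(source) {
  let observers = [];
  return {
    subscribe(next) {
      observers.push(next);
    },
    connect() {
      source.subscribe({
        // Each value is handed to every registered observer before the next value.
        next: v => observers.forEach(o => o(v)),
        complete: () => { observers = []; },
      });
    },
  };
}

const publishedLog = [];
const published$ = publishLike(coldSource([1, 9]));
published$.subscribe(x => publishedLog.push(`Stream 1 ${x}`));
published$.subscribe(x => publishedLog.push(`Stream 2 ${x}`));
published$.connect(); // the single source run starts here, with both observers attached
console.log(publishedLog);
```

Because both Observers are registered before the single source run starts, each value reaches "Stream 1" and "Stream 2" in turn, which is the interleaved order the question asks for.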
Upvotes: 4