Reputation: 7802
I am learning RxJS. I have question regarding below code snippet.
var arr = [1,2,3,4,5];
var arraysource = Observable.from(arr);
arr.push(6);
var subscription = arraysource.subscribe(
x => console.log('onNext: %s', x),
e => console.log('onError: %s', e),
() => console.log('onCompleted'));
arr.push(7);
When I run above code I get following output.
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onCompleted
My question is why seventh element is not getting published which is added after subscription? Is it because input stream is cold stream and its reading items synchronously? So items added post onComplete fires will never reach to observer? Can someone please throw some light on this behavior?
Upvotes: 2
Views: 2470
Reputation: 8855
You could create your own Observable to get the functionality you're looking for
Rx 5 Beta (change next
and complete
to onNext
onCompleted
for Rx 4)
var source = Rx.Observable.create(function (observer) {
[1,2,3,4,5].forEach(item => observer.next(item));
observer.next(6);
// observer.complete() // <-- remove comment to allow observable to complete
// Any cleanup logic might go here
return function () {
console.log('disposed');
};
});
var subscription = source.subscribe(
function (x) { console.log('onNext: %s', x); },
function (e) { console.log('onError: %s', e); },
function () { console.log('onCompleted'); }
);
example http://jsbin.com/datuqoniyo/edit?js,console
http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-create
Upvotes: 2
Reputation: 18665
arraySource
is a cold source, and hence lazily instantiates its sequence of values when you subscribe to it. At that moment, your array has 6 values. And yes because of Rx.Observable.from(array)
produces its values synchronously, you will see all 6 values printed after what the observable will finish.
That said, in a real code, it is not at all a good practice to mutate a parameter which intervenes in the definition of an observable. It makes your program very difficult to reason about as the impact of that mutation will depend of a large number of things (synchronicity as here, implementation of observable operator, etc.). If you have an argument which changes value over time, then you conceptually have an observable don't you. So you could model that as an observable to make easy to manipulate and reason about.
Upvotes: 2