Pankaj Kapare

Reputation: 7802

RxJS adding new item to array stream not getting published to subscriber

I am learning RxJS and have a question about the code snippet below.

var arr = [1,2,3,4,5];
var arraysource = Observable.from(arr);

arr.push(6);
var subscription = arraysource.subscribe(
     x => console.log('onNext: %s', x),
     e => console.log('onError: %s', e),
     () => console.log('onCompleted'));

arr.push(7);

When I run the code above, I get the following output:

onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onCompleted

My question is: why is the seventh element, which is added after the subscription, not published? Is it because the input stream is a cold stream that reads its items synchronously, so items added after onCompleted fires never reach the observer? Can someone please shed some light on this behavior?

Upvotes: 2

Views: 2470

Answers (2)

subhaze

Reputation: 8855

You could create your own Observable to get the functionality you're looking for.

Rx 5 Beta (for Rx 4, change next and complete to onNext and onCompleted):

var source = Rx.Observable.create(function (observer) {
    [1,2,3,4,5].forEach(item => observer.next(item));

    observer.next(6);
    // observer.complete() // <-- remove comment to allow observable to complete
    // Any cleanup logic might go here
    return function () {
        console.log('disposed');
    };
});


var subscription = source.subscribe(
    function (x) { console.log('onNext: %s', x); },
    function (e) { console.log('onError: %s', e); },
    function () { console.log('onCompleted'); }
);

Example: http://jsbin.com/datuqoniyo/edit?js,console

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/creating.md#creating-a-sequence-from-scratch

http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-create

Upvotes: 2

user3743222

Reputation: 18665

arraysource is a cold source, so it lazily produces its sequence of values only when you subscribe to it. At that moment, your array has 6 values. And yes, because Rx.Observable.from(array) produces its values synchronously, you will see all 6 values printed, after which the observable completes. The value 7 is pushed only after that, so it never reaches the observer.
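
To make the timing concrete, here is a minimal sketch in plain JavaScript. The fromArray function is a hypothetical, simplified stand-in for Rx.Observable.from(array), not the actual RxJS implementation; it only assumes that the array is read at subscription time and that values are emitted in a synchronous loop.

```javascript
// Hypothetical, simplified stand-in for Rx.Observable.from(array).
// This is NOT the real RxJS implementation, only an illustration of the timing.
function fromArray(array) {
  return {
    subscribe: function (onNext, onError, onCompleted) {
      // The array is read only now, at subscription time (cold, lazy).
      for (var i = 0; i < array.length; i++) {
        onNext(array[i]);
      }
      // The loop ran synchronously, so completion fires before subscribe() returns.
      onCompleted();
    }
  };
}

var output = [];
var arr = [1, 2, 3, 4, 5];
var source = fromArray(arr);

arr.push(6); // pushed before subscribing, so it is emitted

source.subscribe(
  function (x) { output.push('onNext: ' + x); },
  function (e) { output.push('onError: ' + e); },
  function () { output.push('onCompleted'); }
);

arr.push(7); // pushed after completion, so nothing reads it

console.log(output.join('\n'));
```

By the time arr.push(7) runs, subscribe() has already returned and the sequence has completed, so the mutation has no observer left to notify.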

That said, in real code it is not good practice to mutate a parameter that takes part in the definition of an observable. It makes your program very difficult to reason about, as the impact of that mutation will depend on a large number of things (synchronicity, as here, the implementation of the observable operator, etc.). If you have an argument whose value changes over time, then conceptually you have an observable, don't you? So you could model it as an observable to make it easy to manipulate and reason about.
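
One way to picture "model it as an observable" is a source that you push values into, instead of an array that you mutate. The sketch below uses a hypothetical createSubject helper written in plain JavaScript purely for illustration; in RxJS this role would typically be played by Rx.Subject (with onNext in Rx 4 and next in Rx 5), which also handles errors, completion, and unsubscription that this sketch leaves out.

```javascript
// Hypothetical, minimal subject-like helper for illustration only.
// In RxJS, Rx.Subject plays this role and does much more.
function createSubject() {
  var observers = [];
  return {
    // Register a callback that receives every value pushed from now on.
    subscribe: function (onNext) {
      observers.push(onNext);
    },
    // Push a value to all currently registered observers.
    next: function (value) {
      observers.forEach(function (onNext) { onNext(value); });
    }
  };
}

var received = [];
var numbers = createSubject();

numbers.subscribe(function (x) { received.push(x); });

// Values are pushed through the source rather than into a shared array.
[1, 2, 3, 4, 5, 6].forEach(function (n) { numbers.next(n); });
numbers.next(7); // a later value still reaches the subscriber

console.log(received);
```

Note the trade-off in this sketch: a value pushed before anyone has subscribed is simply missed, so the order of subscribing and pushing still matters, but it is now explicit in the code rather than hidden in a mutated array.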

Upvotes: 2
