Reputation: 1844
Observables complete naturally if they are constructed from finite data.
import {Observable, Subject} from "rx";
let stream0$ = Observable.of("1", "2", "3");
let stream1$ = stream0$.map(x => x);
stream1$.subscribe(
(val) => { console.log("onNext", val) },
(err) => { console.log("onError", err) },
() => { console.log("onCompleted") }
);
// onNext 1
// onNext 2
// onNext 3
// onCompleted
Or don't if not. But what about observables subscribed on subjects? For example:
import {Observable, Subject} from "rx";
let subj$ = new Subject();
let stream1$ = subj$.map(x => x);
stream1$.subscribe(
(val) => { console.log("onNext", val) },
(err) => { console.log("onError", err) },
() => { console.log("onCompleted") }
);
subj$.onNext("foo");
// onNext foo
"onCompleted" is not logged though source is ended. Can we pass this "end" event to stream1$
somehow. I've found no information about this important stuff in docs. It would be great to see a diagram like here Hot and Cold observables : are there 'hot' and 'cold' operators? to nail that event flow.
Upvotes: 2
Views: 4555
Reputation: 1844
How Complete and Error events actually work in RxJS is a following research I've made.
Quoting myself from there.
Completion should not be taken as "event at the end of the program" or something. It's a very specific thing. There are only three possible ways for stream to complete: 1) Be a finite
Observable
and complete naturally 2) Be aSubject
and get imperativeonCompleted()
call. 3) Get completion event from upstream. Any form of process termination / unsubscription do not complete streams.Completion terminates stream. Nothing happens in a stream after completion.
Completion is passed downstream.
Observable
derived fromSubject
completes if/whenSubject
completes.Subject
subscribed toObservable
completes if/whenObservable
completes.
Upvotes: 3
Reputation: 18665
With a subject, you are completely in control. Rx.Subject
implements the observer interface and it is that observer interface that you use when you call onNext
.
A subject
assume that all serialization and grammatical correctness are handled by the caller of the subject.
That means among other things that it is up to you to signal completion and error. To signal completion, use onCompleted
. FYI, here is the aforementioned grammar :
This grammar allows observable sequences to send any amount (0 or more) of
onNext
messages to the subscribed observer instance, optionally followed by a single success (onCompleted
) or failure (onError
) message.The single message indicating that an observable sequence has finished ensures that consumers of the observable sequence can deterministically establish that it is safe to perform cleanup operations.
A single failure further ensures that abort semantics can be maintained for operators that work on multiple observable sequences.
NOTE : for RxJS v5, the observer interface has changed, cf. new interface
Upvotes: 4