Ivan Kleshnin
Ivan Kleshnin

Reputation: 1844

How onComplete actually works in RxJS

Observables complete naturally if they are constructed from finite data.

import {Observable, Subject} from "rx";

let stream0$ = Observable.of("1", "2", "3");
let stream1$ = stream0$.map(x => x);

stream1$.subscribe(
  (val) => { console.log("onNext", val) },
  (err) => { console.log("onError", err) },
  ()    => { console.log("onCompleted") }
);

// onNext 1
// onNext 2
// onNext 3
// onCompleted

Or don't if not. But what about observables subscribed on subjects? For example:

import {Observable, Subject} from "rx";

let subj$ = new Subject();
let stream1$ = subj$.map(x => x);

stream1$.subscribe(
  (val) => { console.log("onNext", val) },
  (err) => { console.log("onError", err) },
  ()    => { console.log("onCompleted") }
);

subj$.onNext("foo");

// onNext foo

"onCompleted" is not logged though source is ended. Can we pass this "end" event to stream1$ somehow. I've found no information about this important stuff in docs. It would be great to see a diagram like here Hot and Cold observables : are there 'hot' and 'cold' operators? to nail that event flow.

Upvotes: 2

Views: 4555

Answers (2)

Ivan Kleshnin
Ivan Kleshnin

Reputation: 1844

How Complete and Error events actually work in RxJS is a following research I've made.

Quoting myself from there.

  1. Completion should not be taken as "event at the end of the program" or something. It's a very specific thing. There are only three possible ways for stream to complete: 1) Be a finite Observable and complete naturally 2) Be a Subject and get imperative onCompleted() call. 3) Get completion event from upstream. Any form of process termination / unsubscription do not complete streams.

  2. Completion terminates stream. Nothing happens in a stream after completion.

  3. Completion is passed downstream. Observable derived from Subject completes if/when Subject completes. Subject subscribed to Observable completes if/when Observable completes.

Upvotes: 3

user3743222
user3743222

Reputation: 18665

With a subject, you are completely in control. Rx.Subject implements the observer interface and it is that observer interface that you use when you call onNext.

A subject

assume that all serialization and grammatical correctness are handled by the caller of the subject.

That means among other things that it is up to you to signal completion and error. To signal completion, use onCompleted. FYI, here is the aforementioned grammar :

This grammar allows observable sequences to send any amount (0 or more) of onNext messages to the subscribed observer instance, optionally followed by a single success (onCompleted) or failure (onError) message.

The single message indicating that an observable sequence has finished ensures that consumers of the observable sequence can deterministically establish that it is safe to perform cleanup operations.

A single failure further ensures that abort semantics can be maintained for operators that work on multiple observable sequences.

NOTE : for RxJS v5, the observer interface has changed, cf. new interface

Upvotes: 4

Related Questions