Reputation: 2704
I have a subscription that I want to use to signal events, in a piece of middleware code. So I have one thing that wants to subscribe to an observable, and another thing that's not quite an observable.
I thought about using Subjects for this - that's what they seem to be for:
const repeater = new Rx.Subject();
function subscribe(observer) {
return repeater.subscribe(observer);
}
// in some other function, call repeater.next(val)
But then I started looking at what regular subscribe() calls return. I could do this instead:
let sub = null;
function subscribe(observer) {
return Rx.Observable.create((o) => sub = o);
}
// elsewhere sub.next(val)
But also?
let unsub = null;
function subscribe(observer) {
unsub = Rx.Observable.create(() => false).subscribe(observer)
}
// elsewhere unsub.next(val)
All of these things will signal val to the subscriber. The weird thing that I don't understand here is the returned subscription having a next() available to it - I thought next() only lived on the observer in the Observable context.
I need to keep a handle on the unsubscription doodad no matter what - when the middleware is torn down, I need to signal stream completion and release some resources. It was surprising to me that unsub had a functioning next.
This signals to me that there's some bit of RxJS that I just plain don't grok yet, in terms of Observers and Observables and Subjects and so on. In general, I understand how to wire event sources and other similar things into an observable stream. It's really just in the context of building an observable stream out of a boring function call - whenever the function is called by an external library, this stream signals an updated observable.
Upvotes: 5
Views: 10622
Reputation: 1166
A Subscriber extends both Subscription and Observer, adding state. It exposes one method to change the state (namely, unsubscribe()), and it also exposes the observer's next()/error()/complete() methods, but those methods now both honor state and also change state.
So, if I gave you a bare observer, you could call next()/error()/complete() on it in any order, as many times as you want, even though it would be terrible for you to call my next() after you've called my complete().
On the other hand, if I gave you an Observer wrapped up in a Subscriber, now there is state, and if you try to call next() on that subscriber after you called complete(), I won't see it.
If you call unsubscribe(), I'll be detached.
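To make the "adding state" point concrete, here is a rough sketch of what a Subscriber does around a bare observer. It is a simplified model and not the RxJS source: SketchSubscriber and its fields are names I made up for illustration.

```javascript
// Simplified model only; this is NOT how RxJS actually implements Subscriber.
class SketchSubscriber {
  constructor(observer) {
    this.destination = observer; // the bare observer being wrapped
    this.isStopped = false;      // true once error(), complete() or unsubscribe() has run
    this.closed = false;         // true once unsubscribe() has run
  }
  next(value) {
    // Honor state: values after a terminal event are dropped.
    if (!this.isStopped) this.destination.next(value);
  }
  error(err) {
    if (this.isStopped) return;
    this.isStopped = true;       // change state
    this.destination.error(err);
    this.unsubscribe();
  }
  complete() {
    if (this.isStopped) return;
    this.isStopped = true;       // change state
    this.destination.complete();
    this.unsubscribe();
  }
  unsubscribe() {
    this.closed = true;
    this.isStopped = true;
    // Detach: the wrapped observer is replaced by a no-op observer.
    this.destination = { next() {}, error() {}, complete() {} };
  }
}

const log = [];
const bare = {
  next: (x) => log.push('value: ' + x),
  error: (e) => log.push('error: ' + e),
  complete: () => log.push('completed'),
};

// A bare observer has no state, so nothing stops a late next().
bare.next(1);
bare.complete();
bare.next(2); // still delivered

// The wrapped observer ignores anything after complete().
const wrapped = new SketchSubscriber(bare);
wrapped.next(3);
wrapped.complete();
wrapped.next(4); // dropped
console.log(log);
// => [ 'value: 1', 'completed', 'value: 2', 'value: 3', 'completed' ]
```

The late next(2) on the bare observer goes through, while next(4) on the wrapper is silently dropped.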
When you call subscribe, as in
subscriber = Rx.Observable.create(fn).subscribe(observer);
you're getting back the same observer, and only that observer, wrapped inside a Subscriber. That's why you see the next()/error()/complete() methods. But those methods are normally for internal use, and if you use them to feed an observer, it is not going to do what you expect:
let observerA = {
next: (x) => console.log('A: value: ' + x),
error: (x) => console.log('A: error: ' + x),
complete: () => console.log('A: completed')
}
let observerB = {
next: (x) => console.log('B: value: ' + x),
error: (x) => console.log('B: error: ' + x),
complete: () => console.log('B: completed')
}
let observable = Rx.Observable.create(() => false);
let subscriberA = observable.subscribe(observerA);
let subscriberB = observable.map(x => 10*x).subscribe(observerB);
subscriberA.next(1); // only feeds observerA
// => "A: value: 1"
subscriberB.next(2); // only feeds observerB, bypassing map()
// => "B: value: 2" // What? Not 20, because map() never saw the value
Odds are, for your use case, you will want to use a Subject: it gives you next()/error()/complete() interfaces that let you feed the front of the operator chain, so you don't need the next()/error()/complete() Subscriber interfaces. Instead, think of the object returned by subscribe() as a Subscription only, and only use the unsubscribe() method on it. So:
let subject = new Rx.Subject();
let subscriptionA = subject.subscribe(observerA);
let subscriptionB = subject.map(x=>10*x).subscribe(observerB);
subject.next(3);
// => A: value: 3
// => B: value: 30
subscriptionA.unsubscribe()
subject.next(4);
// => B: value: 40
subscriptionB.unsubscribe()
subject.next(5);
// (no output)
Also see When to use asObservable() in rxjs?.
Upvotes: 8