pfooti
pfooti

Reputation: 2704

Is it an antipattern to use subscription.next()?

I have a subscription that I want to use to signal events, in a piece of middleware code. So I have one thing that wants to subscribe an observable, and another thing that's not quite an observable.

I thought about using Subjects for this - that's what they seem to be for:

const repeater = new Rx.Subject();

function subscribe(observer) {
  return repeater.subscribe(observer);
}

// in some other function, call repeater.next(val)

But then I started looking at what regular subscribe() calls return. I could do this instead:

let sub = null;
function subscribe(observer) {
  return Rx.Observable.create((o) => sub = o);
}
// elsewhere sub.next(val)

But also?

let unsub = null;
function subscribe(observer) {
  unsub = Rx.Observable.create(() => false).subscribe(observer)
}
// elsewhere unsub.next(val)

all of these things will signal val to the subscriber. The weird thing that I don't understand here is the subscription return having a next() available to it - I thought next() only lived on the observer in the Observable context.

I need to keep a handle on the unsubscription doodad no matter what - when the middleware is torn down, I need to signal stream completion and release some resources. It was surprising to me that unsub had a functioning next.

This signals to me that there's some bit of RxJS that I just plain don't grok yet, in terms of Observers and Observables and Subjects and so on. In general, I understand how to wire event sources and other similar things into an observable stream. It's really just in the context of building an observable stream out of a boring function call - whenever the function is called by an external library, this stream signals an updated observable.

Upvotes: 5

Views: 10622

Answers (1)

John C
John C

Reputation: 1166

A Subscriber extends both Subscription and Observer, adding state. It exposes one method to change the state (namely, unsubscribe()), and it also exposes observer's next()/error()/complete() methods, but those methods now both honor state and also change state.

So, if I gave you a bare observer, you could call next()/error()/complete() on it in any order, as many times as you want, even though it would be terrible for you to call my next() after you've called my complete().

On the other hand, if I gave you an Observer wrapped up in a Subscriber, now there is state, and if you try to call next() on that subscriber after you called complete(), I won't see it. If you call unsubscribe(), I'll be detached.

When you call subscribe, as in

subscriber = Rx.Observable.create(fn).subscribe(observer);

you're getting back the same observer, and only that observer, wrapped inside a Subscriber. That's why you see the next()/error()/complete() methods. But those methods are normally for internal use, and if you use them to feed an observer, it is not going to do what you expect:

let observerA = {
    next: (x) => console.log('A: value: ' + x),
    error: (x) => console.log('A: error: ' + x),
    complete: () => console.log('A: completed')
}
let observerB = {
    next: (x) => console.log('B: value: ' + x),
    error: (x) => console.log('B: error: ' + x),
    complete: () => console.log('B: completed')
}

let observable = Rx.Observable.create(() => false);
let subscriberA = observable.subscribe(observerA);
let subscriberB = observable.map(x => 10*x).subscribe(observerB);
subscriberA.next(1); // only feeds observerA
// => "A: value: 1"
subscriberB.next(2); // only feeds observerB
// => "B: value: 2"  // What?

Odds are, for your use case, you will

  1. Want to use Subject because it gives you next()/error()/complete() interfaces that let you feed the front of the operator chain,
  2. Want to use Subject because it lets you feed the same values to multiple observers,
  3. Forget what you just learned about Subscribers because you're not going to use the next()/error()/complete() Subscriber interfaces. Instead, think of the object returned by subscribe() as a Subscription only, and only use the unsubscribe() method on it.

So:

let subject = new Rx.Subject();
let subscriptionA = subject.subscribe(observerA);
let subscriptionB = subject.map(x=>10*x).subscribe(observerB);
subject.next(3);
// => A: value: 3
// => B: value: 30
subscriptionA.unsubscribe()
subject.next(4);
// => B: value: 40
subscriptionB.unsubscribe()
subject.next(5);
// (no output)

Also see When to use asObservable() in rxjs?.

Upvotes: 8

Related Questions