Estus Flask
Estus Flask

Reputation: 222389

Pipe RxJS observable to existing subject

There is existing subject that is in use:

const fooSubject = new BehaviorSubject(null);

And there is another observable (another subject in this example):

const barSubject = new Subject();
barSubject.subscribe(
  value => fooSubject.next(),
  err => fooSubject.error(err),
  () => fooSubject.complete()
);

barSubject.next('bar');

The code works but looks clumsy.

Is there a better way to pipe (in broad sense, not necessarily using pipe operator) barSubject observable to fooSubject? It looks like an operation that could be handled by the library itself.

Upvotes: 37

Views: 16239

Answers (2)

Adam
Adam

Reputation: 2736

Regarding unsubscribing after the source Observable completes, I've been using this code. It functions as expected, but I don't know if it's an "anti-pattern" or not...?

const subscription = this.http.get(url)
  .pipe(finalize(() => subscription.unsubscribe()))
  .subscribe(this.mySubject$);

EDIT: You don't need to unsubscribe from http.get(..) because it's done automatically. Therefore, with respect to the code above, the correct form would be:

this.http.get(url).subscribe(mySubject$)

EDIT 2: A gotcha with the above code is that when http.get completes, then mySubject$ will also complete. Now if you .subscribe(mySubject$) or mySubject$.next(..) it will not emit values. To avoid this and keep mySubject$ hot, use this code:

this.http.get(url).subscribe(r => this.mySubject$.next(r))

Upvotes: 1

martin
martin

Reputation: 96889

Since Subject is already an observer with methods next(), error() and complete() you can just subscribe it to any Observable:

const fooSubject = new BehaviorSubject(null);

const barSubject = new Subject();
barSubject.subscribe(fooSubject);

barSubject.next('bar');

Upvotes: 47

Related Questions