Ced

Reputation: 17337

Turn observable into subject

We have a function that gets a stream from the backend as an observable. However, we would also like to be able to push to that observable, so that changes are visible before they are applied in the backend. To do so, I tried returning a subject instead, but the connection is still ongoing after the unsubscribe.

In other words, in the code below, we would like the console.log(i) not to start before we subscribe to the subject, and to stop when we unsubscribe from it:

import { ReplaySubject, Observable, interval } from 'rxjs';
import { tap } from 'rxjs/operators'

function test() {
    const obs = interval(1000).pipe(tap(i => console.log(i)));
    const subj = new ReplaySubject(1);
    obs.subscribe(subj);
    return subj;
}

const subject = test();
subject.next('TEST');

const subscription = subject.pipe(
    tap(i => console.log('from outside ' + i))
).subscribe()
setTimeout(_ => subscription.unsubscribe(), 5000);


Upvotes: 3

Views: 351

Answers (1)

m1ch4ls

Reputation: 3435

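You should not subscribe inside test: that subscription starts the interval immediately and is never torn down when the caller unsubscribes from the subject. I guess you want to create an Observable and a Subject and merge them, in which case you would have to return both separately: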

return [subject, merge(subject, obs)]

and then

const [subject, obs] = test();
subject.next()
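
As a rough sketch of that "return both" idea, the fragment above could be fleshed out as follows. The names withLocalPushes, push, stream$ and backend$ are hypothetical and not part of the original code, and backend$ is a synchronous stand-in for the real backend stream so that subscription and teardown can be observed without timers.

```typescript
import { Observable, ReplaySubject, merge } from 'rxjs';

// Hypothetical helper (not from the original post): pairs a backend stream
// with a subject that accepts local, optimistic pushes.
function withLocalPushes<T>(backend$: Observable<T>) {
  const local$ = new ReplaySubject<T>(1);
  return {
    push: (value: T) => local$.next(value),
    // merge is lazy: backend$ is only subscribed when stream$ is subscribed.
    stream$: merge(backend$, local$),
  };
}

// Assumed stand-in for the backend stream; it records subscribe and teardown.
const log: string[] = [];
const backend$ = new Observable<number>(subscriber => {
  log.push('backend subscribed');
  subscriber.next(0);
  return () => {
    log.push('backend torn down');
  };
});

const { push, stream$ } = withLocalPushes(backend$);
push(-1); // buffered by the ReplaySubject; the backend is not touched yet
const logBeforeSubscribe = [...log]; // still empty at this point

const received: number[] = [];
const subscription = stream$.subscribe(v => received.push(v));
push(42);
subscription.unsubscribe(); // also tears down the backend subscription

console.log(received); // [0, -1, 42]
console.log(log);      // ['backend subscribed', 'backend torn down']
```

Because nothing is subscribed inside the helper, the backend connection only exists between subscribe and unsubscribe on stream$, which is the behaviour asked for in the question.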

However, I would do it by passing the subject in as a parameter instead:

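import { ReplaySubject, interval, merge } from 'rxjs';
import { tap } from 'rxjs/operators';

// Merge the backend stream with the caller's subject. Nothing is subscribed
// here, so the interval only runs while the returned observable is subscribed.
function test(subject) {
  return merge(
    interval(1000).pipe(tap(i => console.log(i))),
    subject
  );
}

const subject = new ReplaySubject(1);

const obs = test(subject);
// Buffered by the ReplaySubject and replayed once obs is subscribed.
subject.next('TEST');

const subscription = obs.pipe(
  tap(i => console.log('from outside ' + i))
).subscribe();
// Unsubscribing tears down the merged stream, which stops the interval.
setTimeout(() => subscription.unsubscribe(), 5000);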

Upvotes: 2
