Reputation: 103
I am trying to figure out how publishReplay in RxJS works. I have an example where it works as expected:
import { Subject } from 'rxjs'
import { publishReplay, refCount } from 'rxjs/operators'

const source = new Subject()
const sourceWrapper = source.pipe(
  publishReplay(1),
  refCount()
)
const subscribeTest1 = sourceWrapper
  .subscribe(
    (data: any) => {
      console.log('subscriber1 received a value:', data)
    }
  )
source.next(1)
source.next(2)
// Subscribe a second time, 5 seconds after the values were emitted
setTimeout(() => {
  const subscribeTest2 = sourceWrapper
    .subscribe(
      (data: any) => {
        console.log('subscriber2 received a value:', data)
      }
    )
}, 5000)
We have a subject and a wrapper around it to which publishReplay(1) and refCount() are applied.
We create the first subscription and then emit 2 values. As a result, we see the following in the console:
subscriber1 received a value: 1
subscriber1 received a value: 2
After 5 seconds we create another subscription, which receives the last buffered value from the ReplaySubject created by publishReplay(1). As a result, one more line appears in the console after 5 seconds:
subscriber2 received a value: 2
So far so good.
But I have another example, where I try to apply this pattern in an Angular application. Here is the link.
It contains a single module with 2 components, app.component.ts and hello.component.ts, and 1 service, test.service.ts.
test.service.ts
@Injectable({
  providedIn: 'root'
})
export class TestService {
  private _test$: Subject<string> = new Subject()
  public get test$(): Observable<string> {
    return this._test$.pipe(
      publishReplay(1),
      refCount()
    )
  }
  public pushToStream(val: string): void {
    this._test$.next(val)
  }
}
app.component.ts
@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
  public name = 'Angular';
  constructor(private testService: TestService) {}
  public ngOnInit(): void {
    this.testService.test$.subscribe((data: string) => {
      console.log('Subscriber in app.component received a value: ', data)
    })
    this.testService.pushToStream('new value')
    this.testService.pushToStream('one more value')
  }
}
app.component.html
<hello name="{{ name }}"></hello>
<p>
Start editing to see some magic happen :)
</p>
hello.component.ts
@Component({
  selector: 'hello',
  template: `<h1>Hello {{name}}!</h1>`,
  styles: [`h1 { font-family: Lato; }`]
})
export class HelloComponent implements OnInit {
  @Input() name: string;
  constructor(private testService: TestService) { }
  public ngOnInit(): void {
    setTimeout(() => {
      this.testService.test$.subscribe((data: string) => {
        console.log('Subscriber in hello component received a value:', data)
      })
    }, 4000)
  }
}
The principle is the same: hold the source in a service, which is a singleton, create 1 subscription in app.component.ts, emit 2 values, and create another subscription with a delay in hello.component.ts. As we saw in the previous example, the second subscription should get the latest buffered value, but it doesn't. We only see the following in the console:
Subscriber in app.component received a value: new value
Subscriber in app.component received a value: one more value
What am I missing, and why doesn't it work?
Upvotes: 0
Views: 415
Reputation: 31105
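The problem appears to be the accessor (getter): every access to test$ runs this._test$.pipe(publishReplay(1), refCount()) again and returns a new observable with its own internal ReplaySubject. The same would happen with a member function that builds the pipe on each call (e.g. getTest() { return this._test$.pipe(publishReplay(1), refCount()) }). If test$ is instead declared as a public property that is built once and accessed directly, the same observable, and therefore the same buffer, is shared among the components. Try the following: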
Service
export class TestService {
  private _test$: Subject<string> = new Subject()
  public test$ = this._test$.pipe(
    publishReplay(1),
    refCount()
  );
  public pushToStream(val: string): void {
    this._test$.next(val)
  }
}
I've modified your Stackblitz
Each call to the getter returns a separate observable. They are all bound to the same subject, but the test$ obtained in each component is a different instance with its own replay buffer, so the subscription in hello.component.ts starts with an empty buffer and never sees the values that were emitted earlier.
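To make the difference concrete without pulling in RxJS or Angular, here is a minimal, dependency-free sketch. MiniSubject and replayLast are hypothetical stand-ins I made up for Subject and for pipe(publishReplay(1), refCount()); they only model the one-slot buffer and ignore unsubscription, completion and errors, so treat this as an illustration of the mechanism, not as how RxJS is implemented.

```typescript
type Listener<T> = (value: T) => void;

interface Replayed<T> {
  subscribe(listener: Listener<T>): void;
}

// Hypothetical stand-in for Subject: no buffering, only current listeners get values.
class MiniSubject<T> {
  private listeners: Listener<T>[] = [];
  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }
  next(value: T): void {
    this.listeners.forEach((l) => l(value));
  }
}

// Hypothetical stand-in for source.pipe(publishReplay(1), refCount()).
// Every call creates a NEW wrapper with its own one-slot buffer.
function replayLast<T>(source: MiniSubject<T>): Replayed<T> {
  let hasValue = false;
  let last: T | undefined;
  let connected = false;
  const listeners: Listener<T>[] = [];
  return {
    subscribe(listener: Listener<T>): void {
      if (hasValue) listener(last as T); // replay the buffered value to a late subscriber
      listeners.push(listener);
      if (!connected) {
        // connect to the source on the first subscription
        connected = true;
        source.subscribe((v) => {
          hasValue = true;
          last = v;
          listeners.forEach((l) => l(v));
        });
      }
    },
  };
}

// Mirrors the service from the question: a new wrapper on every access.
class GetterService {
  private source = new MiniSubject<string>();
  get test$(): Replayed<string> {
    return replayLast(this.source);
  }
  push(val: string): void {
    this.source.next(val);
  }
}

// Mirrors the modified service: the wrapper is built once and shared.
class FieldService {
  private source = new MiniSubject<string>();
  readonly test$: Replayed<string> = replayLast(this.source);
  push(val: string): void {
    this.source.next(val);
  }
}

// Subscribe, emit two values, then subscribe late (no timer needed for the model).
function run(service: { test$: Replayed<string>; push(val: string): void }): string[] {
  const log: string[] = [];
  service.test$.subscribe((v) => log.push(`first: ${v}`));
  service.push('new value');
  service.push('one more value');
  service.test$.subscribe((v) => log.push(`late: ${v}`));
  return log;
}

const getterLog = run(new GetterService());
const fieldLog = run(new FieldService());
console.log(getterLog); // the late subscriber logs nothing
console.log(fieldLog);  // the late subscriber gets 'one more value'
```

With the getter, the late subscriber talks to a fresh wrapper whose buffer is empty; with the field, it reuses the wrapper that already buffered the last value.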
Perhaps someone else might be able to provide a canonical answer.
Upvotes: 2