Ivan Frolov

Reputation: 103

Source with `publishReplay()` and `refCount()` doesn't work as expected between subscriptions in multiple components in Angular

I am trying to figure out how publishReplay in rxjs works. I have an example where it works as expected:

const source = new Subject()
const sourceWrapper = source.pipe(
  publishReplay(1),
  refCount()
)

const subscribeTest1 = sourceWrapper
.subscribe(
  (data: any) => {
    console.log('subscriber1 received a value:', data)
  }
)

source.next(1)
source.next(2)

setTimeout(() => {
  const subscribeTest2 = sourceWrapper
  .subscribe(
    (data: any) => {
      console.log('subscriber2 received a value:', data)
    }
  )
}, 5000)

We have a subject and a wrapper around it, to which publishReplay(1) and refCount() are applied. We create the first subscription and then emit 2 values. As a result, we see the following in the console:

subscriber1 received a value: 1
subscriber1 received a value: 2

After 5 seconds we create another subscription, which receives the last buffered value from the ReplaySubject created by publishReplay(1). As a result, one more line appears in the console after 5 seconds:

subscriber2 received a value: 2

Link to stackblitz

So far so good.

But I have another example, where I try to apply this pattern in an Angular application. Here is the link.

It contains a single module with 2 components, app.component.ts and hello.component.ts, and 1 service, test.service.ts.

test.service.ts

@Injectable({
  providedIn: 'root'
})

export class TestService {
  private _test$: Subject<string> = new Subject()

  public get test$(): Observable<string> {
    return this._test$.pipe(
      publishReplay(1),
      refCount()
    )
  }

  public pushToStream(val: string): void {
    this._test$.next(val)
  }
}

app.component.ts

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
  public name = 'Angular';

  constructor(private testService: TestService) {}

  public ngOnInit(): void {
    this.testService.test$.subscribe((data: string) => {
      console.log('Subscriber in app.component received a value: ', data)
    })
    this.testService.pushToStream('new value')
    this.testService.pushToStream('one more value')
  }
}

app.component.html

<hello name="{{ name }}"></hello>
<p>
  Start editing to see some magic happen :)
</p>

hello.component.ts

@Component({
  selector: 'hello',
  template: `<h1>Hello {{name}}!</h1>`,
  styles: [`h1 { font-family: Lato; }`]
})
export class HelloComponent implements OnInit  {
  @Input() name: string;

  constructor(private testService: TestService) { }

  public ngOnInit(): void {
    setTimeout(() => {
      this.testService.test$.subscribe((data: string) => {
        console.log('Subscriber in hello component received a value:', data)
      })
    }, 4000)
  }

}

The principle is the same: hold the source in a service, which is a singleton, create 1 subscription in app.component.ts, emit 2 values, and create another subscription after a delay in hello.component.ts. As in the previous example, the second subscription should get the latest buffered value, but it doesn't. We only see the following in the console:

Subscriber in app.component received a value: new value
Subscriber in app.component received a value: one more value

What am I missing, and why doesn't it work?

Upvotes: 0

Views: 415

Answers (1)

Barremian

Reputation: 31105

It appears the wrapper around the subject is not shared when it is returned from an accessor (getter) or a member function that builds the pipe in its body. The body runs on every access, so each component gets the result of a new pipe(...) call. If test$ is instead declared as a public property and accessed directly, the same wrapper observable is shared among the components. Try the following:

Service

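import { Injectable } from '@angular/core'
import { Subject } from 'rxjs'
import { publishReplay, refCount } from 'rxjs/operators'

@Injectable({
  providedIn: 'root'
})
export class TestService {
  private _test$: Subject<string> = new Subject()

  // Built once, when the service is created, so every component
  // subscribes to the same wrapper and shares its replay buffer
  public test$ = this._test$.pipe(
    publishReplay(1),
    refCount()
  );

  public pushToStream(val: string): void {
    this._test$.next(val)
  }
}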

I've modified your Stackblitz

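Probably the getter returns a separate wrapper on every access: each call runs pipe(publishReplay(1), refCount()) again, so the individual test$ values in the components are bound to the same subject but are not the same instance. Each one has its own replay buffer, and the one created later in hello.component.ts never saw the values that were emitted before it subscribed.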
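To make the difference concrete, here is a dependency-free sketch. It does not use RxJS: Source and ReplayLast are hypothetical stand-ins that only model "subscribe to the source on first use and replay the last value to late subscribers", and GetterService, FieldService and lateSubscriberValues are names made up for this illustration.

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical stand-in for a Subject: no buffering at all.
class Source<T> {
  private listeners: Listener<T>[] = [];
  subscribe(listener: Listener<T>): void { this.listeners.push(listener); }
  next(value: T): void { this.listeners.forEach((l) => l(value)); }
}

// Hypothetical stand-in for the piped wrapper: each instance owns its buffer.
class ReplayLast<T> {
  private source: Source<T>;
  private buffer: T[] = [];
  private listeners: Listener<T>[] = [];
  private connected = false;

  constructor(source: Source<T>) { this.source = source; }

  subscribe(listener: Listener<T>): void {
    // Replay whatever this particular instance has buffered so far.
    for (const value of this.buffer) listener(value);
    this.listeners.push(listener);
    if (!this.connected) {
      // Connect to the source only on the first subscription.
      this.connected = true;
      this.source.subscribe((value) => {
        this.buffer = [value];
        this.listeners.forEach((l) => l(value));
      });
    }
  }
}

// Getter: a new wrapper, with an empty buffer, on every access.
class GetterService {
  private source = new Source<string>();
  get test$(): ReplayLast<string> { return new ReplayLast(this.source); }
  pushToStream(value: string): void { this.source.next(value); }
}

// Field: one wrapper, created once and shared by all callers.
class FieldService {
  private source = new Source<string>();
  readonly test$ = new ReplayLast(this.source);
  pushToStream(value: string): void { this.source.next(value); }
}

// Mirrors the two components: subscribe early, emit twice, subscribe late.
function lateSubscriberValues(service: {
  test$: ReplayLast<string>;
  pushToStream(value: string): void;
}): string[] {
  const received: string[] = [];
  service.test$.subscribe(() => {});
  service.pushToStream('new value');
  service.pushToStream('one more value');
  service.test$.subscribe((value) => { received.push(value); });
  return received;
}

console.log(lateSubscriberValues(new GetterService())); // empty array
console.log(lateSubscriberValues(new FieldService()));  // array with 'one more value'
```

With the getter, the late subscriber talks to a wrapper that was created after both values were emitted, so it receives nothing. With the field, it reuses the wrapper that already buffered the last value.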

Perhaps someone else might be able to provide a canonical answer.

Upvotes: 2
