BartKrul

Reputation: 657

Angular 5 Rxjs Subject.subscribe() not triggering in multiple components

I'm using RxJS Subjects to update two of my components.

I'm subscribing to a subject in a service, but when the .next method is called on the Subject, it only updates one of my components.

The application consists of a WebsocketService that initializes a WebSocket connection, and a NotificationService that uses the WebsocketService to connect to the backend and send/receive notifications.

I have a NotificationComponent where I can create a new notification. In this component I subscribed to the Subject in the NotificationService, and I display the notification when it gets updated. This works just fine: the message reaches the backend and gets updated in all browsers that currently have a connection.

The next step for me was to show this notification in the HeaderComponent. I injected the NotificationService here and subscribed to the same Subject. However, when I send the notification, the HeaderComponent's subscription does not fire. The console.log message never shows up in the console.

WebSocketService

import { Injectable } from '@angular/core';
import { ReplaySubject, Subject, Observable, Observer } from 'rxjs/Rx';

@Injectable()
export class WebsocketService {

  constructor() { }

  private subject: ReplaySubject<MessageEvent>;

  public connect(url): ReplaySubject<MessageEvent> {
    if (!this.subject) {
      this.subject = this.create(url);
      console.log("Successfully connected: " + url);
    }
    return this.subject;
  }

  private create(url): ReplaySubject<MessageEvent> {

    //create connection
    let ws = new WebSocket(url);

    //define observable
    let observable = Observable.create(
      (obs: Observer<MessageEvent>) => {
        ws.onmessage = obs.next.bind(obs);
        ws.onerror = obs.error.bind(obs);
        ws.onclose = obs.complete.bind(obs);
        return ws.close.bind(ws);
      });

    //define observer
    let observer = {
      next: (data: Object) => {
        if (ws.readyState === WebSocket.OPEN) {
          console.log("---sending ws message---");
          ws.send(JSON.stringify(data));
        }
      }
    };

    return ReplaySubject.create(observer, observable);
  }
}

NotificationService

import { Injectable } from '@angular/core';
import { Observable, Subject, ReplaySubject, BehaviorSubject } from 'rxjs/Rx';
import { WebsocketService } from './websocket.service';
import { Notification } from './../model/notification'

const NOTIFICATION_URL = 'ws://localhost:8080/Kwetter/socket'


@Injectable()
export class NotificationService {

  public _notification: ReplaySubject<Notification>;

  constructor(websocketService: WebsocketService) {

    this._notification = <ReplaySubject<Notification>>websocketService
      .connect(NOTIFICATION_URL)
      .map((response: MessageEvent): Notification => {
        let data = JSON.parse(response.data);
        return {
          sender: data.author,
          message: data.message
        }
      });
  }

  sendMessage(notification) {
    console.log("---calling .next()---");
    this._notification.next(notification);
  }
}

NotificationComponent

import { Component, OnInit } from '@angular/core';
import { NotificationService } from '../services/notification.service';
import { UserService } from '../services/user.service';
import { Notification } from './../model/notification';

@Component({
  selector: 'app-notifications',
  templateUrl: './notifications.component.html',
  styleUrls: ['./notifications.component.css']
})
export class NotificationsComponent implements OnInit {

  notification: Notification;
  text: string;

  constructor(private notificationService: NotificationService, private userService: UserService) {

    if (this.notification == null) {
      this.notification = new Notification("", "");
    }
    notificationService._notification.subscribe(notification => {
      console.log("---notification has been updated---")
      this.notification = notification;
    });
  }

  sendMsg() {
    let newNot = new Notification(this.userService.getUser(), this.text);
    this.notificationService.sendMessage(newNot);
  }

  ngOnInit() {
  }

}

HeaderComponent

import { Component, OnInit, OnDestroy } from '@angular/core';
import { UserService } from '../../services/user.service';
import { NotificationService } from '../../services/notification.service';
import { Router } from '@angular/router';
import { Subscription } from 'rxjs/Subscription';
import { Profile } from '../../model/profile';
import { User } from '../../model/user';
import { Notification } from '../../model/notification';

@Component({
  selector: 'app-header',
  templateUrl: './header.component.html',
  styleUrls: ['./header.component.css']
})
export class HeaderComponent implements OnInit, OnDestroy {

  private notification: Notification;
  private loggedIn = false;
  private user: User;

  private subscription: Subscription;

  constructor(private userService: UserService, private router: Router, private notificationService: NotificationService) {

    console.log("---constructor headercomponent---");
    console.log(this.notification);

    this.notificationService._notification.subscribe(notification => {
      console.log("---header notification has been updated---");
      this.notification = notification;
    });

    if (this.notification == null) {
      this.notification = new Notification("", "");
    }

    this.subscription = this.userService.profile$.subscribe(user => {
      this.user = user;
      if (user !== null) {
        this.loggedIn = true;
      }
      else this.loggedIn = false;
    });

    this.loggedIn = userService.isLoggedIn();
    this.user = userService.getUser();
  }

  logout() {
    this.userService.logout();
    this.router.navigate(['home']);
  }

  home() {
    this.router.navigate(['home']);
  }

  myProfile() {
    console.log("click");
    this.router.navigate(['profile', this.userService.getUser().id]);
  }

  getLoggedIn(): void {
    this.loggedIn = !!this.userService.isLoggedIn();
  }

  ngOnInit() {
    this.getLoggedIn();
  }

  ngOnDestroy() {
    this.subscription.unsubscribe();
  }

}

The NotificationComponent is shown by using the router-outlet, and the header component is always shown by using the selector tags, but I don't think this should matter.

<div>
  <app-header></app-header>
  <div class="content">
    <router-outlet></router-outlet>
  </div>
</div>

I found the thread below, which suggested using a ReplaySubject in case I subscribe after the event is fired (I don't think that is the case, but I tried it anyway). This didn't work.

Also, I only have a single app.module where I declare the providers. Since I'm using the same code for both components, why does .subscribe only work within the NotificationComponent?

Angular 2: Observable / Subscription not triggering

Upvotes: 2

Views: 2844

Answers (1)

a better oliver

Reputation: 26848

The behavior you see has to do with how RxJS works and how your stream is created. Let's take a look at WebsocketService:

let observable = Observable.create(
  (obs: Observer<MessageEvent>) => {
    ws.onmessage = obs.next.bind(obs);

obs is new for every subscription, but ws is always the same. Each subscription therefore reassigns ws.onmessage and replaces the handler installed by the previous one. So when you subscribe a second time, in the NotificationComponent, the onmessage callback invokes next only for that subscription. Hence only that component receives messages.

You can verify that by commenting out notificationService._notification.subscribe in the NotificationComponent. Then the HeaderComponent will receive messages.
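The overwriting can also be reproduced without RxJS or a real socket. The following is only a minimal sketch: FakeSocket and subscribeCold are hypothetical stand-ins (not part of the question's code or of RxJS) for the WebSocket's single onmessage slot and for the function passed to Observable.create.

```typescript
// Hypothetical stand-ins, for illustration only.
type Handler = (msg: string) => void;

// Mimics the one property that matters here: a single onmessage slot.
class FakeSocket {
  onmessage: Handler | null = null;

  emit(msg: string): void {
    if (this.onmessage) {
      this.onmessage(msg);
    }
  }
}

const ws = new FakeSocket();

// Runs once per subscription, like the callback given to Observable.create.
function subscribeCold(next: Handler): void {
  ws.onmessage = next; // replaces whatever handler was installed before
}

const headerLog: string[] = [];
const notificationLog: string[] = [];

subscribeCold(msg => headerLog.push(msg));       // HeaderComponent subscribes first
subscribeCold(msg => notificationLog.push(msg)); // NotificationComponent subscribes second

ws.emit("hello");

console.log(headerLog.length, notificationLog.length);
```

After the emit, headerLog is still empty and notificationLog holds the message, which matches what the question describes.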

One simple solution is to add the share operator in the NotificationService:

this._notification = <ReplaySubject<Notification>>websocketService
  .connect(NOTIFICATION_URL)
  .map((response: MessageEvent): Notification => {
    let data = JSON.parse(response.data);
    return {
      sender: data.author,
      message: data.message
    }
  })
  .share();

That means that the subscription upstream of .share() is shared: the function passed to Observable.create, i.e. (obs: Observer<MessageEvent>) => { ws.onmessage = obs.next.bind(obs); ..., is called only once, and both components receive messages.
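To illustrate the idea of sharing one upstream subscription, here is a dependency-free sketch. It is not how RxJS implements share (the real operator also reference-counts subscribers and disconnects when none remain); StubSocket, coldSource and shareSketch are hypothetical names used only for this example.

```typescript
// Hypothetical stand-ins, for illustration only.
type Listener = (msg: string) => void;

class StubSocket {
  onmessage: Listener | null = null;

  emit(msg: string): void {
    if (this.onmessage) {
      this.onmessage(msg);
    }
  }
}

const socket = new StubSocket();
let upstreamSubscriptions = 0;

// Plays the role of the Observable.create callback: runs once per subscription.
function coldSource(next: Listener): void {
  upstreamSubscriptions++;
  socket.onmessage = next;
}

// Simplified stand-in for share(): subscribe upstream once, fan out to all listeners.
function shareSketch(source: (next: Listener) => void): (next: Listener) => void {
  const listeners: Listener[] = [];
  let connected = false;

  return (next: Listener): void => {
    listeners.push(next);
    if (!connected) {
      connected = true;
      // The single upstream handler forwards each message to every listener.
      source(msg => listeners.forEach(listener => listener(msg)));
    }
  };
}

const subscribeShared = shareSketch(coldSource);
const headerMessages: string[] = [];
const notificationMessages: string[] = [];

subscribeShared(msg => headerMessages.push(msg));       // HeaderComponent
subscribeShared(msg => notificationMessages.push(msg)); // NotificationComponent

socket.emit("hello");

console.log(upstreamSubscriptions, headerMessages, notificationMessages);
```

Here coldSource runs only once, so onmessage is assigned a single fan-out handler and both listeners receive "hello".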

By the way, RxJS offers built-in support for WebSockets. You can create a stream with Observable.webSocket(url); and get rid of some code.

Upvotes: 3
