I'm using RxJS and Subjects to update two of my components.
I'm subscribing to a subject in a service, but when the .next method is called on the Subject, it only updates one of my components.
The application consists of a WebsocketService that initializes a WebSocket connection, and a NotificationService that uses the WebsocketService to connect to the backend and send/receive notifications.
I have a NotificationComponent where I can create a new notification. In this component I subscribed to the Subject in the NotificationService, and display the notification when it gets updated. This works just fine: the message reaches the backend and gets updated in all browsers that currently have a connection.
The next step for me was to show this notification in the HeaderComponent. I injected the NotificationService here and subscribed to the same Subject; however, when I send the notification, the HeaderComponent's subscription does not fire. The console.log message never shows up in the console.
WebSocketService
import { Injectable } from '@angular/core';
import { ReplaySubject, Subject, Observable, Observer } from 'rxjs/Rx';

@Injectable()
export class WebsocketService {
    constructor() { }

    private subject: ReplaySubject<MessageEvent>;

    public connect(url): ReplaySubject<MessageEvent> {
        if (!this.subject) {
            this.subject = this.create(url);
            console.log("Successfully connected: " + url);
        }
        return this.subject;
    }

    private create(url): ReplaySubject<MessageEvent> {
        //create connection
        let ws = new WebSocket(url);

        //define observable
        let observable = Observable.create(
            (obs: Observer<MessageEvent>) => {
                ws.onmessage = obs.next.bind(obs);
                ws.onerror = obs.error.bind(obs);
                ws.onclose = obs.complete.bind(obs);
                return ws.close.bind(ws);
            });

        //define observer
        let observer = {
            next: (data: Object) => {
                if (ws.readyState === WebSocket.OPEN) {
                    console.log("---sending ws message---");
                    ws.send(JSON.stringify(data));
                }
            }
        };

        return ReplaySubject.create(observer, observable);
    }
}
NotificationService
import { Injectable } from '@angular/core';
import { Observable, Subject, ReplaySubject, BehaviorSubject } from 'rxjs/Rx';
import { WebsocketService } from './websocket.service';
import { Notification } from './../model/notification'

const NOTIFICATION_URL = 'ws://localhost:8080/Kwetter/socket'

@Injectable()
export class NotificationService {
    public _notification: ReplaySubject<Notification>;

    constructor(websocketService: WebsocketService) {
        this._notification = <ReplaySubject<Notification>>websocketService
            .connect(NOTIFICATION_URL)
            .map((response: MessageEvent): Notification => {
                let data = JSON.parse(response.data);
                return {
                    sender: data.author,
                    message: data.message
                }
            });
    }

    sendMessage(notification) {
        console.log("---calling .next()---");
        this._notification.next(notification);
    }
}
NotificationComponent
import { Component, OnInit } from '@angular/core';
import { NotificationService } from '../services/notification.service';
import { UserService } from '../services/user.service';
import { Notification } from './../model/notification';

@Component({
    selector: 'app-notifications',
    templateUrl: './notifications.component.html',
    styleUrls: ['./notifications.component.css']
})
export class NotificationsComponent implements OnInit {
    notification: Notification;
    text: string;

    constructor(private notificationService: NotificationService, private userService: UserService) {
        if (this.notification == null) {
            this.notification = new Notification("", "");
        }
        notificationService._notification.subscribe(notification => {
            console.log("---notification has been updated---")
            this.notification = notification;
        });
    }

    sendMsg() {
        let newNot = new Notification(this.userService.getUser(), this.text);
        this.notificationService.sendMessage(newNot);
    }

    ngOnInit() {
    }
}
HeaderComponent
import { Component, OnInit, OnDestroy } from '@angular/core';
import { UserService } from '../../services/user.service';
import { NotificationService } from '../../services/notification.service';
import { Router } from '@angular/router';
import { Subscription } from 'rxjs/Subscription';
import { Profile } from '../../model/profile';
import { User } from '../../model/user';
import { Notification } from '../../model/notification';

@Component({
    selector: 'app-header',
    templateUrl: './header.component.html',
    styleUrls: ['./header.component.css']
})
export class HeaderComponent implements OnInit, OnDestroy {
    private notification: Notification;
    private loggedIn = false;
    private user: User;
    private subscription: Subscription;

    constructor(private userService: UserService, private router: Router, private notificationService: NotificationService) {
        console.log("---constructor headercomponent---");
        console.log(this.notification);
        this.notificationService._notification.subscribe(notification => {
            console.log("---header notification has been updated---");
            this.notification = notification;
        });
        if (this.notification == null) {
            this.notification = new Notification("", "");
        }
        this.subscription = this.userService.profile$.subscribe(user => {
            this.user = user;
            if (user !== null) {
                this.loggedIn = true;
            }
            else this.loggedIn = false;
        });
        this.loggedIn = userService.isLoggedIn();
        this.user = userService.getUser();
    }

    logout() {
        this.userService.logout();
        this.router.navigate(['home']);
    }

    home() {
        this.router.navigate(['home']);
    }

    myProfile() {
        console.log("click");
        this.router.navigate(['profile', this.userService.getUser().id]);
    }

    getLoggedIn(): void {
        this.loggedIn = !!this.userService.isLoggedIn();
    }

    ngOnInit() {
        this.getLoggedIn();
    }

    ngOnDestroy() {
        this.subscription.unsubscribe();
    }
}
The NotificationComponent is rendered through the router-outlet, while the HeaderComponent is always rendered through its selector tag, but I don't think this should matter.
<div>
    <app-header></app-header>
    <div class="content">
        <router-outlet></router-outlet>
    </div>
</div>
I found the thread below, which suggested using a ReplaySubject in case I subscribe after the event is fired (I don't think that is the case, but I tried it anyway). This didn't work.
Also, I only have a single app.module where I declare the providers. Since I'm using the same code for both components, why does .subscribe only work within the NotificationComponent?
Angular 2: Observable / Subscription not triggering
Upvotes: 2
Views: 2844
The behavior you see has to do with how RxJS works and how your stream is created. Let's take a look at WebsocketService:
let observable = Observable.create(
    (obs: Observer<MessageEvent>) => {
        ws.onmessage = obs.next.bind(obs);
obs is new for every subscription, but ws is always the same. Each subscription therefore reassigns ws.onmessage, so when you subscribe a second time in the NotificationComponent, the onmessage callback invokes next only for that subscription. Hence only that component receives messages.
You can verify that by commenting out notificationService._notification.subscribe in the NotificationComponent. Then the HeaderComponent will receive messages.
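The overwriting described above can be reproduced without Angular or RxJS. The following is only a minimal sketch: FakeSocket, subscribeToSocket, headerLog and notificationLog are hypothetical names made up for illustration, and the fake socket merely imitates the fact that a WebSocket has a single onmessage slot.

```typescript
// Stand-in for a WebSocket: like the real one, it has a single onmessage slot.
interface FakeSocket {
  onmessage: ((data: string) => void) | null;
}

const socket: FakeSocket = { onmessage: null };

// Mirrors the function passed to Observable.create in the question:
// every call assigns the one shared slot, replacing the previous handler.
function subscribeToSocket(next: (data: string) => void): void {
  socket.onmessage = next;
}

const headerLog: string[] = [];
const notificationLog: string[] = [];

// The header subscribes first, the routed component second.
subscribeToSocket(msg => headerLog.push(msg));
subscribeToSocket(msg => notificationLog.push(msg));

// Simulate the backend pushing one message.
if (socket.onmessage) {
  socket.onmessage("hello");
}

console.log(headerLog.length, notificationLog.length); // prints: 0 1
```

Only the last handler assigned to the slot is ever called, which matches the symptom in the question.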
One simple solution is to add the share operator in the NotificationService:
this._notification = <ReplaySubject<Notification>>websocketService
    .connect(NOTIFICATION_URL)
    .map((response: MessageEvent): Notification => {
        let data = JSON.parse(response.data);
        return {
            sender: data.author,
            message: data.message
        }
    })
    .share();
That means the subscription upstream of .share() is shared: the function (obs: Observer<MessageEvent>) => { ws.onmessage = obs.next.bind(obs); ... } is called only once, and both components receive messages.
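To make the effect of sharing concrete, here is a rough sketch of the idea in plain TypeScript. It is not RxJS's implementation of share: shareSource, Listener, Source, rawMessages and the other names are hypothetical, and unsubscription, errors and completion are left out. It only shows one upstream subscription fanning out to several listeners.

```typescript
type Listener<T> = (value: T) => void;
type Source<T> = (listener: Listener<T>) => void;

// Simplified stand-in for share(): subscribes to the source once
// (when the first listener arrives) and forwards each value to all listeners.
function shareSource<T>(source: Source<T>): Source<T> {
  const listeners: Listener<T>[] = [];
  let connected = false;
  return (listener: Listener<T>) => {
    listeners.push(listener);
    if (!connected) {
      connected = true;
      source(value => listeners.forEach(l => l(value)));
    }
  };
}

// Single-slot socket stand-in: subscribing assigns the only onmessage handler.
const ws: { onmessage: Listener<string> | null } = { onmessage: null };
let upstreamSubscriptions = 0;
const rawMessages: Source<string> = listener => {
  upstreamSubscriptions++;
  ws.onmessage = listener;
};

const shared = shareSource(rawMessages);
const headerReceived: string[] = [];
const notificationsReceived: string[] = [];
shared(msg => headerReceived.push(msg));
shared(msg => notificationsReceived.push(msg));

// Simulate the backend pushing one message.
if (ws.onmessage) {
  ws.onmessage("hello");
}

console.log(upstreamSubscriptions, headerReceived, notificationsReceived);
```

Because the socket's handler is assigned only once, the second subscriber no longer replaces the first; both are reached through the shared fan-out.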
By the way, RxJS offers built-in support for WebSockets. You can create a stream with Observable.webSocket(url) and get rid of some code.
Upvotes: 3