Reputation: 33
I am struggling with a scenario where an RxJS Subject has 3 subscribers, and one of them (the second subscriber, B) adds a message to the subject when it receives a message from the subject.
As a result, the third subscriber (C) receives the message that B sent before it receives the initial message, so it sees the messages in the wrong order.
This is my code:
import { Subject } from 'rxjs';
var mystream = new Subject<any>();
// first observer
mystream.asObservable().subscribe(msg => console.log('A: ', msg));
// second observer which also adds a message to the subject
mystream.asObservable().subscribe(msg => {
  console.log('B: ', msg);
  if (msg === 'Initial message') {
    mystream.next('Message from B');
  }
});
// third observer, which receives the initial message last and so sees the messages in the wrong order
mystream.asObservable().subscribe(msg => console.log('C: ', msg));
// start the whole process
mystream.next('Initial message');
This currently gives me the following result:
A: Initial message
B: Initial message
A: Message from B
B: Message from B
C: Message from B
C: Initial message
How can I achieve the following result?
A: Initial message
B: Initial message
C: Initial message
A: Message from B
B: Message from B
C: Message from B
Upvotes: 1
Views: 233
Reputation: 2274
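You can get that order by piping the subject through observeOn(asyncScheduler), so that each subscriber receives every notification in a later tick instead of synchronously inside next(). B's nested next() call then only schedules 'Message from B', and C still gets 'Initial message' first.
It'd be something like this: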
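import { Subject, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators'; // in RxJS 7.2+ this can also be imported from 'rxjs'
const publisher = new Subject<any>();
const myStream = publisher.pipe(observeOn(asyncScheduler)); // <---- deliver each notification asynchronously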
// first observer
myStream.subscribe((msg) => console.log('A: ', msg));
// second observer which also adds a message to the subject
myStream.subscribe((msg) => {
  console.log('B: ', msg);
  if (msg === 'Initial message') {
    publisher.next('Message from B');
  }
});
// third observer, which now receives the messages in the expected order
myStream.subscribe((msg) => console.log('C: ', msg));
// start the whole process
publisher.next('Initial message');
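To see why the original order goes wrong, here is a rough, dependency-free sketch. TinySubject, its deferNested flag and the run helper are hypothetical names made up for illustration; this is not how RxJS is implemented. It also swaps in a simple synchronous queue rather than asyncScheduler, purely to show the effect of not delivering a nested message in the middle of the current one.

```typescript
// Hypothetical minimal model of a subject; NOT the RxJS implementation.
type Listener = (msg: string) => void;

class TinySubject {
  private listeners: Listener[] = [];
  private pending: string[] = [];
  private draining = false;

  // deferNested = false mimics plain synchronous delivery;
  // deferNested = true queues messages sent while a delivery is in progress.
  constructor(private deferNested: boolean) {}

  subscribe(fn: Listener): void {
    this.listeners.push(fn);
  }

  next(msg: string): void {
    if (!this.deferNested) {
      // A nested next() runs to completion here before the outer loop
      // reaches the remaining listeners.
      for (const fn of this.listeners) fn(msg);
      return;
    }
    // Queue the message; only the outermost call drains the queue.
    this.pending.push(msg);
    if (this.draining) return;
    this.draining = true;
    while (this.pending.length > 0) {
      const current = this.pending.shift() as string;
      for (const fn of this.listeners) fn(current);
    }
    this.draining = false;
  }
}

// Replays the A/B/C scenario from the question and returns the log lines.
function run(deferNested: boolean): string[] {
  const log: string[] = [];
  const subject = new TinySubject(deferNested);
  subject.subscribe((msg) => log.push(`A: ${msg}`));
  subject.subscribe((msg) => {
    log.push(`B: ${msg}`);
    if (msg === 'Initial message') subject.next('Message from B');
  });
  subject.subscribe((msg) => log.push(`C: ${msg}`));
  subject.next('Initial message');
  return log;
}

console.log(run(false)); // same order as the problematic output in the question
console.log(run(true));  // A, B, C for the initial message, then A, B, C for B's message
```

With observeOn(asyncScheduler) the deferral happens on a timer instead of an in-memory queue, so subscribers are also notified after publisher.next() has returned, which is worth keeping in mind if other code relies on synchronous delivery.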
Cheers
Upvotes: 2