Lukas Leitner
Lukas Leitner

Reputation: 33

RxJS deliver all messages in correct sequence

I am currently struggling with a scenario where I have 3 subscribers of an RxJS Subject, and one of the subscribers (the second subscriber (B) ) adds a messages to the subject when it receives a message from the subject.

What this causes is, that the third subscriber (C) receives all messages after the message that B has sent and also in the wrong order.

This is my code:

import { Observable, Subject } from 'rxjs';

var mystream = new Subject<any>();

// first observer 
mystream.asObservable().subscribe(msg => console.log('A: ', msg));

// second observer which also adds a message to the subject
mystream.asObservable().subscribe(msg => {
  console.log('B: ', msg);

  if(msg === 'Initial message') {
    mystream.next('Message from B');
  }
});

// third observer who receives the messages as the last one and in incorrect oder
mystream.asObservable().subscribe(msg => console.log('C: ', msg));

// start the whole process
mystream.next('Initial message');

This currently gives me the following result:

A:  Initial message
B:  Initial message
A:  Message from B
B:  Message from B
C:  Message from B
C:  Initial message

How can I achieve the following result?

A:  Initial message
B:  Initial message
C:  Initial message
A:  Message from B
B:  Message from B
C:  Message from B

Upvotes: 1

Views: 233

Answers (1)

akotech
akotech

Reputation: 2274

You could achieve it using the asyncScheduler in your consumable observable to emit each notification in a different tick.

It'd be something like this:

var publisher = new Subject<any>();
var myStream = publisher.pipe(observeOn(asyncScheduler)); // <----

// first observer
myStream.subscribe((msg) => console.log('A: ', msg));

// second observer which also adds a message to the subject
myStream.subscribe((msg) => {
  console.log('B: ', msg);

  if (msg === 'Initial message') {
    publisher.next('Message from B');
  }
});

// third observer who receives the messages as the last one and in incorrect oder
myStream.subscribe((msg) => console.log('C: ', msg));

// start the whole process
publisher.next('Initial message');

Cheers

Upvotes: 2

Related Questions