Marco Ripamonti

Reputation: 247

Subscribe as soon as possible to a second observable, but wait for the first one to emit - Angular / RxJS

I have a case where I want to subscribe to two different observables as early as possible, but hold back the emissions of the second one until the first one has emitted.

For instance I'm developing a chat application where I want to retrieve all previous messages stored on a DB, but also subscribe to a stream which emits new ones.

I want something like this:

readonly messages$ = this.service.getStoredMessages().pipe(
    switchMapTo(this.service.getNewMessages())
)

The problem with this is that I may miss some messages because getNewMessages() is subscribed to late. What I need is to subscribe to both observables as soon as possible, while the second one (getNewMessages()) should emit only after getStoredMessages() has emitted.

Upvotes: 0

Views: 1213

Answers (4)

Josep

Reputation: 13071

Assuming that:

  • the type of storedMessages$ is Observable<Message[]>
  • the type of newMessages$ is Observable<Message>
  • and the type of messages$ is supposed to be Observable<Message>

I think that what you want to do is something more or less like this:

readonly messages$ = defer(() => {
  const storedMessages$ = this.service.getStoredMessages().pipe(share())
  const newMessages$ = this.service.getNewMessages().pipe(share())

  const missedMessages$ = newMessages$.pipe(
    takeUntil(storedMessages$),
    toArray(),
  )

  const initialMessages$ = merge(
    storedMessages$,
    missedMessages$
  ).pipe(mergeAll()) // instead of `mergeAll` maybe you want to `scan` and make sure that there are no duplicate values...

  return merge(
    newMessages$.pipe(ignoreElements()), // ensures that we don't unsubscribe from newMessages$ after storedMessages$ emits/completes.
    concat(initialMessages$, newMessages$),
  )
})
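
The comment about using `scan` instead of `mergeAll` to avoid duplicates could look roughly like the reducer below. This is only a sketch under assumptions: it presumes every message carries a unique `id` (the question does not say so), and both `Message` and `appendUnique` are hypothetical names introduced for illustration.

```typescript
// Hypothetical message shape; a unique `id` is an assumption.
interface Message {
  id: number;
  text: string;
}

// Accumulator suitable for `scan`: appends a batch, skipping ids already seen.
function appendUnique(seen: Message[], batch: Message[]): Message[] {
  const ids = new Set(seen.map(m => m.id));
  const result = [...seen];
  for (const message of batch) {
    if (!ids.has(message.id)) {
      ids.add(message.id);
      result.push(message);
    }
  }
  return result;
}

// Stored history and "missed" messages overlap on id 2.
const stored: Message[] = [{ id: 1, text: "a" }, { id: 2, text: "b" }];
const missed: Message[] = [{ id: 2, text: "b" }, { id: 3, text: "c" }];
const merged = appendUnique(appendUnique([], stored), missed);
const mergedIds = merged.map(m => m.id); // [1, 2, 3]
```

In the pipeline above it would stand in for `mergeAll()` as something like `scan(appendUnique, [] as Message[])`. Note that `initialMessages$` would then emit arrays rather than single messages, so the rest of the chain would need adjusting.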

Upvotes: 1

Rohit Chopra

Reputation: 54

From your question, what I understand is that there is an array of messages which you want to use in your view.

Let's call it viewMessages. To push messages into viewMessages, I recommend subscribing to both streams, i.e. the stream of previous messages and the stream of new chat messages. When the first one emits, prepend its messages to viewMessages; when the second one emits, append its message.

viewMessages:Message[] = [];
.
.
.
// Stored messages are prepended, so they always come before anything already shown.
this.service.getStoredMessages().subscribe(messages => {
    this.viewMessages = [...messages, ...this.viewMessages];
});
// New messages are appended to whatever is already in the list.
this.service.getNewMessages().subscribe(message => {
    this.viewMessages = [...this.viewMessages, message];
});

Please note that the order is maintained whichever subscription fires first: previous messages appear first and new messages appear after them.
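
The prepend/append logic described above can be sketched without RxJS as a small state holder. This is only an illustration under assumptions: the `Message` shape and the `MessageList` class are hypothetical names, and the two methods stand in for the bodies of the two subscribe callbacks.

```typescript
// Hypothetical message shape, for illustration only.
interface Message {
  id: number;
  text: string;
}

// Hypothetical holder for the list shown in the view.
class MessageList {
  viewMessages: Message[] = [];

  // Body of the stored-messages callback: history goes first.
  addStored(stored: Message[]): void {
    this.viewMessages = [...stored, ...this.viewMessages];
  }

  // Body of the new-messages callback: live messages go last.
  addNew(message: Message): void {
    this.viewMessages = [...this.viewMessages, message];
  }
}

// A new message arrives before the stored history does.
const list = new MessageList();
list.addNew({ id: 3, text: "live" });
list.addStored([
  { id: 1, text: "old 1" },
  { id: 2, text: "old 2" },
]);
const order = list.viewMessages.map(m => m.id); // [1, 2, 3]
```

Because history is always prepended and live messages are always appended, the final order does not depend on which callback runs first.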

Upvotes: 0

Mosab Mhnna

Reputation: 51

You can try this:

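// subscribe() returns a Subscription, not an Observable, hence the field name.
readonly subscription = this.service.getStoredMessages().subscribe(stored => {
    // Start listening for new messages once the stored ones have arrived.
    this.service.getNewMessages().subscribe(message => {
        // handle the new message here
    })
})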

Upvotes: 0

Anton Marinenko

Reputation: 2982

You can use this kind of structure:

readonly messages$ = this.service.getStoredMessages().pipe(
    withLatestFrom(this.service.getNewMessages())
)

Upvotes: 0
