Reputation: 247
I'm in a situation where I want to subscribe as soon as possible to two different observables, but wait for the first one to emit before the second one emits.
For instance, I'm developing a chat application where I want to retrieve all previous messages stored in a DB, but also subscribe to a stream that emits new ones.
I want something like this:
readonly messages$ = this.service.getStoredMessages().pipe(
  switchMapTo(this.service.getNewMessages())
)
The problem with this is that I may miss some messages because of a late subscription to getNewMessages(). What I need is to subscribe as soon as possible to both observables, but the second one (getNewMessages()) should emit only after getStoredMessages() emits.
Upvotes: 0
Views: 1213
Reputation: 13071
Assuming that:
storedMessages$ type is Observable<Message[]>
newMessages$ type is Observable<Message>
messages$ type is supposed to be Observable<Message>
I think what you want to do is something more or less like this:
readonly messages$ = defer(() => {
  const storedMessages$ = this.service.getStoredMessages().pipe(share())
  const newMessages$ = this.service.getNewMessages().pipe(share())
  // New messages that arrive before the stored ones, collected into one array.
  const missedMessages$ = newMessages$.pipe(
    takeUntil(storedMessages$),
    toArray(),
  )
  const initialMessages$ = merge(
    storedMessages$,
    missedMessages$
  ).pipe(mergeAll()) // instead of `mergeAll` maybe you want to `scan` and make sure that there are no duplicate values...
  return merge(
    newMessages$.pipe(ignoreElements()), // ensures that we don't unsubscribe from newMessages$ after storedMessages$ emits/completes.
    concat(initialMessages$, newMessages$),
  )
})
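To make the intended ordering concrete, here is a minimal sketch of the same idea in plain TypeScript, without RxJS: listen to live messages from the start, hold them back until the stored batch arrives, then flush them after it. The `createOrderedFeed` helper and its `pushNew`/`pushStored` methods are hypothetical names made up for this illustration, not part of the service in the question.

```typescript
type Message = { id: number; text: string };

// Hypothetical helper (an assumption for this sketch, not an RxJS API).
// `emit` is called with messages in the order a consumer should see them.
function createOrderedFeed(emit: (m: Message) => void) {
  let storedArrived = false;
  const pending: Message[] = []; // live messages received before the stored batch

  return {
    // Called for every live message, from the very beginning.
    pushNew(message: Message): void {
      if (storedArrived) {
        emit(message);
      } else {
        pending.push(message);
      }
    },
    // Called once, when the stored messages have been loaded.
    pushStored(messages: Message[]): void {
      if (storedArrived) return; // this sketch ignores a second batch
      storedArrived = true;
      for (const m of messages) emit(m);
      // Flush what arrived in the meantime, preserving arrival order.
      for (const m of pending.splice(0)) emit(m);
    },
  };
}

// Usage: one live message arrives before the stored batch does.
const seen: number[] = [];
const feed = createOrderedFeed((m) => seen.push(m.id));
feed.pushNew({ id: 3, text: "live, arrived early" });
feed.pushStored([
  { id: 1, text: "stored" },
  { id: 2, text: "stored" },
]);
feed.pushNew({ id: 4, text: "live, arrived late" });
console.log(seen); // ids in order: 1, 2, 3, 4
```

If the stored batch can also contain messages that were delivered live, the flush step would additionally need to drop duplicates, for example by id.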
Upvotes: 1
Reputation: 54
From your question, what I understand is that there is an array of messages which you want to use in your view. Let's call it viewMessages. Now, if we want to push messages into viewMessages, I recommend subscribing to both streams, i.e. the stream of previous messages and the stream of chat messages, and when either of them emits, prepending or appending the message(s) to viewMessages.
viewMessages:Message[] = [];
.
.
.
this.getStoredMessages().subscribe(messages => {
  // Stored messages go in front of whatever has already arrived.
  this.viewMessages = [...messages, ...this.viewMessages];
});
this.getNewMessages().subscribe(message => {
  // Each new message is appended at the end.
  this.viewMessages = [...this.viewMessages, message];
});
Please note that the sequence is maintained in both subscriptions: previous messages will appear first and new messages will appear afterwards.
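The ordering claim can be checked with a small sketch in plain TypeScript that applies the two updates as pure functions. The names `prependStored` and `appendNew` and the `ChatMessage` shape are assumptions made for this illustration only.

```typescript
type ChatMessage = { id: number; text: string };

// Stored messages go in front of whatever is already shown.
function prependStored(view: ChatMessage[], stored: ChatMessage[]): ChatMessage[] {
  return [...stored, ...view];
}

// A new message goes after whatever is already shown.
function appendNew(view: ChatMessage[], message: ChatMessage): ChatMessage[] {
  return [...view, message];
}

// Simulate a live message arriving before the stored batch.
let viewMessages: ChatMessage[] = [];
viewMessages = appendNew(viewMessages, { id: 3, text: "live, arrived first" });
viewMessages = prependStored(viewMessages, [
  { id: 1, text: "stored" },
  { id: 2, text: "stored" },
]);
viewMessages = appendNew(viewMessages, { id: 4, text: "live" });

const ids = viewMessages.map((m) => m.id);
console.log(ids); // ids in order: 1, 2, 3, 4
```

This relies on the stored messages arriving as a single batch; if they came in several chunks, each later chunk would be prepended in front of the earlier ones.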
Upvotes: 0
Reputation: 51
You can try this:
readonly messages$ = this.service.getStoredMessages().subscribe(
  res => {
    this.service.getNewMessages()
  })
Upvotes: 0
Reputation: 2982
You can use this kind of structure:
readonly messages$ = this.service.getStoredMessages().pipe(
  withLatestFrom(this.service.getNewMessages())
)
Upvotes: 0