Reputation: 4167
I'm new to reactive programming and stuck at a probably simple point. I have two methods returning observables.
GetQueues(): Observable<Queue[]>{...}
and
GetMessageCount(queue: Queue): Observable<number>{...}
Now I want to chain these in a method with the following signature
GetAllQueueMessageCount(): Observable<number>{...}
As you can imagine, I want to call the first method to read a list of queues, iterate over the result and call the second one for each of them.
I can imagine something like the following, but as you see, this doesn't return what the signature expects:
public GetAllQueueMessageCount(): Observable<number> {
var messageCount = 0;
this.GetQueues()
.subscribe(queues => {
var queueCountRequests = [];
queues.forEach((queue) => {
queueCountRequests.push(this.GetQueueMessageCount(queue));
});
Observable.forkJoin(queueCountRequests)
.subscribe(t => {
t.forEach(
count => messageCount = messageCount + (count as number));
});
}, error => Observable.throw(error));
}
All my attempts using flatMap resulted in the same.
Upvotes: 1
Views: 111
Reputation: 16892
Two general rules related to your issue:
Don't use subscribe manually if you can avoid it, which in your case can most likely be avoided.
Avoid client-side-calculations like this, because sending multiple REST-calls (which is assume you are doing) is very heavy on the connection as well as the server and usually takes longer, because there is a limit of how many parallel requests can be done. - So I would strongly suggest to implement a separate endoint for something like this.
That being said, here is how I would write that stream
public GetAllQueueMessageCount(): Observable<number> {
return this.GetQueues()
.mergeAll() // split up the array into seperate emissions
.concatMap(queue => this.GetQueueMessageCount(workflowId, queue.Name))
.toArray() // optional if you want one single result-emit use it, if you want a new emission for every queueMessageCount, remove it
.map(msgCounts => msgCounts.reduce((sum,num) => sum + num, 0));
}
Upvotes: 1
Reputation: 23543
I think this is the structure you're looking for.
Since you're returning an observable, general rule is use map()
or a form of it such as flatMap()
instead of subscribe()
, and use returns at each level.
const GetAllQueueMessageCount(): Observable<number> {
return this.GetQueues()
.map(queues => {
const queueCountRequests = queues
.map(queue => this.GetQueueMessageCount(workflowId, queue.Name) );
return Observable.forkJoin(queueCountRequests)
.map(
results: number[] => results.reduce((sum,num) => sum + num, 0))
error => Observable.throw(error)
)
}
Upvotes: 2