Sebastian Edelmeier
Sebastian Edelmeier

Reputation: 4167

Reactively chaining requests

I'm new to reactive programming and stuck at a probably simple point. I have two methods returning observables.

GetQueues(): Observable<Queue[]>{...}

and

GetMessageCount(queue: Queue): Observable<number>{...}

Now I want to chain these in a method with the following signature

GetAllQueueMessageCount(): Observable<number>{...}

As you can imagine, I want to call the first method to read a list of queues, iterate over the result and call the second one for each of them.

I can imagine something like the following, but as you see, this doesn't return what the signature expects:

public GetAllQueueMessageCount(): Observable<number> {
    var messageCount = 0;
    this.GetQueues()
        .subscribe(queues => {
            var queueCountRequests = [];
            queues.forEach((queue) => {
                queueCountRequests.push(this.GetQueueMessageCount(queue));
            });
            Observable.forkJoin(queueCountRequests)
                .subscribe(t => {
                    t.forEach(
                        count => messageCount = messageCount + (count as number));
                });
        }, error => Observable.throw(error));
}

All my attempts using flatMap resulted in the same.

Upvotes: 1

Views: 111

Answers (2)

Olaf Horstmann
Olaf Horstmann

Reputation: 16892

Two general rules related to your issue:

  1. Don't use subscribe manually if you can avoid it, which in your case can most likely be avoided.

  2. Avoid client-side-calculations like this, because sending multiple REST-calls (which is assume you are doing) is very heavy on the connection as well as the server and usually takes longer, because there is a limit of how many parallel requests can be done. - So I would strongly suggest to implement a separate endoint for something like this.

That being said, here is how I would write that stream


public GetAllQueueMessageCount(): Observable<number> {
    return this.GetQueues()
        .mergeAll() // split up the array into seperate emissions
        .concatMap(queue => this.GetQueueMessageCount(workflowId, queue.Name))
        .toArray() // optional if you want one single result-emit use it, if you want a new emission for every queueMessageCount, remove it
        .map(msgCounts => msgCounts.reduce((sum,num) => sum + num, 0));
}

Upvotes: 1

Richard Matsen
Richard Matsen

Reputation: 23543

I think this is the structure you're looking for.

Since you're returning an observable, general rule is use map() or a form of it such as flatMap() instead of subscribe(), and use returns at each level.

const GetAllQueueMessageCount(): Observable<number> {
  return this.GetQueues()
    .map(queues => {
      const queueCountRequests = queues
        .map(queue => this.GetQueueMessageCount(workflowId, queue.Name) );
      return Observable.forkJoin(queueCountRequests)
        .map(
          results: number[] => results.reduce((sum,num) => sum + num, 0))
          error => Observable.throw(error)
         )
}

Upvotes: 2

Related Questions