LppEdd
LppEdd

Reputation: 21124

RxJs - transform array of Observables to an array of emitted values

Sorry for the title, I couldn't think of a better one.
I've got this piece of code, which basically:

  1. filter for valid (non-null) cron epressions' arrays
  2. map each cron expression to a call to a service

this.formGroup.valueChanges.pipe(
    op.filter(v => !!v.cronExpressions),
    op.map((v): string[] => v.cronExpressions),
    op.map((v: string[]) => v.map(cron =>
            this.cronService.getReadableForm(cron).pipe(
                op.map(this.toDescription),
                op.map((description): CronExpressionModel => ({ cron, description }))
            )
        )
    ),
    // What now?
).subscribe((cronExpressions: CronExpressionModel[]) => ...) // Expected result

I'd like to get, on subscribe(), the array of CronExpressionModel returned from all the services calls.

I can't wrap my head around this.


Current solution, as per Martin answer:

filter(v => !!v.cronExpressions),
map(v => v.cronExpressions),
map(cronExprs => cronExprs.map(c => this.invokeCronService(c))),
mergeMap(serviceCalls => forkJoin(serviceCalls).pipe(defaultIfEmpty([])))

Upvotes: 1

Views: 3565

Answers (2)

madjaoue
madjaoue

Reputation: 5224

To transform a stream into an array, you can use toArray operator.

Here's a suggestion:

this.formGroup.valueChanges.pipe(
    filter(v => !!v.cronExpressions),
    // transform [item1, item2...] into a stream ----item1----item2----> 
    concatMap((v): Observable<string> => from(v.cronExpressions).pipe(
        // for each of the items, make a request and wait for it to respond
        concatMap((cron: string) => this.cronService.getReadableForm(cron)),
        map(this.toDescription),
        map((description): CronExpressionModel => ({ cron, description })),
        // wait for observables to complete. When all the requests are made, 
        // return an array containing all responses
        toArray()
      )
    ).subscribe((cronExpressions: CronExpressions[]) => ...) // Expected result

Note :

You can use mergeMap instead of concatMap to parallelize the requests. But you need to know what you're doing ;)

Upvotes: 2

martin
martin

Reputation: 96891

You can just add forkJoin if you don't mind running all requests in parallel:

switchMap(observables => forkJoin(...observables))

Or if you want to run all of them in sequence:

switchMap(observables => concat(...observables).pipe(toArray()))

Instead of switchMap you might want to use concatMap or mergeMap depending on what behavior you want.

Upvotes: 2

Related Questions