Kiwi

Reputation: 2773

RxJS observables reducing

I'm processing a string through a series of functions, but some of them return observables, so I'm trying to figure out an easy way to chain them.

Say I have the following class:

class Something {
  doSomething(input: string): Observable<string> {
    return of('123 - ' + input);
  }
}

Here of('123 - ' + input) stands in for a call to some service (like http.get()) that returns a string.

I'd like to append the result of each service to the result of the next one, until the last one is reached, but I have no clue how to do this.

My latest attempt is the following:

return of(
  new Something(),
  new Something(),
  new Something(),
)
  .pipe(
    reduce(
      (acc, curr, i) => {
        console.log('NEXT', acc);
        if(!(curr instanceof Observable)){
          acc = of(acc);
        }
        return acc.pipe(map(c => curr.doSomething(c)));
      },
      'Start string: '
    )
  );

I expected this to result in Start string: 123 - 123 - 123 -, but that is not the case, and I'm running out of ideas to get this working.

Here is a basic stackblitz to get started, if you'd like to try some things: https://stackblitz.com/edit/typescript-htcna8?file=index.ts

Upvotes: 0

Views: 170

Answers (3)

paulpdaniels

Reputation: 18663

Seems like you want mergeScan:

const source = [
  new Something(),
  new Something(),
  new Something(),
  new Something()
];

from(source).pipe(
  // Scan over all the inner streams passing each result to the next call
  mergeScan((acc, s) => s.doSomething(acc), '', 1),
  // The above emits partial results, so ignore all except the last one.
  last()
).subscribe(console.log);

// Expected output: "123 - 123 - 123 - 123 - "

From the docs:

Applies an accumulator function over the source Observable where the accumulator function itself returns an Observable, then each intermediate Observable returned is merged into the output Observable.

It's like scan, but the Observables returned by the accumulator are merged into the outer Observable.
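For reference, here is a self-contained sketch of the same idea, assuming RxJS 6 or later and the Something class from the question. The `services` and `chained` names and the 'Start string: ' seed are only there for illustration.

```typescript
import { Observable, from, of } from 'rxjs';
import { last, mergeScan } from 'rxjs/operators';

// Stand-in for the service from the question: prepends '123 - ' to its input.
class Something {
  doSomething(input: string): Observable<string> {
    return of('123 - ' + input);
  }
}

// Hypothetical list of services to run one after another.
const services = [new Something(), new Something(), new Something()];

// Receives the final value; of() emits synchronously, so this is set
// before the console.log below runs.
let chained = '';

from(services).pipe(
  // A concurrency of 1 makes each call wait for the previous result.
  mergeScan((acc: string, s: Something) => s.doSomething(acc), 'Start string: ', 1),
  // Keep only the fully accumulated value.
  last()
).subscribe(value => (chained = value));

console.log(chained); // "123 - 123 - 123 - Start string: "
```

Note that because doSomething prepends '123 - ' to its input, the seed ends up at the end of the string. To get it at the front, the service would have to append instead.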

Upvotes: 1

Yanis-git

Reputation: 7875

Because I'm not sure what you expect, I have prepared three cases for you:

// Expected output: One next with ["123 - 1", "123 - 2", "123 - 3", "123 - 4"]
forkJoin(source).subscribe(val => console.log(val));

// Expected output: Many next with:
// * 123 - 1
// * 123 - 2
// * 123 - 3
// * 123 - 4
forkJoin(source).pipe(concatAll()).subscribe(val => console.log(val));

// Expected output: One next with string "123 - 1, 123 - 2, 123 - 3, 123 - 4"
forkJoin(source).pipe(map((e) => {
  return e.join(', ');
})).subscribe(val => console.log(val));

Sample: https://stackblitz.com/edit/typescript-s3yr3h?file=index.ts
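The snippets above do not show how `source` is built. A minimal runnable sketch, assuming RxJS 6 or later and assuming `source` is an array of four independent doSomething calls with the inputs '1' to '4' (my guess from the expected outputs), could look like this. The `asArray`, `oneByOne` and `joined` names are hypothetical.

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { concatAll, map } from 'rxjs/operators';

// Stand-in for the service from the question.
class Something {
  doSomething(input: string): Observable<string> {
    return of('123 - ' + input);
  }
}

// Assumption: one independent call per input, not a chain of calls.
const service = new Something();
const source = ['1', '2', '3', '4'].map(n => service.doSomething(n));

// Case 1: a single next carrying the array of all results.
const asArray: string[][] = [];
forkJoin(source).subscribe(val => asArray.push(val));

// Case 2: the array is flattened into one next per result.
const oneByOne: string[] = [];
forkJoin(source).pipe(concatAll()).subscribe(val => oneByOne.push(val));

// Case 3: a single next carrying the results joined into one string.
let joined = '';
forkJoin(source).pipe(map(e => e.join(', '))).subscribe(val => (joined = val));

console.log(asArray, oneByOne, joined);
```

Keep in mind that forkJoin runs the calls independently of each other, so none of these cases feeds one result into the next call.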

Upvotes: -1

Kiwi

Reputation: 2773

I found out you can do the following:

const source = [
  new Something(),
  new Something(),
  new Something(),
  new Something()
];

let chain = source[0].doSomething('Initial string');
source.splice(0, 1);

source.forEach(r => {
  chain = chain.pipe(
    switchMap(s => r.doSomething(s))
  );
});
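The same chain can probably also be built with Array.prototype.reduce, which avoids mutating `source` with splice. This is only a sketch, assuming RxJS 6 or later and the Something class from the question. The `chain$` and `finalValue` names are made up for the example.

```typescript
import { Observable, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Stand-in for the service from the question.
class Something {
  doSomething(input: string): Observable<string> {
    return of('123 - ' + input);
  }
}

const source = [new Something(), new Something(), new Something(), new Something()];

// Fold the services into one observable chain, starting from the initial string.
const chain$ = source.reduce(
  (chain: Observable<string>, service: Something) =>
    chain.pipe(switchMap(s => service.doSomething(s))),
  of('Initial string')
);

// Nothing runs until the chain is subscribed to.
let finalValue = '';
chain$.subscribe(value => (finalValue = value));

console.log(finalValue); // "123 - 123 - 123 - 123 - Initial string"
```

Since each inner observable here emits only once, concatMap or mergeMap should behave the same as switchMap in this chain.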

Upvotes: 1
