Akxe

Reputation: 11595

RxJS switchMap + tap-like operator

I have a stream of files and I want to fill in additional information about each of them. I would like to show the user the data obtained so far right away, since it is all that is initially visible anyway.

I want an observable that:

What I currently have waits for the additional-info result before emitting the files.

Set-up and current attempt:

this.pagedFLFiles = fileService.getFiles().pipe(
    switchMap(response => concat(
        of(response),
        fileService.getAdditionalInfo(response.items).pipe(
            switchMap(() => EMPTY),
        ),
    )),
    shareReplay(1),
);

fileService.getAdditionalInfo(response.items) modifies the data in place:

getAdditionalInfo(files: FLFile[]): Observable<FLFile[]> {
    return this.api.getWithToken(token => {
        return { path: `v5/user/${token}/files/${files.map(file => file.id).join(',')}/facilities` };
    }).pipe(
        map(information => {
            files.forEach(file => {
                const info = information[file.id];
                (Object.entries(info) as [keyof typeof info, any][]).forEach(([key, value]) => {
                    file[key] = value;
                });
            });
            return files;
        }),
    );
}
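
As a rough illustration of why the in-place update matters: the array that was already emitted and the array that getAdditionalInfo fills in are the same object, so a consumer holding the first emission sees the extra fields without a second emission. The sketch below is plain TypeScript with no RxJS; `FileRecord`, `applyInfo`, and the sample data are hypothetical stand-ins, since the real FLFile type and API response are not shown.

```typescript
// Hypothetical shape; the real FLFile type is not shown in the question.
type FileRecord = { id: string; [key: string]: unknown };

// Copies every property of the matching info entry onto the file, in place.
// Mirrors the forEach loop in getAdditionalInfo, under the assumption that
// `information` is keyed by file id.
function applyInfo(
  files: FileRecord[],
  information: Record<string, Record<string, unknown>>,
): FileRecord[] {
  for (const file of files) {
    const info = information[file.id];
    if (!info) continue; // no extra info for this file
    for (const key of Object.keys(info)) {
      file[key] = info[key];
    }
  }
  return files;
}

// `shown` stands for the array a subscriber has already received.
const shown: FileRecord[] = [
  { id: 'a', name: 'first.txt' },
  { id: 'b', name: 'second.txt' },
];

// Sample data invented for this sketch.
const result = applyInfo(shown, { a: { facility: 'north' } });

console.log(result === shown);  // true: same array, no copy was made
console.log(shown[0].facility); // north
console.log(shown[1].facility); // undefined: no info for 'b'
```

One consequence of this design is that a view relying on reference changes (for example, OnPush-style change detection) may not notice the update, because the emitted array keeps the same identity.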

Upvotes: 2

Views: 2031

Answers (1)

timo-haas

Reputation: 231

Use merge instead of concat.

concat subscribes to its observables one at a time: it subscribes to getAdditionalInfo only after of(response) has completed.

merge subscribes to all of its observables at once and emits each time any of them emits.
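
A minimal sketch of that difference, assuming RxJS 6 or 7 is installed. It uses two Subjects (`first` and `second`, names chosen for this illustration) instead of the file service so the ordering is explicit and synchronous:

```typescript
import { Subject, concat, merge } from 'rxjs';

// Two manually driven sources.
const first = new Subject<string>();
const second = new Subject<string>();

const concatLog: string[] = [];
const mergeLog: string[] = [];

concat(first, second).subscribe(v => concatLog.push(v));
merge(first, second).subscribe(v => mergeLog.push(v));

first.next('a1');  // forwarded by both
second.next('b1'); // forwarded by merge only: concat has not subscribed to `second` yet
first.complete();  // concat now subscribes to `second`
second.next('b2'); // forwarded by both
second.complete();

console.log(concatLog); // [ 'a1', 'b2' ]
console.log(mergeLog);  // [ 'a1', 'b1', 'b2' ]
```

With a cold source such as an HTTP request, the subscription time is also when the work starts, so merge starts the additional-info request at the same moment the files are emitted.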

Example: getFiles emits once per second for 3 seconds. getAdditionalInfo is cancelled twice (because it runs longer than 1 second), so it only modifies the last emitted files array.

import { merge, EMPTY, timer, of, interval } from 'rxjs';
import { finalize, switchMap, map, take, shareReplay } from 'rxjs/operators';


const fileService = {
  getFiles: () => interval(1000).pipe(
    take(3),
    map(x => {
      const items = [0, 1, 2].map(i => { return { 'info1': i }; })
      return { 'index': x, 'items': items };
    })
  ),
  getAdditionalInfo: (files) => {
    let wasModified = false;
    return timer(2000).pipe(
      map(information => {
        files.forEach(file => {
          file['info2'] = 'information' + files.length;
        });
        console.log('getAdditionalInfo: modified data');
        wasModified = true;
        return files;
      }),
      finalize(() => {
        if (!wasModified) {
          console.log('getAdditionalInfo: cancelled');
        }
      })
    );
  }
}


const pagedFLFiles = fileService.getFiles().pipe(
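  switchMap(response => merge(
    // Emit the files right away.
    of(response),
    // Run the additional-info call concurrently. It mutates response.items
    // in place, so its own emission is dropped.
    fileService.getAdditionalInfo(response.items).pipe(
      switchMap(() => EMPTY),
    ),
  )),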
  shareReplay(1),
);


pagedFLFiles.subscribe(x => {
  console.log('immediate', x.index);
});

Upvotes: 1
