Marinos An

Reputation: 10880

How should I emit a single value when observable completes?

I want to emit one value when the source observable completes, as in the example below, which uses an imaginary operator mapComplete:

let arr = ['a','b', 'c'];

from(arr)
.pipe(mapComplete(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log)
//further processed: myValue

I tried the following approaches, which work but don't seem suitable:

1.

from(arr)
.pipe(toArray())
.pipe(map(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log);
//further processed: myValue

Issue: If the original observable is a huge stream, I don't want to buffer it into an array just to emit one value.

2.

from(arr)
.pipe(last())
.pipe(map(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log);
//further processed: myValue

Issue: If the stream completes without emitting anything, I get an error: [Error [EmptyError]: no elements in sequence]

What would be a correct (in RxJS terms) way to do this?

Upvotes: 6

Views: 2362

Answers (3)

frido

Reputation: 14139

You can achieve this with ignoreElements, which suppresses every value from the source, and endWith, which emits a value on completion.

from(arr).pipe(
  ignoreElements(),
  endWith('myValue'),
  map(v => `further processed: ${v}`)
).subscribe(console.log);

If you want to execute a function in map, you could use count() beforehand. It emits a single value on completion (the number of values emitted), even when the source is empty.

from(arr).pipe(
  count(), // could also use "reduce(() => null, 0)" or "last(null, 0)" or "takeLast(1), defaultIfEmpty(0)" 
  map(() => getMyValue()),
  map(v => `further processed: ${v}`)
).subscribe(console.log);
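The count() approach above can be wrapped into something close to the imaginary mapComplete from the question. This is only a minimal sketch: mapComplete is a hypothetical helper, not an RxJS operator, and the imports assume RxJS 6 or 7, where operators are available from 'rxjs/operators'.

```typescript
import { from, Observable, OperatorFunction } from 'rxjs';
import { count, map } from 'rxjs/operators';

// Hypothetical helper, not part of RxJS: emits project() once, when the source completes.
function mapComplete<T, R>(project: () => R): OperatorFunction<T, R> {
  return (source: Observable<T>) =>
    source.pipe(
      count(),              // emits exactly one value (the count) on completion, even for an empty source
      map(() => project()), // discard the count and call the function instead
    );
}

const nonEmptyResult: string[] = [];
from(['a', 'b', 'c'])
  .pipe(mapComplete(() => 'myValue'), map((v) => `further processed: ${v}`))
  .subscribe((v) => nonEmptyResult.push(v));
console.log(nonEmptyResult); // [ 'further processed: myValue' ]

const emptyResult: string[] = [];
from([] as string[])
  .pipe(mapComplete(() => 'myValue'))
  .subscribe((v) => emptyResult.push(v));
console.log(emptyResult); // [ 'myValue' ]
```

Because count() only keeps a running number, nothing from the source is buffered, and errors from the source still pass through unchanged.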

Upvotes: 7

Barremian

Reputation: 31125

You could also use the last() operator with a default value. This avoids the "no elements in sequence" error when the stream is empty, because last() then emits the default instead of throwing.

from(arr).pipe(
  last(null, 'myValue'),  // `null` means no predicate; 'myValue' is the default used when the stream is empty
  map(_ => 'myValue'),    // replace whatever last() emitted with the desired value
  map((v)=>`further processed: ${v}`)
).subscribe(console.log);
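To see the difference on an empty stream, here is a small sketch that runs last() with and without a default. The names outcomes and emptySource and the 'placeholder' default are made up for illustration, and the imports assume RxJS 6 or 7.

```typescript
import { EmptyError, from } from 'rxjs';
import { last, map } from 'rxjs/operators';

const outcomes: string[] = [];
const emptySource = from([] as string[]);

// Without a default, last() errors with EmptyError on an empty source.
emptySource.pipe(last(), map(() => 'myValue')).subscribe({
  next: (v) => outcomes.push(`no default: ${v}`),
  error: (e) =>
    outcomes.push(`no default: ${e instanceof EmptyError ? 'EmptyError' : 'other error'}`),
});

// With a default, last() emits the default, which map() then replaces.
emptySource.pipe(last(null, 'placeholder'), map(() => 'myValue')).subscribe({
  next: (v) => outcomes.push(`with default: ${v}`),
  error: () => outcomes.push('with default: error'),
});

console.log(outcomes); // [ 'no default: EmptyError', 'with default: myValue' ]
```

Since map() overwrites the value anyway, the default passed to last() can be anything; it only needs to exist so that an empty stream still produces one emission.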

Upvotes: 0

Picci

Reputation: 17762

You can achieve this by building your own custom operator.

The code could look like this:

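import { Observable } from 'rxjs';

// Returns an operator that ignores every value from the source
// and emits `val` once, when the source completes.
const emitWhenComplete = <T>(val: T) => <U>(source: Observable<U>) =>
  new Observable<T>((observer) => {
    // No `next` handler: values from the source are dropped.
    // Returning the subscription lets unsubscription propagate to the source.
    return source.subscribe({
      error: (err) => observer.error(err),
      complete: () => {
        observer.next(val);
        observer.complete();
      },
    });
  });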

Basically, this operator takes the source observable, ignores all values it emits, and emits a single value only when the source completes. Errors from the source are passed through.
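As a rough usage sketch, the operator can be run against a source with values, an empty source, and a failing source. The operator is repeated here so the snippet runs on its own; log, record, and the 'boom' error are made-up names for illustration.

```typescript
import { from, Observable } from 'rxjs';

// Same operator as above, repeated so this snippet is self-contained.
const emitWhenComplete = <T>(val: T) => <U>(source: Observable<U>) =>
  new Observable<T>((observer) =>
    source.subscribe({
      error: (err) => observer.error(err),
      complete: () => {
        observer.next(val);
        observer.complete();
      },
    })
  );

const log: string[] = [];
// Hypothetical helper that records what each subscription receives.
const record = (label: string) => ({
  next: (v: string) => log.push(`${label}: ${v}`),
  error: (e: Error) => log.push(`${label}: error ${e.message}`),
});

// Source with values: they are ignored, only the completion value arrives.
from(['a', 'b', 'c']).pipe(emitWhenComplete('myValue')).subscribe(record('values'));

// Empty source: still emits on completion, no EmptyError.
from([] as string[]).pipe(emitWhenComplete('myValue')).subscribe(record('empty'));

// Failing source: the error is forwarded and nothing is emitted.
const failing = new Observable<string>((o) => o.error(new Error('boom')));
failing.pipe(emitWhenComplete('myValue')).subscribe(record('failing'));

console.log(log); // [ 'values: myValue', 'empty: myValue', 'failing: error boom' ]
```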

You can look at this stackblitz for some tests.

Upvotes: 0
