Reputation: 10880
I want to emit one value when the original observable completes, for example with an imaginary operator mapComplete:
let arr = ['a','b', 'c'];
from(arr)
.pipe(mapComplete(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log)
//further processed: myValue
I tried the following approaches, which work but don't seem suitable:
1.
from(arr)
.pipe(toArray())
.pipe(map(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log);
//further processed: myValue
Issue: If the original observable is a huge stream, I don't want to buffer it into an array just to emit one value.
2.
from(arr)
.pipe(last())
.pipe(map(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log);
//further processed: myValue
Issue: If the stream completes without emitting anything, I get an error: [Error [EmptyError]: no elements in sequence]
What would be a correct (in rxjs terms) way to do the above?
Upvotes: 6
Views: 2362
Reputation: 14139
You can achieve this with ignoreElements to not emit anything and endWith to emit a value on complete.
from(arr).pipe(
  ignoreElements(),
  endWith('myValue'),
  map(v => `further processed: ${v}`)
).subscribe(console.log);
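To check how this behaves for an empty source as well, here is a minimal self-contained sketch. The helper names emitOnComplete and collect are made up for illustration, and the imports assume an RxJS version that still exposes operators from rxjs/operators (6.x or 7.x).

```typescript
import { EMPTY, from, Observable } from "rxjs";
import { endWith, ignoreElements, map } from "rxjs/operators";

// Hypothetical helper: gathers everything a synchronous observable emits.
function collect<T>(source: Observable<T>): T[] {
  const out: T[] = [];
  source.subscribe((v) => out.push(v));
  return out;
}

// Hypothetical wrapper around the pipeline from the answer.
function emitOnComplete(source: Observable<unknown>): Observable<string> {
  return source.pipe(
    ignoreElements(), // drop every value the source emits
    endWith("myValue"), // emit one value when the source completes
    map((v) => `further processed: ${v}`)
  );
}

const fromNonEmpty = collect(emitOnComplete(from(["a", "b", "c"])));
const fromEmpty = collect(emitOnComplete(EMPTY));

console.log(fromNonEmpty); // [ 'further processed: myValue' ]
console.log(fromEmpty); // [ 'further processed: myValue' ]
```

Nothing is buffered along the way, and an empty source still produces exactly one value. If the source errors instead of completing, endWith emits nothing and the error is passed through.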
If you want to execute a function in map, you could use count() beforehand to emit one value on complete (the number of values emitted).
from(arr).pipe(
  count(), // could also use "reduce(() => null, 0)" or "last(null, 0)" or "takeLast(1), defaultIfEmpty(0)"
  map(() => getMyValue()),
  map(v => `further processed: ${v}`)
).subscribe(console.log);
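A small sketch of the count() variant, showing that the function runs once per completed source, including an empty one. Here getMyValue is a hypothetical stand-in that counts its own invocations, and emitComputedOnComplete is an illustrative wrapper name, not part of RxJS.

```typescript
import { EMPTY, from, Observable } from "rxjs";
import { count, map } from "rxjs/operators";

let calls = 0;

// Hypothetical stand-in for getMyValue(); tracks how often it is invoked.
function getMyValue(): string {
  calls += 1;
  return "myValue";
}

// Hypothetical wrapper around the pipeline from the answer.
function emitComputedOnComplete(source: Observable<unknown>): Observable<string> {
  return source.pipe(
    count(), // emits a single number once the source completes (0 if it was empty)
    map(() => getMyValue()), // so this runs exactly once, at completion
    map((v) => `further processed: ${v}`)
  );
}

const countResults: string[] = [];
emitComputedOnComplete(from(["a", "b", "c"])).subscribe((v) => countResults.push(v));
emitComputedOnComplete(EMPTY).subscribe((v) => countResults.push(v));

console.log(countResults); // two entries, one per completed source
console.log(calls); // 2
```

Unlike endWith('myValue'), the value is computed lazily, only when the source actually completes.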
Upvotes: 7
Reputation: 31125
You could also use the last() operator with a default value. This avoids the no elements in sequence error when the stream is empty.
from(arr).pipe(
  last(null, 'myValue'), // `null` denotes no predicate; 'myValue' is emitted only if the stream is empty
  map(_ => 'myValue'), // replace whatever last() emitted with the value to emit on completion
  map((v)=>`further processed: ${v}`)
).subscribe(console.log);
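To make the role of the default value explicit, here is a minimal sketch comparing a non-empty and an empty source. The helper lastOrDefault and the 'fallback' string are illustrative names only.

```typescript
import { EMPTY, from, Observable } from "rxjs";
import { last } from "rxjs/operators";

// Hypothetical helper: applies last() with a default and collects the output.
function lastOrDefault(source: Observable<string>): string[] {
  const out: string[] = [];
  source.pipe(last(null, "fallback")).subscribe((v) => out.push(v));
  return out;
}

const lastOfNonEmpty = lastOrDefault(from(["a", "b", "c"]));
const lastOfEmpty = lastOrDefault(EMPTY);

console.log(lastOfNonEmpty); // [ 'c' ]
console.log(lastOfEmpty); // [ 'fallback' ]
```

For a non-empty source last() still emits the real last element, which is why the extra map is needed to turn either outcome into the same value.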
Upvotes: 0
Reputation: 17762
You may achieve what you want by building your own custom operator.
The code could look like this:
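import { Observable } from 'rxjs';

// Ignores every value from the source and emits `val` once the source completes.
const emitWhenComplete = <T>(val: T) => <U>(source: Observable<U>) =>
  new Observable<T>((observer) => {
    // Returning the inner subscription lets unsubscribing tear down the source too.
    return source.subscribe({
      error: (err) => observer.error(err),
      complete: () => {
        observer.next(val);
        observer.complete();
      },
    });
  });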
Basically, this operator subscribes to the source observable, ignores all values it emits, forwards any error, and emits val only when the source completes.
You can look at this stackblitz for some tests.
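As a rough local check of the operator, the sketch below runs it against a non-empty source, an empty source, and a failing source. The failing observable and the emitted and errors arrays are made up for this illustration.

```typescript
import { EMPTY, from, Observable } from "rxjs";

// Same operator as above, repeated so the snippet is self-contained.
const emitWhenComplete = <T>(val: T) => <U>(source: Observable<U>) =>
  new Observable<T>((observer) => {
    return source.subscribe({
      error: (err) => observer.error(err),
      complete: () => {
        observer.next(val);
        observer.complete();
      },
    });
  });

const emitted: string[] = [];
const errors: string[] = [];

// Non-empty source: 'a', 'b', 'c' are ignored, one value is emitted on completion.
from(["a", "b", "c"])
  .pipe(emitWhenComplete("myValue"))
  .subscribe((v) => emitted.push(v));

// Empty source: completes immediately, still emits one value.
EMPTY.pipe(emitWhenComplete("myValue")).subscribe((v) => emitted.push(v));

// Hypothetical failing source: the error is forwarded and nothing is emitted.
const failing = new Observable<string>((observer) => observer.error(new Error("boom")));
failing.pipe(emitWhenComplete("myValue")).subscribe({
  next: (v) => emitted.push(v),
  error: (err: Error) => errors.push(err.message),
});

console.log(emitted); // [ 'myValue', 'myValue' ]
console.log(errors); // [ 'boom' ]
```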
Upvotes: 0