Reputation: 2638
I have a stream of click events - let's call it stream$. In the event handler, I increment a counter (starting from 0) and call observer.next, passing the counter as an argument. When the counter equals 3, I complete the stream.
So let's say that my Observable looks like this:
--a---b--c->
where a, b and c are click events
Now I take all the earlier elements (counter is not equal to 3):
const a$ = stream$.pipe(filter(x => x !== 3))
And the last element (counter is equal to 3):
const b$ = stream$.pipe(filter(x => x === 3))
Okay, so let's concat it - a$ with b$
const c$ = a$.pipe(concat(b$))
Now, subscribe to c$
c$.subscribe(console.log)
Unfortunately, it doesn't work. I only get 1 and 2 in the console, but not 3 (the last one). Can anyone explain to me why this happens?
DEMO: https://stackblitz.com/edit/typescript-iucduj?file=index.ts
CODE:
let clickCounter = 0
const stream$ = Observable.create((observer) => {
  document.addEventListener('click', () => {
    clickCounter++;
    observer.next(clickCounter)
    if (clickCounter === 3) {
      observer.complete()
    }
  })
})
const a$ = stream$.pipe(
  filter(x => x !== 3)
)
const b$ = stream$.pipe(
  filter(x => x === 3)
)
const c$ = a$.pipe(concat(b$))
c$.subscribe(console.log)
Upvotes: 2
Views: 4569
Reputation: 96899
I think there are two things to be aware of:
1. You're creating two 'click' event handlers because you make two subscriptions to stream$. This means that each click increments clickCounter twice, which I don't think is what you want.
2. The concat operator subscribes to the second Observable, b$, only after a$ has completed. However, by the time a$ completes, the value 3 has already been emitted, and nobody was listening on b$ to receive it.
So I think the best way to achieve what you want is to use share() on stream$ and then use merge instead of concat, because b$ won't emit anything other than 3 anyway, thanks to filter.
See your updated demo: https://stackblitz.com/edit/typescript-t3i7mj?file=index.ts
const stream$ = Observable.create((observer) => {
  document.addEventListener('click', () => {
    ...
  })
}).pipe(
  share()
);
...
const c$ = a$.pipe(merge(b$));
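To make the timing issue concrete, here is a minimal sketch that does not use RxJS at all. HotSource, concatLike and mergeLike are hypothetical names invented for this illustration; HotSource only mimics the one property that matters here, namely that a shared source delivers a value to whoever is subscribed at the moment of emission and to nobody else.

```typescript
// Hypothetical stand-in for a shared (hot) source. This is NOT RxJS, only a
// model of "values go to the listeners that are registered right now".
type Listener = (value: number) => void;

class HotSource {
  private listeners: Listener[] = [];

  subscribe(listener: Listener): void {
    this.listeners.push(listener);
  }

  emit(value: number): void {
    // Late subscribers never see values emitted before they subscribed.
    for (const listener of this.listeners) {
      listener(value);
    }
  }
}

// concat-like ordering: the second listener is attached only after the
// first one is "done", i.e. after the value 3 has already gone by.
function concatLike(): number[] {
  const source = new HotSource();
  const seen: number[] = [];
  source.subscribe(x => { if (x !== 3) seen.push(x); }); // plays the role of a$
  source.emit(1);
  source.emit(2);
  source.emit(3); // filtered out by the first listener; the source is finished
  source.subscribe(x => { if (x === 3) seen.push(x); }); // b$, attached too late
  return seen;
}

// merge-like ordering: both listeners are attached before anything is emitted.
function mergeLike(): number[] {
  const source = new HotSource();
  const seen: number[] = [];
  source.subscribe(x => { if (x !== 3) seen.push(x); }); // a$
  source.subscribe(x => { if (x === 3) seen.push(x); }); // b$
  source.emit(1);
  source.emit(2);
  source.emit(3);
  return seen;
}

console.log(JSON.stringify(concatLike())); // [1,2]
console.log(JSON.stringify(mergeLike()));  // [1,2,3]
```

In this model the order of the merged output is still 1, 2, 3, because the second listener ignores everything except 3.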
Upvotes: 1
Reputation: 15221
Aside from the particular use you have in mind (I'd listen to the comments about official HTTP replay patterns), wouldn't you be better off with a slightly different approach? Something like:
var stream$ = Observable.fromEvent(document, 'click').pipe(
  scan((acc: any, val: any) => [...acc, val], []),
  take(3)
);
stream$.subscribe((x) => console.log('emits: ' + x))
Upvotes: 0
Reputation: 16441
In your code, by the time concat subscribes to b$ (the second filter), the source has already emitted all of its values and completed, so b$ will not receive anything.
Change your code as follows to make it work as you expect:
const stream2$ = stream$.pipe(shareReplay(3));
const a$ = stream2$.pipe(filter(x => x !== 3));
const b$ = stream2$.pipe(filter(x => x === 3));
const c$ = a$.pipe(concat(b$));
c$.subscribe(console.log);
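The reason a replay buffer helps can be sketched without RxJS. ReplaySource and concatLikeWithReplay below are hypothetical names made up for this illustration, and the class is only a rough model of replay behaviour (it keeps the last bufferSize values and hands them to every new subscriber), not the library's actual implementation.

```typescript
// Hypothetical model of a source with a replay buffer. NOT RxJS itself.
type Listener = (value: number) => void;

class ReplaySource {
  private listeners: Listener[] = [];
  private buffer: number[] = [];
  private bufferSize: number;

  constructor(bufferSize: number) {
    this.bufferSize = bufferSize;
  }

  subscribe(listener: Listener): void {
    // A new subscriber first receives the buffered values, oldest first.
    for (const value of this.buffer) {
      listener(value);
    }
    this.listeners.push(listener);
  }

  emit(value: number): void {
    // Remember the value, dropping the oldest one when the buffer is full.
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift();
    }
    for (const listener of this.listeners) {
      listener(value);
    }
  }
}

// Same concat-like ordering as in the question: the second listener is
// attached only after 1, 2 and 3 have all been emitted.
function concatLikeWithReplay(bufferSize: number): number[] {
  const source = new ReplaySource(bufferSize);
  const seen: number[] = [];
  source.subscribe(x => { if (x !== 3) seen.push(x); }); // plays the role of a$
  source.emit(1);
  source.emit(2);
  source.emit(3);
  source.subscribe(x => { if (x === 3) seen.push(x); }); // b$, attached late
  return seen;
}

console.log(JSON.stringify(concatLikeWithReplay(3))); // [1,2,3]
console.log(JSON.stringify(concatLikeWithReplay(0))); // [1,2]
```

With a buffer of 3, the late listener is handed 1, 2 and 3 again and its filter keeps only the 3. With no buffer, the model behaves like the original code and the 3 is lost.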
Upvotes: 1