feerlay
feerlay

Reputation: 2638

RxJS - concat last emitted value with previous values

I have a stream of click events - let's call it stream$. In the event handler, I increment a counter (started from 0) and call observer.next passing counter as an argument. When counter is equal to 3 I'm completing the stream.

So let's say that my Observable looks like this:

--a---b--c->

where a, b and c are click events

Now, I'm taking all previous elements (counter is not equal 3):

const a$ = stream$.pipe(filter(x => x !== 3))

And the last element (counter is equal 3)

const b$ = stream$.pipe(filter(x => x === 3))

Okay, so let's concat it - a$ with b$

const c$ = a$.pipe(concat(b$))

Now, subscribe to c$

c$.subscribe(console.log)

Unfortunately, it won't work. I only get 1, 2 in the console, but not 3 (last one). Can anyone explain to me why this happens?

DEMO: https://stackblitz.com/edit/typescript-iucduj?file=index.ts

CODE:

let clickCounter = 0

const stream$ = Observable.create((observer) => {
  document.addEventListener('click', () => {
    clickCounter++;
    observer.next(clickCounter)

    if (clickCounter === 3) {
      observer.complete()
    }
  })
})

const a$ = stream$.pipe(
  filter(x => x !== 3)
)

const b$ = stream$.pipe(
  filter(x => x === 3)
)

const c$ = a$.pipe(concat(b$))

c$.subscribe(console.log)

Upvotes: 2

Views: 4569

Answers (3)

martin
martin

Reputation: 96899

I think there are two things to be aware of:

  1. You're creating two 'click' event handlers because you make two subscriptions to $stream. This means that each click increments clickCounter twice which is I think what you don't want.

  2. The concat operator subscribes to the second Observable b$ only after $a completed. However at the time $a completes the value 3 was already emitted to b$ but nobody was listening.

So I think the best way to achieve what you want to use share() for $stream and then use merge instead of concat because b$ won't emit anything thanks to filter anyway.

See your updated demo: https://stackblitz.com/edit/typescript-t3i7mj?file=index.ts

const stream$ = Observable.create(
  (observer) => {
    document.addEventListener('click', () => {
      ...
    })
  })
  .pipe(
    share()
  );

...

const c$ = a$.pipe(merge(b$));

Upvotes: 1

tlt
tlt

Reputation: 15221

Aside the particular use you want to use it for (i'd listen to comments about official http replay patterns), wouldn't you be better off with a bit another approach...something like:

var stream$ = Observable.fromEvent(document, 'click').pipe(
  scan((acc:any, val:any) => [...acc,val], []),
  take(3)
);

stream$.subscribe((x) => console.log('emits: ' + x))

Upvotes: 0

siva636
siva636

Reputation: 16441

In your code, by the time you do the second filter operation, the source will have gotten emptied, and thus b$ will not get anything.

Change your code as follows to make it work as you expect:

    const stream2$ = stream$.shareReplay(3);

    const a$ = stream2$.pipe(filter(x => x !== 3));

    const b$ = stream2$.pipe(filter(x => x === 3));

    const c$ = a$.pipe(concat(b$));

    c$.subscribe(console.log);

Upvotes: 1

Related Questions