shavenwalrus

Reputation: 35

Emit once and then every second thereafter until value changes

I have this partly working. On receipt of a socket message, a value is emitted immediately and then every second thereafter (incrementing the age) until the next socket message is received.

However, I want it to emit every second regardless of whether a socket message has been received. It would start off emitting every second with the existing properties; when a socket message is received, the properties would change to the new ones, and those would then be emitted every second.

Can't quite figure out what to do.

updated: Observable<TargetDevice>;

this.updated = socketService.onMessage.pipe(
  filter(
    message =>
      message.messageType === SocketIoMessageType.Device &&
      message.data.id === this.id
  ),
  map((message: SocketIoMessage) => <Device>message.data),
  tap(d => this.setProps(d)),
  switchMap(d =>
    timer(0, 1000).pipe(
      tap(tick => (this.age = d.age + tick)),
      map(() => this)
    )
  )
);
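To make the desired behaviour concrete, here is a minimal dependency-free sketch that simulates which age should be emitted at each second. It is not RxJS code and not the asker's implementation. The names `DeviceProps`, `TimedMessage` and `simulateAges` are hypothetical, and it assumes for simplicity that messages arrive exactly on whole seconds.

```typescript
// Hypothetical shape: only `age` matters for this illustration.
interface DeviceProps {
  age: number;
}

// Hypothetical model of a socket message arriving `atSecond` seconds after subscription.
interface TimedMessage {
  atSecond: number;
  props: DeviceProps;
}

// Returns the age emitted at each whole second from 0 to seconds - 1.
// Emission starts from `initial`; each message replaces the props and restarts the counter.
function simulateAges(
  initial: DeviceProps,
  messages: TimedMessage[],
  seconds: number
): number[] {
  const emitted: number[] = [];
  let baseAge = initial.age; // age carried by the most recent props
  let elapsed = 0; // whole seconds since those props took effect

  for (let t = 0; t < seconds; t++) {
    for (const m of messages) {
      if (m.atSecond === t) {
        baseAge = m.props.age; // new props win
        elapsed = 0; // counter restarts, like a fresh timer(0, 1000)
      }
    }
    emitted.push(baseAge + elapsed);
    elapsed++;
  }
  return emitted;
}

// Initial age 10; a message carrying age 100 arrives at second 3.
const ages = simulateAges({ age: 10 }, [{ atSecond: 3, props: { age: 100 } }], 6);
console.log(ages); // [10, 11, 12, 100, 101, 102]
```

The code in the question already produces the tail of this sequence (from the first message onwards); what is missing is the leading part that ticks before any message has arrived.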

Upvotes: 0

Views: 223

Answers (3)

Maksim Romanenko

Reputation: 365

One possible solution to your problem is to create a beforeSocketMessage$ stream and limit it with the takeUntil operator.

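import { interval, merge, timer } from 'rxjs';
import { filter, map, mapTo, switchMap, takeUntil, tap } from 'rxjs/operators';

// Ticks every second until the first socket message arrives
// (note: any message on onMessage stops it, not only the filtered ones).
const beforeSocketMessage$ = interval(1000).pipe(
    mapTo('beforeSocketMessage'),
    takeUntil(socketService.onMessage)
);

const message$ = socketService.onMessage.pipe(
    filter(/* your filter */),
    map((message: SocketIoMessage) => <Device>message.data),
    tap(d => this.setProps(d)),
    // timer(0, 1000) emits immediately and then every second; interval() only takes a period.
    switchMap(d =>
        timer(0, 1000).pipe(
            tap(tick => (this.age = d.age + tick)), // d and tick are both in scope here
            mapTo(this)
        )
    )
);

this.updated = merge(beforeSocketMessage$, message$);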

Simplified version at stackblitz.com: https://stackblitz.com/edit/rxjs-jwg2cb?devtoolsheight=60

Upvotes: 0

ggradnig

Reputation: 14169

You'd want to use combineLatest and startWith to achieve your desired behaviour:

combineLatest(
    socketService.onMessage.pipe(
        startWith(DEFAULT_MESSAGE)
    ),
    timer(0, 1000)
).pipe(
    //...
)

Upvotes: 1

M4R1KU

Reputation: 718

I think you want something like this. You were right to use switchMap. The nice thing about it is that it unsubscribes from the previous inner observable as soon as the source emits its next value, so only the latest timer keeps running.

socketService.onMessage.pipe(
    // Re-emit the latest value every second; a new message replaces the running interval.
    switchMap(value => interval(1000).pipe(map(() => value)))
);

Check it out on RxJs Playground here

Upvotes: 0
