Reputation: 35
I have this working. On receipt of a socket message, a value is emitted immediately and every second thereafter (incrementing the age), until socket received again.
However, I want it to emit every second regardless of whether socket has been received. So it would start off emitting every seconds, but when socket received the properties would change to the new ones and they would be emitted every second.
Can't quite figure out what to do.
updated: Observable<TargetDevice>;
this.updated = socketService.onMessage.pipe(
filter(
message =>
message.messageType === SocketIoMessageType.Device &&
message.data.id === this.id
),
map((message: SocketIoMessage) => <Device>message.data),
tap(d => this.setProps(d)),
switchMap(d =>
timer(0, 1000).pipe(
tap(tick => (this.age = d.age + tick)),
map(() => this)
)
)
);
Upvotes: 0
Views: 223
Reputation: 365
One of possible solutions for you problem is to create beforeSocketMessage$
stream, and limit it with takeUntil
operator.
const beforeSocketMessage$ = interval(1000).pipe(mapTo('beforeSocketMessage'), takeUntil(socketService.onMessage));
const message$ = socketService.onMessage.pipe(
filter(/* you filter*/),
map((message: SocketIoMessage) => <Device>message.data),
tap(d => this.setProps(d)),
switchMap((data) => interval(0, 1000).pipe(mapTo(data))),
tap(tick => (this.age = d.age + tick)),
mapTo(this)
)
this.update = merge(beforeSocketMessage$, message$);
Simplified version at stackblitz.com: https://stackblitz.com/edit/rxjs-jwg2cb?devtoolsheight=60
Upvotes: 0
Reputation: 14169
You'd want to use combineLatest
and startWith
to achieve your desired behaviour:
combineLatest(
socketService.onMessage.pipe(
startWith(DEFAULT_MESSAGE)
),
timer(0, 1000)
).pipe(
//...
)
Upvotes: 1
Reputation: 718
I think you want something like that. You were right with switchMap. The cool thing about it is that it does unsubscribe from the source observable after the next value has been emitted.
socketService.onMessage.pipe(
switchMap(value => interval(1000).map(() => value))
);
Check it out on RxJs Playground here
Upvotes: 0