Reputation: 9522
Is it possible with rxjs to groupBy an object property and then compare each newly emitted event of one group with the latest value of every other group? Afterwards, all the streams should be merged back into a single one.
E.g. I have 5 rooms, each with a light. An emitter randomly turns a light on or off in one of the rooms. In the pipe, I'd like to know when the lights are off or on in all rooms, and then add a property allLightsOff to the emitted object.
import { interval } from 'rxjs';
import { groupBy, map } from 'rxjs/operators';

const source = interval(1000).pipe(
  map((e) => {
    return {
      iteration: e,
      room: Math.floor(Math.random() * 5),
      lightOn: Math.round(Math.random()),
      allLightsOff: null
    };
  }),
  groupBy((o) => o.room)
  // how to compare one room with all others?
  // how to merge all groups into a single stream again?
);
const subscribe = source.subscribe((o) => console.log(o));
Upvotes: 1
Views: 229
Reputation: 96891
I think you could do it like the following, using ReplaySubject(1) and switchMap + combineLatest:
import { combineLatest, interval, ReplaySubject } from 'rxjs';
import { groupBy, map, scan, switchMap } from 'rxjs/operators';

const source = interval(1000).pipe(
  map((e) => {
    return {
      iteration: e,
      room: Math.floor(Math.random() * 5),
      lightOn: Math.round(Math.random()),
      allLightsOff: null
    };
  }),
  // the fourth argument makes every group a ReplaySubject(1)
  groupBy((o) => o.room, undefined, undefined, () => new ReplaySubject(1)),
  // collect every group seen so far into an array
  scan((allGroups, group) => [...allGroups, group], []),
  // resubscribe to the latest value of each known group
  switchMap(groups => combineLatest(groups)),
  map((rooms: any[]) => {
    if (rooms.every(room => room.lightOn)) {
      console.log(rooms);
      return 'all on';
    } else if (rooms.every(room => !room.lightOn)) {
      console.log(rooms);
      return 'all off';
    }
    return 'some on, some off';
  }),
);
const subscribe = source.subscribe((o) => console.log(o));
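Each new group is a ReplaySubject(1) that replays its last value. So when a new group appears, it is accumulated by scan, and switchMap drops the previous combination so that combineLatest subscribes to the new list of groups. Because every group replays its latest value on subscription, combineLatest can emit right away.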
Live demo: https://stackblitz.com/edit/rxjs-u9w73k?devtoolsheight=60
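If you want the original object back with allLightsOff filled in (rather than a string), one alternative is to skip groupBy and keep the latest state per room in a scan accumulator. Below is a rough sketch of just that accumulator, written without rxjs so it runs standalone. The names LightEvent, RoomState, ROOM_COUNT, step and initial are made up for illustration, and it assumes the total number of rooms (5) is known up front. Note that this differs from the code above, which only looks at rooms that have emitted so far.

```typescript
// Hypothetical types and names; only the field names come from the question.
interface LightEvent {
  iteration: number;
  room: number;
  lightOn: number; // 0 or 1, as produced by Math.round(Math.random())
  allLightsOff: boolean | null;
}

interface RoomState {
  latest: Map<number, number>; // room -> latest lightOn value
  last: LightEvent | null;     // most recent event, annotated
}

const ROOM_COUNT = 5; // assumption: the number of rooms is known in advance

// Accumulator in the shape scan() expects: records the latest light state
// per room and annotates the incoming event with allLightsOff.
function step(state: RoomState, event: LightEvent): RoomState {
  const latest = new Map(state.latest);
  latest.set(event.room, event.lightOn);
  // Only true once every room has reported and all of them report "off".
  const allLightsOff =
    latest.size === ROOM_COUNT &&
    Array.from(latest.values()).every((on) => !on);
  return { latest, last: { ...event, allLightsOff } };
}

const initial: RoomState = { latest: new Map(), last: null };

// Standalone check with a fixed event sequence instead of interval():
// each of the five rooms reports "off" once.
const events: LightEvent[] = [0, 1, 2, 3, 4].map((room, i) => ({
  iteration: i,
  room,
  lightOn: 0,
  allLightsOff: null,
}));
const finalState = events.reduce(step, initial);
console.log(finalState.last);
```

In the pipe this would presumably replace the groupBy/scan/switchMap part with scan(step, initial) followed by map(s => s.last), so the result stays a single stream of the original objects.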
Upvotes: 1