Manuel

Reputation: 9522

RxJS groupBy: compare each emitted object with the latest value of the other groups

Is it possible with RxJS to groupBy an object property and then compare each newly emitted event of one group with the latest value of every other group? After that, all the streams should be merged back into a single one.

E.g. I have 5 rooms, each with a light. An emitter randomly turns a light on or off in one of the rooms. In the pipe, I would like to know when the lights are off or on in all rooms, and then add a property allLightsOff to the emitted object.

import { interval } from 'rxjs';
import { groupBy, map } from 'rxjs/operators';

const source = interval(1000).pipe(
  map((e) => {
    return {
      iteration: e,
      room: Math.floor(Math.random()*5),
      lightOn: Math.round(Math.random()),
      allLightsOff: null
    };
  }),
  groupBy((o) => o.room)
  // how to compare one room with all the others?
  // how to merge all groups back into a single stream?
);
const subscribe = source.subscribe((o) => console.log(o));

Upvotes: 1

Views: 229

Answers (1)

martin

Reputation: 96891

I think you could do it like the following, using ReplaySubject(1) and switchMap + combineLatest:

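import { combineLatest, interval, ReplaySubject } from 'rxjs';
import { groupBy, map, scan, switchMap } from 'rxjs/operators';

const source = interval(1000).pipe(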
  map((e) => {
    return {
      iteration: e,
      room: Math.floor(Math.random()*5),
      lightOn: Math.round(Math.random()),
      allLightsOff: null
    };
  }),
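  // Back each group with a ReplaySubject(1) so late subscribers still receive its latest value.
  groupBy((o) => o.room, undefined, undefined, () => new ReplaySubject(1)),
  // Collect every group seen so far into a growing array.
  scan((allGroups, group) => [...allGroups, group], []),
  // Resubscribe to the full list of groups whenever a new group appears.
  switchMap(groups => combineLatest(groups)),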
  map((rooms: any[]) => {
    if (rooms.every(room => room.lightOn)) {
      console.log(rooms);
      return 'all on';
    } else if (rooms.every(room => !room.lightOn)) {
      console.log(rooms);
      return 'all off';
    }
    return 'some on, some off';
  }),
);

const subscribe = source.subscribe((o) => console.log(o));

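Each new group is a ReplaySubject(1) that replays its last value, so when a new group is added, it is accumulated by scan, and combineLatest then subscribes to the new list of groups. Because switchMap resubscribes to every group each time, that replayed value is what keeps the latest state of the existing rooms from being lost.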
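Note that the pipe above emits a string and only looks at rooms that have emitted at least once. If you'd rather keep the original object and fill in allLightsOff, one possible alternative (a different technique from the groupBy approach, and only a sketch) is to track the latest value per room in a plain reducer and pass it to scan. The names LightEvent, LightState, ROOM_COUNT, trackLights and initialState are hypothetical, and it assumes the number of rooms is known up front:

```typescript
// Hypothetical event shape, mirroring the objects built in the question's map().
interface LightEvent {
  iteration: number;
  room: number;
  lightOn: number; // 0 or 1, as produced by Math.round(Math.random())
  allLightsOff: boolean | null;
}

// Accumulator: the latest lightOn value per room, plus the last annotated event.
interface LightState {
  latest: { [room: number]: number };
  event: LightEvent | null;
}

const ROOM_COUNT = 5; // assumption: the room count is fixed and known

// Reducer that can be handed to scan(): stores the newest value for the
// event's room, then derives allLightsOff from the latest value of every room.
function trackLights(state: LightState, event: LightEvent): LightState {
  const latest = { ...state.latest, [event.room]: event.lightOn };
  const rooms = Object.keys(latest);
  // Only report "all off" once every room has emitted at least once.
  const allLightsOff =
    rooms.length === ROOM_COUNT && rooms.every((r) => latest[Number(r)] === 0);
  return { latest, event: { ...event, allLightsOff } };
}

const initialState: LightState = { latest: {}, event: null };

// Assumed wiring inside the pipe, replacing groupBy and everything after it:
//   scan(trackLights, initialState),
//   map((state) => state.event)
```

This keeps everything in a single stream, so nothing has to be merged back together afterwards. An allLightsOn flag could be derived the same way by checking for 1 instead of 0.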

Live demo: https://stackblitz.com/edit/rxjs-u9w73k?devtoolsheight=60

Upvotes: 1
