Reputation: 313
I am very new to RxJS and have read dozens of tutorials, but it is still quite confusing. Let's say I have a list of user ids, and every 3 seconds I would like to query some data (online game match info) for each user from a REST service. Several users might have played the same match, so getLastMatch() may return the same match id for different users. I would like to group the resulting stream by match id. So if user 1 gets match id 101, user 2 gets 100, and user 3 gets 101, I would like my observable to emit something like
[
{"match": 101, "players": [1, 3]},
{"match": 100, "players": [2]}
]
Here is the code I came up with so far, but I'm stuck on the last line, which produces Observable<Observable<MatchData>>
class MatchData{
constructor(public matchId: number, public won: boolean) {}
}
const players = [1, 2, 3];
function getLastMatch(userId: number): Observable<MatchData> {
let data = new MatchData(100 + userId % 2, userId % 2 == 0);
return Observable.fromPromise(new Promise<MatchData>(resolve => resolve(data)));
}
const scheduler = Observable.interval(3000);
scheduler.map(Observable.from(players.map(p => getLastMatch(p))));
Update:
This is what I ended up with.
class MatchData {
constructor(public playerId: number, public matchId: number) {}
}
class GroupedMatchData {
constructor(public matchId: number, public playerIds: number[]) {}
}
const accounts = [1, 2, 3];
const scheduler = Observable.interval(3000);
function getMatch(id: number): Observable<MatchData> {
let promise = new Promise<MatchData>(resolve => resolve(new MatchData(id, 100 + id % 2)));
return Observable.fromPromise(promise);
}
function requestMatchData(): Observable<GroupedMatchData> {
return Observable.from(accounts.map(account => getMatch(account)))
.mergeAll()
.groupBy(match => match.matchId, match => match.playerId)
.flatMap(group => group.reduce((accumulator, current) => [...accumulator, current], [group.key]))
.map(array => new GroupedMatchData(array[0], array.slice(1)));
}
scheduler.take(1).flatMap(requestMatchData).subscribe(console.log);
Any comments on my solution are appreciated.
Upvotes: 1
Views: 2071
Reputation: 2507
We can use Observable.forkJoin to map the list of players to the list of their last matches. Using flatMap, we can get rid of the nested Observables. Because forkJoin returns the matches in the same order as the players, we can merge each (player, match) pair into a single object. Then we can group the results by matchId.
const scheduler = Observable.interval(3000);
scheduler
.flatMap(() => Observable.forkJoin(...players.map(id => getLastMatch(id))))
.map(mergeMatchesWithPlayers)
.map(groupPlayersByMatch)
.subscribe(console.log)
function mergeMatchesWithPlayers(matches: MatchData[]) {
return matches.map((match, i) => ({player: players[i], match}));
}
function groupPlayersByMatch(pairs: {player: number, match: MatchData}[]) {
// Typed explicitly so the array is not inferred as any[].
const groups: {match: number, players: number[]}[] = [];
pairs.forEach(pair => {
const existingGroup = groups.find(group => group.match === pair.match.matchId);
if (existingGroup) {
existingGroup.players.push(pair.player);
} else {
groups.push({match: pair.match.matchId, players: [pair.player]});
}
});
return groups;
}
https://codepen.io/anon/pen/ZrPLRb
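As a side note, the grouping step is plain array code, so it can be tried out without RxJS. Below is a minimal sketch of that step, assuming the input is a list of (player, matchId) pairs. The names PlayerMatch, MatchGroup and groupByMatch are made up for illustration, and a lookup object keyed by match id stands in for the groups.find scan.

```typescript
// Hypothetical shapes for illustration; they are not part of the code above.
interface PlayerMatch {
  player: number;
  matchId: number;
}

interface MatchGroup {
  match: number;
  players: number[];
}

// Groups players by match id, keeping matches in first-seen order.
function groupByMatch(pairs: PlayerMatch[]): MatchGroup[] {
  const groups: MatchGroup[] = [];
  // Lookup from match id to the group already created for it.
  const index: { [matchId: number]: MatchGroup } = {};
  for (const pair of pairs) {
    let group = index[pair.matchId];
    if (!group) {
      group = { match: pair.matchId, players: [] };
      index[pair.matchId] = group;
      groups.push(group);
    }
    group.players.push(pair.player);
  }
  return groups;
}

// Sample data from the question: players 1 and 3 share match 101.
const grouped = groupByMatch([
  { player: 1, matchId: 101 },
  { player: 2, matchId: 100 },
  { player: 3, matchId: 101 },
]);
console.log(JSON.stringify(grouped));
// prints [{"match":101,"players":[1,3]},{"match":100,"players":[2]}]
```

For three players the difference from groups.find is negligible; the lookup object only starts to matter if the player list grows large.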
Upvotes: 1