Chris
Chris

Reputation: 125

How to return RxJs observable's result as a rest response from Node.js

Scenario: The data from mutiple rest calls must be aggregated into one single object and returned as the rest response of an initial request which is served via Node.js.

Issue: The rest response is not waiting until the observable is finished thus the mutations (aggregations) are realized after the rest response has been dispatched.

//teamsController class invoked via GET /teams 

import * as Rx from 'rxjs/Rx'
import http from 'axios'
import Teams from '../models/teams'

const teamsAPI = "http://localhost:8081/api/v1/teams/players/";
const usersAPI = "http://localhost:8082/api/v1/users/";

exports.getTeamByPlayer = function (req, res) {

let username= req.get("username");

    Rx.Observable.fromPromise(fetchTeam(username))
        .map(team => {
            Rx.Observable.from(team.players).subscribe(player => {
                console.log(`Player name is ${player.username}`);
                Rx.Observable.fromPromise(fetchUser(player.username))
                    .map(avatar => avatar.avatar)
                    .subscribe(avatar => {
                        player.avatar = avatar;
                        console.log(player)
                    })
            });
            return team;
         })
        .subscribe(result => {
            console.log(`result is ${JSON.stingify(result)}`);
            res.json(result);

        })
}


/**
 * Fetch a team by a player
 *
 * @param name The name of the team
 * @returns {Promise.<Teams>}
 */
function fetchTeam(name) {
    return http.get(teamsAPI + name)
        .then(response => new Teams(response.data.data))
        .catch(error => {
            throw  new Error("todo: fill error message");
        })
}

/**
 * Fetch a user given its username
 *
 * @param username The username of the player
 * @returns {Promise.<TResult>}
 */
function fetchUser(username) {
    return new Promise(function (resolve, reject) {
        console.log(`fetching user: ${username}`);
        resolve();
    }).then(() => {
        return {
            "avatar": {
                "flagColor": "dummyValue",
                "flagCrest": "dummyValue"
            }
        }
    });

Log results:

Player name is username1
fetching user: username1
Player name is username2
fetching user: username2
result is {"id":"5a1c2a4030c39e5d88aed087","name":null,"avatar":{"flagColor":"abcdefg","flagCrest":"hijklmn"},"description":"string","motto":"string","players":[{"userId":"59b94a7b8b68ef0a048e85c1","username":"username1","status":"ACTIVE","role":"ADMIN","dateJoined":1511795264314,"dateInvited":null,"score":0},{"userId":"59b94a7b8b68ef0a048e85c1","username":"username2","status":"ACTIVE","role":"MEMBER","dateJoined":1511795264314,"dateInvited":null,"score":0}],"score":0,"type":"TEAM","open":true,"location":{"longitude":0,"latitude":0,"country":"string"},"owner":"username1","dateCreated":1511795264314}
{ userId: '59b94a7b8b68ef0a048e85c1',
  username: 'username1',
  status: 'ACTIVE',
  role: 'ADMIN',
  dateJoined: 1511795264314,
  dateInvited: null,
  score: 0,
  avatar: { flagColor: 'dummyValue', flagCrest: 'dummyValue' } }
{ userId: '59b94a7b8b68ef0a048e85c1',
  username: 'username2',
  status: 'ACTIVE',
  role: 'MEMBER',
  dateJoined: 1511795264314,
  dateInvited: null,
  score: 0,
  avatar: { flagColor: 'dummyValue', flagCrest: 'dummyValue' } }

AppServer: Node.JS v8.7.0, Middleware: expess 4.16.2 , Libs: RxJs 5.0.0-beta.12, axios 0.17.1

Upvotes: 2

Views: 4669

Answers (1)

bryan60
bryan60

Reputation: 29345

first, stop converting your http requests to promises. It makes things way more complicated than they need to be.

function fetchTeam(name) {
    return http.get(teamsAPI + name).map(res => new Teams(res.json().data));
}

function fetchUser(username) {
    return Rx.Observable.of({
        "avatar": {
            "flagColor": "dummyValue",
            "flagCrest": "dummyValue"
        }
    });
}

If some outside consumer NEEDS a promise based api, then make public functions that wrap these private functions in promises.

second, map is a synchronous operator, you can't execute an asynchronous operation inside of it, you need to use an asynchronous operator that does the subscription work for you. If you ever find yourself subscribing within an observable stream, you're doing something wrong.

let username= req.get("username");

fetchTeam(username)
    .switchMap(team => { // switchMap will subscribe to inner observables
        //build observables for fetching each avatar, pay close attention, I'm mixing the array map operator and the rx map operator in here
        let players$ = team.players.map(player => fetchUser(player.username).map(avatar => avatar.avatar)); 

        return Rx.Observable.forkJoin(players$); // use forkjoin to executre requests
     }, (team, playerAvatars) => { // use second switchMap argument to combine results
         team.players.forEach((player, idx) => player.avatar = playerAvatars[idx]);
         return team;
     })
    .subscribe(result => {
        console.log(`result is ${JSON.stingify(result)}`);
        res.json(result);

    });

Upvotes: 4

Related Questions