user1028880

RxJS + node.js HTTP server implementation?

I've been getting along fine with node.js until I tried an RxJS implementation.

Here is my trial code, written while studying:

Reactive-Extensions / rxjs-node https://github.com/Reactive-Extensions/rxjs-node


rx_http.js
(RxJS wrapper of the http lib of node.js)

var Rx = require("./rx.min");
var http = require("http");
for(var k in http)
{
    exports[k] = http[k];
}
exports.createServer = function ()
{
    var subject = new Rx.AsyncSubject();
    var observable = subject.asObservable();
    observable.server = http.createServer(function (request, response)
    {
       subject.onNext({ request:request, response:response });
       subject.onCompleted();
    });
    return observable;
};

server.js

var http = require('./rx_http');

// rxServer
var serverObservable = http.createServer();
var port = 3000;
serverObservable.server.listen(port);
console.log("Server listening on port: "+port);

// HTTP request event loop function
serverObservable.subscribe(function (data)
{
    var req = data.request;
    console.log(req.headers);

    var res = data.response;
    res.writeHead(200, {'Content-Type':"text/html"});
    res.end("hello world");

    console.log("res content out");
});

// exception
process.on('uncaughtException', function (err)
{
    console.log(['Caught exception:', err.message].join(" "));
});

The code outputs 'hello world' to the browser exactly once, and then the RxServer stops reacting to any further access (browser reload, etc.).

I'm still learning RxJS, and I have found little documentation on the web.

Please tell me what's wrong with the code, and if you know of a better implementation, please share it. Thank you.

Upvotes: 9

Views: 5876

Answers (2)

hjy

Reputation: 523

Use Rx.Subject instead of Rx.AsyncSubject in rx_http.js.

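AsyncSubject caches the last value passed to onNext() and propagates it to all observers only when the sequence completes. Because the original code calls onCompleted() right after the first request, only that first request is ever delivered.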

exports.createServer = function ()
{
    var subject = new Rx.Subject();
    var observable = subject.asObservable();
    observable.server = http.createServer(function (request, response)
    {
       subject.onNext({ request:request, response:response });
    });
    return observable;
};
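
To make the difference concrete, here is a minimal sketch that runs without RxJS. MiniSubject and MiniAsyncSubject are hypothetical stand-ins I wrote for illustration, not the real Rx classes; they model only the emit-and-complete timing described above (for example, they do not replay the cached value to late subscribers as the real AsyncSubject does). The simulate helper and the "req1".."req3" strings are likewise made up to play the role of incoming requests.

```javascript
// Simplified stand-ins for Rx.Subject and Rx.AsyncSubject (hypothetical
// classes, NOT the real RxJS implementation): just enough to show the timing.
class MiniSubject {
  constructor() {
    this.observers = [];
    this.done = false;
  }
  subscribe(onNext) {
    this.observers.push(onNext);
  }
  onNext(value) {
    if (this.done) return;             // values after completion are dropped
    this.observers.forEach(function (fn) { fn(value); });
  }
  onCompleted() {
    this.done = true;
  }
}

class MiniAsyncSubject extends MiniSubject {
  onNext(value) {
    if (this.done) return;
    this.hasValue = true;
    this.last = value;                 // cache only; nothing is emitted yet
  }
  onCompleted() {
    if (this.done) return;
    this.done = true;
    if (this.hasValue) {
      const last = this.last;          // emit the cached value once, on completion
      this.observers.forEach(function (fn) { fn(last); });
    }
  }
}

// Push three fake "requests" through a subject and record what the
// subscriber actually receives.
function simulate(subject, completeAfterEach) {
  const handled = [];
  subject.subscribe(function (req) { handled.push(req); });
  ["req1", "req2", "req3"].forEach(function (req) {
    subject.onNext(req);
    if (completeAfterEach) subject.onCompleted();
  });
  return handled;
}

// Wiring from the question: AsyncSubject + onCompleted() per request.
const asyncResult = simulate(new MiniAsyncSubject(), true);
// AsyncSubject with the onCompleted() call removed.
const asyncNoComplete = simulate(new MiniAsyncSubject(), false);
// Wiring from this answer: plain Subject, never completed.
const subjectResult = simulate(new MiniSubject(), false);

console.log(asyncResult);      // [ 'req1' ]
console.log(asyncNoComplete);  // []
console.log(subjectResult);    // [ 'req1', 'req2', 'req3' ]
```

In this model the question's wiring handles only the first request, an AsyncSubject that never completes delivers nothing at all, and a plain Subject delivers every request. That is why both changes matter: switching to Rx.Subject and dropping the onCompleted() call.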

Upvotes: 5

Ahmet Ali Akkas

Reputation: 31

Calling onCompleted() on the subject when the first request arrives ends the observable sequence. Could you please remove that line and try again?

I hope it helps.

Ahmet Ali Akkas

Upvotes: 3
