Reputation:
I 've been well with node.js until RxJS implementation.
Here is my trial code studying-
Reactive-Extensions / rxjs-node https://github.com/Reactive-Extensions/rxjs-node
rx_http.js
(RxJS wrapper of the http lib of node.js)
var Rx = require("./rx.min");
var http = require("http");
for(var k in http)
{
exports[k] = http[k];
}
exports.createServer = function ()
{
var subject = new Rx.AsyncSubject();
var observable = subject.asObservable();
observable.server = http.createServer(function (request, response)
{
subject.onNext({ request:request, response:response });
subject.onCompleted();
});
return observable;
};
server.js
var http = require('./rx_http');
// rxServer
var serverObservable = http.createServer();
var port = 3000;
serverObservable.server.listen(port);
console.log("Server listening on port: "+port);
// HTTP request event loop function
serverObservable.subscribe(function (data)
{
var req = data.request;
console.log(req.headers);
var res = data.response;
res.writeHead(200, {'Content-Type':"text/html"});
res.end("hello world");
console.log("res content out");
});
// exceptiopn
process.on('uncaughtException', function (err)
{
console.log(['Caught exception:', err.message].join(" "));
});
The code ends up with one-time 'hello world' output to browser, and the RxServer stops reacting to another access (brwoser reload etc.).
I'm on the way to learn RxJS thing, but few documentation found on the web.
Tell me what's wrong with the code, and if you know better implementations, please share. Thank you.
Upvotes: 9
Views: 5876
Reputation: 523
Use Rx.Subject instead of Rx.AsyncSubject in rx_http.js.
AsyncSubject caches the last value of onNext() and propagates it to the all observers when completed. AsyncSubject
exports.createServer = function ()
{
var subject = new Rx.Subject();
var observable = subject.asObservable();
observable.server = http.createServer(function (request, response)
{
subject.onNext({ request:request, response:response });
});
return observable;
};
Upvotes: 5
Reputation: 31
Calling oncompleted on the subject when the first request arrives ends the observable sequence. Could you please remove that line an try again.
I hope it helps.
Ahmet Ali Akkas
Upvotes: 3