Alex Stanciu
Alex Stanciu

Reputation: 23

RxJS: Converting Node.js sockets into Observables and merge them into one stream

I'm trying to convert Node's sockets into streams using RxJS. The goal is to have each socket create it's own stream and have all streams merge into one. As new sockets connect, a stream is created with socketStream = Rx.Observable.fromEvent(socket, 'message').

Then the stream is merged into a master stream with something like

mainStream = mainStream.merge(socketStream)

This appears to work fine, the problem is that after 200-250 client connections, the server throws RangeError: Maximum call stack size exceeded.

I have sample server and client code that demonstrates this behavior on a gist here: Sample Server and Client

I suspect that as clients connect/disconnect, the main stream doesn't get cleaned up properly.

Upvotes: 1

Views: 1636

Answers (1)

paulpdaniels
paulpdaniels

Reputation: 18663

The problem is that you are merging your Observable recursively. Every time you do

cmdStream = cmdStream.merge(socketStream);

You are creating a new MergeObservable/MergeObserver pair.

Taking a look at the source, you can see that what you are basically doing with each subscription is subscribing to each of your previous streams in sequence so it shouldn't be hard to see that at around 250 connections your call stack is probably at least 1000 calls deep.

A better way to approach this would be to convert use the flatMap operator and think of your connections as creating an Observable of Observables.

//Turn the connections themselves into an Observable
var connections = Rx.Observable.fromEvent(server, 'connection', 
                    socket => new JsonSocket(socket));

connections
  //flatten the messages into their own Observable
  .flatMap(socket => {
    return Rx.Observable.fromEvent(socket.__socket, 'message')
           //Handle the socket closing as well
           .takeUntil(Rx.Observable.fromEvent(socket.__socket, 'close'));
  }, (socket, msg) => { 
    //Transform each message to include the socket as well.
    return { socket : socket.__socket, data : msg}; 
  })
  .subscribe(processData, handleError);

The above I haven't tested but should fix your SO error.

I would probably also question the overall design of this. What exactly are you gaining by merging all the Observables together? You are still differentiating them by passing the socket object along with the message so it would seem these could all be distinct streams.

Upvotes: 4

Related Questions