user3291110
user3291110

Reputation: 365

Trying to make my own RxJs observable

I'm trying to convert an existing API to work with RxJS... fairly new to node, and very new to RxJs, so please bear with me.

I have an existing API (getNextMessage), that either blocks (asynchronously), or returns a new item or error via a node-style (err, val) callback, when the something becomes available.

so it looks something like:

getNextMessage(nodeStyleCompletionCallback);

You could think of getNextMessage like an http request, that completes in the future, when the server responds, but you do need to call getNextMessage again, once a message is received, to keep getting new items from the server.

So, in order to make it into an observable collection, I have to get RxJs to keep calling my getNextMessage function until the subscriber is disposed();

Basically, I'm trying to create my own RxJs observable collection.

The problems are:

  1. I don't know how to make subscriber.dispose() kill the async.forever
  2. I probably shouldn't be using async.forever in the first place
  3. I'm not sure I should be even getting 'completed' for each message - shouldn't that be at the end of a sequence
  4. I'd like to eventually remove the need for using fromNodeCallback, to have a first class RxJS observable
  5. Clearly I'm a little confused.

Would love a bit of help, thanks!

Here is my existing code:

var Rx = require('rx');
var port = require('../lib/port');
var async = require('async');

function observableReceive(portName)
{
    var observerCallback;
    var listenPort = new port(portName);
    var disposed = false;

    var asyncReceive = function(asyncCallback)
    {
        listenPort.getNextMessage(
            function(error, json)
            {
                observerCallback(error, json);

                if (!disposed)
                    setImmediate(asyncCallback);
            }
        );
    }

    return function(outerCallback)
    {
        observerCallback = outerCallback;
        async.forever(asyncReceive);
    }
}

var receive = Rx.Observable.fromNodeCallback(observableReceive('rxtest'));
var source = receive();

var subscription = source.forEach(
    function (json)
    {
        console.log('receive completed: ' + JSON.stringify(json));
    },
    function (error) {
        console.log("receive failed: " + error.toString());
    },
    function () {
        console.log('Completed');
        subscription.dispose();
    }
);

Upvotes: 5

Views: 9058

Answers (2)

paulpdaniels
paulpdaniels

Reputation: 18663

I realize this is over a year old, but I think a better solution for this would be to make use of recursive scheduling instead:

Rx.Observable.forever = function(next, scheduler) {
  scheduler = scheduler || Rx.Scheduler.default,
  //Internally wrap the the callback into an observable
  next = Rx.Observable.fromNodeCallback(next);    

  return Rx.Observable.create(function(observer) {
    var disposable = new Rx.SingleAssignmentDisposable(),
        hasState = false;
    disposable.setDisposable(scheduler.scheduleRecursiveWithState(null, 
      function(state, self) { 
        hasState && observer.onNext(state);
        hasState = false;  
        next().subscribe(function(x){
          hasState = true;
          self(x);
        }, observer.onError.bind(observer));

      }));

    return disposable;

  });

};

The idea here is that you can schedule new items once the previous one has completed. You call next() which invokes the passed in method and when it returns a value, you schedule the next item for invocation.

You can then use it like so:

Rx.Observable.forever(getNextMessage)
.take(11)
.subscribe(function(message) {
 console.log(message);
});

See a working example here

Upvotes: 5

cwharris
cwharris

Reputation: 18125

So here's probably what I would do.

var Rx = require('Rx');

// This is just for kicks. You have your own getNextMessage to use. ;)
var getNextMessage = (function(){

  var i = 1;

  return function (callback) {
    setTimeout(function () {
      if (i > 10) {
        callback("lawdy lawd it's ova' ten, ya'll.");
      } else {
        callback(undefined, i++);
      }
    }, 5);
  };

}());

// This just makes an observable version of getNextMessage.
var nextMessageAsObservable = Rx.Observable.create(function (o) {
  getNextMessage(function (err, val) {
    if (err) {
      o.onError(err);
    } else {
      o.onNext(val);
      o.onCompleted(); 
    }
  });
});

// This repeats the call to getNextMessage as many times (11) as you want.
// "take" will cancel the subscription after receiving 11 items.
nextMessageAsObservable
  .repeat()
  .take(11)
  .subscribe(
    function (x)   { console.log('next', x);    },
    function (err) { console.log('error', err); },
    function ()    { console.log('done');       }
  );

Upvotes: 14

Related Questions