kharandziuk
kharandziuk

Reputation: 12900

Subjects. Messages listeners example

I need to create a sequence of messages.

So far I have ended up with:

var math = require('mathjs')
var Rx = require('rx')
var _ = require('lodash')

// Subject used as the message bus: the producer further down is subscribed
// with it, and listeners subscribe to it.
var messagesSubject = new Rx.Subject()
// Running pool of the messages seen so far: each message is wrapped in a
// one-element array and merged into the accumulated array with _.union.
// The map callback has to declare `el`; without the parameter `el` is an
// undeclared identifier and a ReferenceError is thrown as soon as the pool
// is subscribed and a message arrives.
// NOTE(review): scan(seed, accumulator) is the argument order of older RxJS
// releases - confirm against the installed version.
var messagesPool = messagesSubject.map(function(el) { return [el]}).scan([], _.union)


// Simulates listeners arriving at random moments: every 500 ms tick that
// passes the random gate attaches one more subscriber to messagesSubject.

// Gate for a tick: true when the random draw is above 8.
function shouldAddSubscriber() {
  return math.randomInt(10) > 8
}

// Attaches a new listener that logs every message it receives, tagged with
// the value of the tick that created it.
function attachSubscriber(tick) {
  messagesSubject.subscribe(function(message) {
    console.log('subscriber ' + tick.value + ' do something with ' + message.text)
  })
}

var subscriberTicks = Rx.Observable.interval(500 /* ms */).timeInterval()

// The work happens in the `do` side effect; the empty subscribe() only
// starts the stream.
subscriberTicks.filter(shouldAddSubscriber).do(attachSubscriber).subscribe()

// Message producer: every 500 ms tick that passes the random gate is turned
// into a message object and pushed into messagesSubject.

// Gate for a tick: true when the random draw is above 2.
function shouldEmitMessage() {
  return math.randomInt(10) > 2
}

// Builds a message whose text is picked at random from three fixed words.
function buildMessage() {
  return { text: math.pickRandom(['one', 'two', 'three'])}
}

var producerTicks = Rx.Observable.interval(500 /* ms */).timeInterval()

// The subject is passed as the observer, so it receives every message.
producerTicks.filter(shouldEmitMessage).map(buildMessage).subscribe(messagesSubject)

How can I notify every new subscriber of all the previous messages (messagesPool)?

Side questions: Is this a valid use case for a Subject? Or should I choose another type of subject?

Upvotes: 0

Views: 109

Answers (3)

Lee Campbell
Lee Campbell

Reputation: 10783

An example of the code without the use of subjects, unit tested with Replay semantics and transient subscribers. Runs on node with nodeunit.

(Windows commands)

REM Install RxJS into the current project folder.
npm install rx
REM The test runner's npm package is named "nodeunit" (no hyphen); it provides
REM the nodeunit.cmd shim that the next command runs.
npm install nodeunit
REM Run the test modules in the "tests" directory.
.\node_modules\.bin\nodeunit.cmd tests

and the code in the tests directory.

var Rx = require('rx')

// Short aliases for the helpers on Rx.ReactiveTest. Only onNext is used by
// the test below; the other three are declared but not referenced in it.
var reactiveTest = Rx.ReactiveTest;
var onNext = reactiveTest.onNext;
var onError = reactiveTest.onError;
var onCompleted = reactiveTest.onCompleted;
var subscribe = reactiveTest.subscribe;

exports.testingReplayWithTransientSubscribers = function(test){
    //Declare that we expect to have 3 asserts enforced.
    test.expect(3);

    //Control time with a test scheduler
    var scheduler = new Rx.TestScheduler();
    //Create our known message that will be published at known times (all times in milliseconds).
    var messages = scheduler.createColdObservable(
        onNext(0500, 'one'),
        onNext(1000, 'two'),
        onNext(2000, 'three'),
        onNext(3500, 'four'),
        onNext(4000, 'five')
    );

    //Replay all messages, and connect the reply decorator.
    var replay = messages.replay();
    var connection = replay.connect();

    //Create 3 observers to subscribe/unsubscribe at various times.
    var observerA = scheduler.createObserver();
    var observerB = scheduler.createObserver();
    var observerC = scheduler.createObserver();

    //Subscribe immediately
    var subA = replay.subscribe(observerA);
    //Subscribe late, missing 1 message
    var subB = Rx.Disposable.empty;
    scheduler.scheduleAbsolute(null, 0800, function(){subB = replay.subscribe(observerB);});
    //Subscribe late, and dispose before any live message happen
    var subC = Rx.Disposable.empty;
    scheduler.scheduleAbsolute(null, 1100, function(){subC = replay.subscribe(observerC);});
    scheduler.scheduleAbsolute(null, 1200, function(){subC.dispose();});
    //Dispose early 
    scheduler.scheduleAbsolute(null, 3000, function(){subB.dispose();});


    //Start virutal time. Run through all the scheduled work (publishing messages, subscribing and unsubscribing)
    scheduler.start();

    //Assert our assumptions.
    test.deepEqual(observerA.messages, [
            onNext(0500, 'one'),
            onNext(1000, 'two'),
            onNext(2000, 'three'),
            onNext(3500, 'four'),
            onNext(4000, 'five')
        ], 
        "ObserverA should receive all values");
    test.deepEqual(observerB.messages, [
            onNext(0800, 'one'),
            onNext(1000, 'two'),
            onNext(2000, 'three'),
        ], 
        "ObserverB should receive initial value on subscription, and then two live values");
    test.deepEqual(observerC.messages, [
            onNext(1100, 'one'),
            onNext(1100, 'two'),
        ], 
        "ObserverC should only receive initial values on subscription");
    test.done();
};

Upvotes: 1

Lee Campbell
Lee Campbell

Reputation: 10783

As others have pointed out, ReplaySubject can be your friend here.

This may mean that you can remove your message pool feature.

You can also get rid of the subject entirely if you just compose your queries:

var math = require('mathjs')
var Rx = require('rx')
var _ = require('lodash')

// Source of random messages, built in named steps.
// Ticks every 500 ms, each wrapped by timeInterval().
var messageTicks = Rx.Observable.interval(500 /* ms */).timeInterval();

// Keep only the ticks whose random draw is above 2.
var acceptedTicks = messageTicks.filter(function() {
  return math.randomInt(10) > 2;
});

// Turn every accepted tick into a message with a randomly picked text, and
// wrap the result with replay() so that it can be connected later.
var messages = acceptedTicks
  .map(function() {
    return { text: math.pickRandom(['one', 'two', 'three'])};
  })
  .replay();

//Randomly add subscribers (but this would only be dummy code, not suitable for prod)
//Every 500 ms tick that passes the random gate subscribes one more logging
//listener to `messages`.
var randomSubsriberAdder = Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(
    function() { return math.randomInt(10) > 8;}
  )
  .subscribe(function(x) {
    messages.subscribe(function(msg) {
      console.log('subscriber ' + x.value + ' do something with ' + msg.text);
    });  //closes the inner listener (the closers were missing in the original snippet)
  });    //closes the outer subscribe callback

//Connect the replayed stream. The method is connect() in lower case;
//JavaScript is case-sensitive, so Connect() would be undefined.
var connection = messages.connect();
//messages will now be collecting all values.
//  Late subscribers will get all previous values.
//  As new values are published, existing subscribers will get the new value.

You may be better off using hard-coded sets of data and the Rx testing tools/libs. This way you will have control over which edge cases you are testing (early subscriber, late subscriber, disconnecting subscriber, silence on the stream, etc.).

Upvotes: 1

Richard Szalay
Richard Szalay

Reputation: 84804

Sounds like you're looking for ReplaySubject rather than Subject.

[ReplaySubject is a] Subject that buffers all items it observes and replays them to any Observer that subscribes.

Upvotes: 2

Related Questions