Reputation: 12900
I need to create a sequence of messages.
This is what I have so far:
var math = require('mathjs')
var Rx = require('rx')
var _ = require('lodash')
// Subject that every generated message is pushed into (see the producer below).
var messagesSubject = new Rx.Subject();

// Running pool of the messages seen so far: each message is wrapped in a
// one-element array and folded into the accumulator with _.union.
// The map callback must declare its argument; without it `el` is an undefined
// identifier and the callback throws a ReferenceError as soon as it runs.
// NOTE: nothing in this script subscribes to messagesPool yet.
var messagesPool = messagesSubject
  .map(function(el) { return [el]; })
  .scan([], _.union);

// Subscriber generator: on a random subset of the 500 ms ticks, attach a new
// logging subscriber to the subject. x.value is the tick value carried by the
// timeInterval() wrapper and is used here as the subscriber's label.
Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(
    function() { return math.randomInt(10) > 8; }
  )
  .do(function(x) {
    messagesSubject.subscribe(function(msg) {
      console.log('subscriber ' + x.value + ' do something with ' + msg.text);
    });
  })
  .subscribe();

// Message producer: on a random subset of the 500 ms ticks, publish a message
// with a randomly picked text into the subject.
Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(
    function() { return math.randomInt(10) > 2; }
  )
  .map(function() {
    return { text: math.pickRandom(['one', 'two', 'three']) };
  })
  .subscribe(messagesSubject);
How can I notify every new subscriber of all the previous messages (messagesPool)?
Side question: is this a valid use case for a Subject, or should I choose another type of subject?
Upvotes: 0
Views: 109
Reputation: 10783
An example of the code without the use of subjects, unit tested with replay semantics and transient subscribers. It runs on Node with nodeunit.
(Windows commands:)
REM Install RxJS and the nodeunit test runner into the local node_modules.
npm install rx
npm install nodeunit
REM Run the test modules found in the "tests" directory.
.\node_modules\.bin\nodeunit.cmd tests
with the following code saved in the tests directory:
var Rx = require('rx')
var onNext = Rx.ReactiveTest.onNext,
onError = Rx.ReactiveTest.onError,
onCompleted = Rx.ReactiveTest.onCompleted,
subscribe = Rx.ReactiveTest.subscribe;
exports.testingReplayWithTransientSubscribers = function(test){
//Declare that we expect to have 3 asserts enforced.
test.expect(3);
//Control time with a test scheduler
var scheduler = new Rx.TestScheduler();
//Create our known message that will be published at known times (all times in milliseconds).
var messages = scheduler.createColdObservable(
onNext(0500, 'one'),
onNext(1000, 'two'),
onNext(2000, 'three'),
onNext(3500, 'four'),
onNext(4000, 'five')
);
//Replay all messages, and connect the reply decorator.
var replay = messages.replay();
var connection = replay.connect();
//Create 3 observers to subscribe/unsubscribe at various times.
var observerA = scheduler.createObserver();
var observerB = scheduler.createObserver();
var observerC = scheduler.createObserver();
//Subscribe immediately
var subA = replay.subscribe(observerA);
//Subscribe late, missing 1 message
var subB = Rx.Disposable.empty;
scheduler.scheduleAbsolute(null, 0800, function(){subB = replay.subscribe(observerB);});
//Subscribe late, and dispose before any live message happen
var subC = Rx.Disposable.empty;
scheduler.scheduleAbsolute(null, 1100, function(){subC = replay.subscribe(observerC);});
scheduler.scheduleAbsolute(null, 1200, function(){subC.dispose();});
//Dispose early
scheduler.scheduleAbsolute(null, 3000, function(){subB.dispose();});
//Start virutal time. Run through all the scheduled work (publishing messages, subscribing and unsubscribing)
scheduler.start();
//Assert our assumptions.
test.deepEqual(observerA.messages, [
onNext(0500, 'one'),
onNext(1000, 'two'),
onNext(2000, 'three'),
onNext(3500, 'four'),
onNext(4000, 'five')
],
"ObserverA should receive all values");
test.deepEqual(observerB.messages, [
onNext(0800, 'one'),
onNext(1000, 'two'),
onNext(2000, 'three'),
],
"ObserverB should receive initial value on subscription, and then two live values");
test.deepEqual(observerC.messages, [
onNext(1100, 'one'),
onNext(1100, 'two'),
],
"ObserverC should only receive initial values on subscription");
test.done();
};
Upvotes: 1
Reputation: 10783
As others have pointed out, ReplaySubject can be your friend here.
This may mean that you can remove your message pool feature.
You can also get rid of the subject entirely if you just compose your queries:
var math = require('mathjs')
var Rx = require('rx')
var _ = require('lodash')
// Message source: on a random subset of the 500 ms ticks, emit a message with
// a randomly picked text. replay() wraps the sequence in a connectable
// observable; nothing is produced until connect() is called below.
var messages = Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(
    function() { return math.randomInt(10) > 2; }
  )
  .map(function() {
    return { text: math.pickRandom(['one', 'two', 'three']) };
  })
  .replay();

//Randomly add subscribers (but this would only be dummy code, not suitable for prod)
var randomSubsriberAdder = Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(
    function() { return math.randomInt(10) > 8; }
  )
  .subscribe(function(x) {
    // x.value is the tick value and is used as the new subscriber's label.
    messages.subscribe(function(msg) {
      console.log('subscriber ' + x.value + ' do something with ' + msg.text);
    });
  });

// Connect once, at top level (not inside a subscriber callback). The method
// name is lower-case `connect`; JavaScript identifiers are case-sensitive.
var connection = messages.connect();
//messages will now be collecting all values.
// Late subscribers will get all previous values.
// As new values are published, existing subscribers will get the new value.
You may be better off using hard-coded sets of data and the Rx testing tools/libraries. That way you have control over which edge cases you are testing (early subscriber, late subscriber, disconnecting subscriber, silence on the stream, etc.).
Upvotes: 1
Reputation: 84804
Sounds like you're looking for ReplaySubject rather than Subject.
[ReplaySubject is a] Subject that buffers all items it observes and replays them to any Observer that subscribes.
Upvotes: 2