Reputation: 39168
I'm playing with Reactive Programming, using RxJS, and stumbled upon something I'm not sure how to solve.
Let's say we implement a vending machine. You insert a coin, select an item, and the machine dispenses an item and returns change. We'll assume that price is always 1 cent, so inserting a quarter (25 cents) should return 24 cents back, and so on.
The "tricky" part is that I'd like to be able to handle cases like user inserting 2 coins and then selecting an item. Or selecting an item without inserting a coin.
It seems natural to implement inserted coins and selected items as streams. We can then introduce some sort of dependency between these 2 actions — merging or zipping or combining latest.
However, I quickly ran into an issue where I'd like coins to be accumulated up until an item is dispensed but not further. AFAIU, this means I can't use sum
or scan
since there's no way to "reset" previous accumulation at some point.
Here's an example diagram:
coins: ---25---5-----10------------|->
acc: ---25---30----40------------|->
items: ------------foo-----bar-----|->
combined: ---------30,foo--40,bar--|->
change:------------29------39------|->
And a corresponding code:
// Pair the running total of inserted coins with the most recent selection.
// NOTE: the running total is never reset after an item is dispensed, which is
// the very problem this question is about ("bar" gets paired with 40, not 10).
this.getCoinsStream()
  .scan(function(sum, current) { return sum + current; })
  // A result selector is needed here: an onNext handler receives a single
  // value, so a two-parameter subscribe callback would never see the item.
  .combineLatest(this.getSelectedItemsStream(), function(cents, item) {
    return { cents: cents, item: item };
  })
  .subscribe(function(purchase) {
    dispenseItem(purchase.item);
    // Every item costs 1 cent, so the change is the total minus one.
    dispenseChange(purchase.cents - 1);
  });
25 and 5 cents were inserted and then "foo" item was selected. Accumulating coins and then combining latest would lead to "foo" being combined with "30" (which is correct) and then "bar" with "40" (which is incorrect; should be "bar" and "10").
I looked through all of the methods for grouping and filtering and don't see anything that I can use.
An alternative solution I could use is to accumulate coins separately. But this introduces state outside of a stream and I'd really like to avoid that:
// Running total of inserted cents, kept outside of the streams.
var centsDeposited = 0;

// Adds each inserted coin to the running total.
function addDepositedCoin(cents) {
  centsDeposited += cents;
}

// Dispenses the selected item, returns the change (price is 1 cent)
// and clears the running total for the next purchase.
function completePurchase(item) {
  dispenseItem(item);
  dispenseChange(centsDeposited - 1);
  centsDeposited = 0;
}

this.getCoinsStream().subscribe(addDepositedCoin);
this.getSelectedItemsStream().subscribe(completePurchase);
Moreover, this doesn't allow for making the streams dependent on each other, e.g. making the select action wait for a coin to be inserted before it can return an item.
Am I missing an already existing method? What's the best way to achieve something like this — accumulating values up until the moment when they need to be merged with another stream, but also waiting for at least 1 value in the 1st stream before merging it with the one from the 2nd?
Upvotes: 1
Views: 3288
Reputation: 7546
You can also use Window to group together multiple coin events, and use item selection as the window boundary.
Next we can use zip to acquire the item value.
Notice that we try to give out an item the instant one is selected, so the user has to insert coins before deciding on an item.
Notice that I decided to publish both selectedStream
and dispenser
for safety reasons: we don't want to cause a race condition where events fire while we're still building up the query and zip becomes unbalanced. That would be a very rare condition, but notice that if our sources had been cold Observables, they would start generating values as soon as we subscribe, so we must use Publish
to safeguard ourselves.
(Example code shamelessly stolen from paulpdaniels.)
// Builds a stream that emits `value` every time the element matched by
// `selector` is clicked.
function clicksOf(selector, value) {
  return Rx.Observable.fromEvent($(selector), 'click').map(value);
}

// Inserted coins: one click stream per coin button, merged into one stream of cents.
var coinStream = Rx.Observable.merge(
  clicksOf('#add5', 5),
  clicksOf('#add10', 10),
  clicksOf('#add25', 25)
);

// Drink selections, published so the stream can be shared and connected explicitly.
var selectedStream = Rx.Observable.merge(
  clicksOf('#coke', 'Coke'),
  clicksOf('#sprite', 'Sprite')
).publish();

// Elements used to display the dispensed drink and the returned change.
var $selection = $('#selection');
var $change = $('#change');
/**
 * Shows the dispensed drink in the #selection element and logs it.
 * @param {string} selection - Name of the drink that was selected.
 */
function dispense(selection) {
  var label = 'Dispensed: ' + selection;
  $selection.text(label);

  var logLine = "Dispensing Drink: " + selection;
  console.log(logLine);
}
/**
 * Shows the returned change in the #change element and logs it.
 * @param {number} change - Amount of change to return, in cents.
 */
function dispenseChange(change) {
  var label = 'Dispensed change: ' + change;
  $change.text(label);

  var logLine = "Dispensing Change: " + change;
  console.log(logLine);
}
// Build the query.
// Coins are grouped into windows that close on every selection; each window
// is summed, and zip pairs every sum with the selection that closed it.
var dispenser = Rx.Observable.zip(
  coinStream
    .window(selectedStream)
    .flatMap(coins => coins.reduce((total, coin) => total + coin, 0)),
  selectedStream,
  (coins, selection) => ({ coins, selection })
)
  // Do not give out items if there are no coins.
  .filter(pay => pay.coins !== 0)
  // Published so both subscribers below share a single underlying subscription.
  .publish();

// Collects every subscription and connection so they can be disposed together.
var dispose = new Rx.CompositeDisposable(
  // Dole out the change (every item costs 1 cent).
  dispenser
    .pluck('coins')
    .map(coins => coins - 1)
    .subscribe(dispenseChange),
  // Get the selection for dispensation.
  dispenser
    .pluck('selection')
    .subscribe(dispense),
  // Wire it up: connect only after both subscribers are attached.
  dispenser.connect(),
  selectedStream.connect()
);
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
<button id="coke">Coke</button>
<button id="sprite">Sprite</button>
<button id="add5">5</button>
<button id="add10">10</button>
<button id="add25">25</button>
<div id="change"></div>
<div id="selection"></div>
Upvotes: 0
Reputation: 18665
Generally speaking you have the following set of equations:
inserted_coins :: independent source
items :: independent source
accumulated_coins :: sum(inserted_coins)
accumulated_paid :: sum(price(items))
change :: accumulated_coins - accumulated_paid
coins_in_machine :: when items : 0, when inserted_coins : sum(inserted_coins) starting after last emission of item
The hard part is coins_in_machine
. You need to switch the source observable based on some emissions from two sources.
/**
 * Creates a logger that prints every value it receives, prefixed with a label.
 * @param {string} who - Label identifying the stream being logged.
 * @returns {function(*): void} Callback that logs "<who> : <value>".
 */
function emits ( who ) {
  return function logEmission ( x ) {
    var prefix = [who, ": "].join(" ");
    console.log(prefix + x);
  };
}
/**
 * Combines two values with `+`; used as the accumulator passed to scan.
 * @param {*} a - Left operand (the accumulated value when used with scan).
 * @param {*} b - Right operand (the newly emitted value when used with scan).
 * @returns {*} The result of `a + b`.
 */
function sum ( a, b ) {
  var total = a + b;
  return total;
}
// Every click on #insert counts as a 15-cent coin.
var inserted_coins = Rx.Observable
  .fromEvent(document.getElementById("insert"), 'click')
  .map(function () { return 15; });

// Every click on #item selects a "snickers".
var items = Rx.Observable
  .fromEvent(document.getElementById("item"), 'click')
  .map(function () { return "snickers"; });

console.log("running");

// Total of all coins ever inserted (never reset).
var accumulated_coins = inserted_coins.scan(sum);

// Wraps an emitted value together with a flag identifying its source stream
// (1 = item selected, 0 = coin inserted).
function tagWith ( flag ) {
  return function ( value ) { return {value : value, flag : flag}; };
}

var tagged_items = items.tap(emits("items")).map(tagWith(1));
var tagged_coins = inserted_coins.tap(emits("coins inserted ")).map(tagWith(0));

var coins_in_machine = Rx.Observable.merge(tagged_items, tagged_coins)
  // Only react when the source changes (coin -> item or item -> coin).
  .distinctUntilChanged(function ( event ) { return event.flag; })
  .flatMapLatest(function ( event ) {
    if (event.flag === 1) {
      // An item was selected: the machine is emptied.
      return Rx.Observable.just(0);
    }
    if (event.flag === 0) {
      // The coin that triggered the switch has already been emitted, so it is
      // fed back in both as the seed of the sum and as the first value.
      return inserted_coins.scan(sum, event.value).startWith(event.value);
    }
  })
  .startWith(0);

coins_in_machine.subscribe(emits("coins in machine"));
jsbin : http://jsbin.com/mejoneteyo/edit?html,js,console,output
[UPDATE]
Explanations:
coins_in_machine
. When it is the insert_coins
we want to sum the incoming values, as that sum will represent the new amount of coins in the machine. That means the definition of insert_coins
switches from one stream to another under the logic defined before. That logic is what is implemented in the flatMapLatest
. We use flatMapLatest
and not flatMap
as otherwise the coins_in_machine
stream would continue to receive emissions from formerly switched streams, i.e. duplicated emissions, as in the end there are only ever two streams to and from which we switch. If I may, I would say this is a close-and-switch that we need. flatMapLatest
has to return a stream, so we jump through hoops to make a stream that emits 0 and never
ends (and does not block the computer, as using the repeat
operator would in that case). inserted_coins
emit the values we want. My first implementation was inserted_coins.scan(sum,0)
and that never worked. The key, and I found that quite tricky, is that when we get to that point in the flow, inserted_coins
already emitted one of the values that is a part of the sum. That value is the one passed as a parameter of flatMapLatest
but it is not in the source anymore, so calling scan
after the fact won't get it, so it is necessary to get that value from the flatMapLatest
and reconstitute the correct behaviour.
Upvotes: 1
Reputation: 18663
You could use your scan/combineLatest approach and then finish the stream with a first
followed up with a repeat
so that it "starts over" the stream but your Observers would not see it.
// One click stream per coin button, each mapped to its value in cents.
var nickelClicks = Rx.Observable.fromEvent($('#add5'), 'click').map(5);
var dimeClicks = Rx.Observable.fromEvent($('#add10'), 'click').map(10);
var quarterClicks = Rx.Observable.fromEvent($('#add25'), 'click').map(25);

// All inserted coins as a single stream.
var coinStream = Rx.Observable.merge(nickelClicks, dimeClicks, quarterClicks);

// One click stream per drink button, each mapped to the drink's name.
var cokeClicks = Rx.Observable.fromEvent($('#coke'), 'click').map('Coke');
var spriteClicks = Rx.Observable.fromEvent($('#sprite'), 'click').map('sprite');

// All drink selections as a single stream.
var selectedStream = Rx.Observable.merge(cokeClicks, spriteClicks);

// Elements used to display the dispensed drink and the returned change.
var $selection = $('#selection');
var $change = $('#change');
/**
 * Reports a dispensed drink: updates the #selection element, then logs it.
 * @param {string} selection - Name of the drink that was selected.
 */
function dispense(selection) {
  var displayText = 'Dispensed: ' + selection;
  $selection.text(displayText);

  var consoleText = "Dispensing Drink: " + selection;
  console.log(consoleText);
}
/**
 * Reports returned change: updates the #change element, then logs it.
 * @param {number} change - Amount of change to return, in cents.
 */
function dispenseChange(change) {
  var displayText = 'Dispensed change: ' + change;
  $change.text(displayText);

  var consoleText = "Dispensing Change: " + change;
  console.log(consoleText);
}
// Accumulator for the running coin total.
function addCoin(acc, delta) {
  return acc + delta;
}

// Pairs the current coin total with the selected drink.
function toPurchase(coins, selection) {
  return {coins : coins, selection : selection};
}

// Running total of coins, starting from 0 on every (re)subscription.
var coinTotal = coinStream.scan(addCoin, 0);

var dispenser = coinTotal
  .combineLatest(selectedStream, toPurchase)
  // combineLatest emits nothing until both Observables have produced a value,
  // so the first emission marks the point where coins and a selection exist.
  .first()
  // first() completes the stream above; repeat() resubscribes transparently,
  // which restarts the coin total. This could also be done conditionally
  // with while or doWhile.
  .repeat()
  // Only needed because there are two subscribers below; a single subscriber
  // would not need publish().
  .publish();

// Dole out the change
var changeDue = dispenser.pluck('coins').map(function(c) { return c - 1; });
changeDue.subscribe(dispenseChange);

// Get the selection for dispensation
var chosenDrink = dispenser.pluck('selection');
chosenDrink.subscribe(dispense);

// Wire it up
dispenser.connect();
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
<button id="coke">Coke</button>
<button id="sprite">Sprite</button>
<button id="add5">5</button>
<button id="add10">10</button>
<button id="add25">25</button>
<div id="change"></div>
<div id="selection"></div>
Upvotes: 3