Reputation: 33
After watching Intro to Reactive Programming, I'm trying to replicate the example using C# and ReactiveX. In summary, it's a currency converter: the user specifies the amount and the type of currency they want. The current exchange rate is retrieved from a server, which takes a variable amount of time to respond. Because of this, the response to an older currency request can arrive after the response to a more recent one. The video uses this caveat as motivation for making things reactive, to avoid race-condition bugs.

I can avoid the race condition in my reactive code by calling Wait() on the Task representing the exchange rate, but this blocks the GUI. How should I be approaching this? Perhaps I should cancel the other Tasks? Below is my code, which doesn't block but has the race condition.
public CurrencyWindow()
{
    InitializeComponent();

    var amount = Observable.FromEventPattern<TextChangedEventArgs>(
        txtAmount, "TextChanged").Select(t => int.Parse(txtAmount.Text));
    var yen = Observable.FromEventPattern<RoutedEventArgs>(
        rdoYen, "Checked").Select(t => "Yen");
    var dollar = Observable.FromEventPattern<RoutedEventArgs>(
        rdoDollar, "Checked").Select(t => "Dollar");
    var pound = Observable.FromEventPattern<RoutedEventArgs>(
        rdoPound, "Checked").Select(t => "Pound");

    // Merge the currency selections into one stream.
    var currencyType = yen.Merge(dollar).Merge(pound);

    // Let's now get a stream of currency rates (each element is a Task<double>).
    var exchangeRate = currencyType.Select(s => GetExchangeRate(s));

    amount.CombineLatest(exchangeRate, async (i, d) => i * await d).Subscribe(async v =>
    {
        var val = await v;
        txtCost.Text = val.ToString();
    });
}
private Task<double> GetExchangeRate(string currency)
{
    double exchange = 0;
    switch (currency)
    {
        case "Yen":
            exchange = 1;
            break;
        case "Pound":
            exchange = 2;
            break;
        case "Dollar":
            exchange = 3;
            break;
    }
    return Task.Factory.StartNew(() =>
    {
        System.Threading.Thread.Sleep((int)exchange * 3000);
        return exchange;
    });
}
Upvotes: 0
Views: 177
Reputation: 117175
Have a go with this code and see if it works:
var exchangeRate = currencyType
    .Select(s => Observable.FromAsync(() => GetExchangeRate(s)))
    .Switch();

amount
    .CombineLatest(exchangeRate, (i, d) => i * d)
    .Subscribe(v =>
    {
        var val = v;
        txtCost.Text = val.ToString();
    });
This code ensures that only the latest currencyType value ever produces an exchange rate. When a new currency comes along, Switch unsubscribes from every previous in-flight call to GetExchangeRate, so those results are ignored even if they arrive after the current one.
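
To see the Switch behaviour outside of a GUI, here is a minimal console-style sketch. It assumes the System.Reactive package is referenced. SwitchDemo, FakeRateAsync, and the delays are hypothetical stand-ins for the question's GetExchangeRate, not part of the original code. It also uses the FromAsync overload that passes a CancellationToken, so a superseded request is cancelled rather than merely ignored.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public static class SwitchDemo
{
    // Hypothetical stand-in for GetExchangeRate: "Dollar" is slow, "Yen" is fast.
    private static async Task<double> FakeRateAsync(string currency, CancellationToken ct)
    {
        double rate;
        int delayMs;
        switch (currency)
        {
            case "Dollar": rate = 3.0; delayMs = 300; break;
            case "Yen":    rate = 1.0; delayMs = 50;  break;
            default:       rate = 2.0; delayMs = 150; break;
        }
        await Task.Delay(delayMs, ct); // cancelled when Switch drops this request
        return rate;
    }

    public static async Task<List<double>> RunAsync()
    {
        var results = new List<double>();
        var currency = new Subject<string>();

        // Each selection becomes an inner observable; Switch keeps only the newest.
        var rates = currency
            .Select(c => Observable.FromAsync(ct => FakeRateAsync(c, ct)))
            .Switch();

        using (rates.Subscribe(r => { lock (results) results.Add(r); }))
        {
            currency.OnNext("Dollar"); // slow request, issued first
            currency.OnNext("Yen");    // fast request, issued second
            await Task.Delay(600);     // wait long enough for both delays to elapse
        }

        return results; // expected to hold only the Yen rate (1.0)
    }
}
```

Calling SwitchDemo.RunAsync() should return a list containing only 1.0, because the Dollar request is dropped as soon as Yen is selected. In the WPF window itself, the rate will likely arrive on a thread-pool thread, so adding .ObserveOn(SynchronizationContext.Current) before Subscribe is probably needed to update txtCost safely. That is an assumption about the hosting application, not something stated in the answer above.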
Upvotes: 1