Mark
Mark

Reputation: 33

Simple Rx example either blocks or has race condition

After watching: Intro to Reactive Programming I'm trying to replicate the example using C# and reactiveX. In summary it's a currency converter. The user specifies the amount and type of currency they want. The current exchange rate is retrieved from a server and will take a variable amount of time to return. Because of this variable server time it's possible for an old currency request to come in after a more recent one. This caveat is used as motivation to make things reactive to avoid a race condition bugs. I can avoid the race condition in my reactive code by using Wait() on the Task representing the currency exchange rate but this then blocks the GUI. How should I be approaching this? Perhaps I should cancel the other Tasks? Below is my code that doesn't block but has a race condition problem.

public CurrencyWindow()
{
    InitializeComponent();

    var amount = Observable.FromEventPattern<TextChangedEventArgs>(
        txtAmount, "TextChanged").Select(t=>int.Parse(txtAmount.Text));

    var yen = Observable.FromEventPattern<RoutedEventArgs>(
        rdoYen, "Checked").Select(t => "Yen");
    var dollar = Observable.FromEventPattern<RoutedEventArgs>(
        rdoDollar, "Checked").Select(t => "Dollar");
    var pound = Observable.FromEventPattern<RoutedEventArgs>(
        rdoPound, "Checked").Select(t => "Pound");

    // merge the currency selection into one stream.
    var currencyType = yen.Merge(dollar).Merge(pound);

    // Lets now get a stream of currency rates.
    var exchangeRate = currencyType.Select(s => GetExchangeRate(s));


    amount.CombineLatest(exchangeRate, async (i, d) => i * await d).Subscribe(async v =>
    {
        var val = await v;
        txtCost.Text = val.ToString();
    });
}

private Task<double> GetExchangeRate(string currency)
{
    double exchange = 0;
    switch(currency)
    {
        case "Yen":
            exchange = 1;
            break;
        case "Pound":
            exchange = 2;
            break;
        case "Dollar":
            exchange = 3;
            break;
    }

    return Task.Factory.StartNew(() => 
    {
        System.Threading.Thread.Sleep((int)exchange * 3000);
        return exchange;
    });
}

Upvotes: 0

Views: 177

Answers (1)

Enigmativity
Enigmativity

Reputation: 117175

Have a go with this code and see if it works:

var exchangeRate = currencyType
    .Select(s => Observable.FromAsync(() => GetExchangeRate(s)))
    .Switch();

amount
    .CombineLatest(exchangeRate, (i, d) => i * d)
    .Subscribe(v =>
    {
        var val = v;
        txtCost.Text = val.ToString();
    });

This code will ensure that only the latest currencyType produced will ever return an exchange rate. If a new currency comes along then all previous in-flight calls to GetExchangeRate will be ignored (even if they come in after the current one).

Upvotes: 1

Related Questions