Reputation: 25631
I have the following Rx method chain and would like to clean up the Select
method shown below. Can this be done with an existing Rx operator? I tried Amb,
but it queried all ISINs and returned the first Price produced.
I want the first Price produced for an ISIN without querying any other ISIN. In other words, I want to call GetPrice serially: try the first ISIN, then the second, then the third, and so on, stopping as soon as one returns a price.
using System;
using System.Diagnostics;
using System.Reactive.Linq;

public class Class1
{
    public Class1()
    {
        GetIsins()
            .Select(isins =>
            {
                // Try each ISIN in turn and stop at the first one that yields a price.
                decimal? price = null;
                foreach (var isin in isins)
                {
                    price = GetPrice(isin)
                        .Take(1)
                        .Wait();
                    if (price.HasValue)
                        break;
                }
                return price;
            })
            .Subscribe(price =>
            {
                if (price.HasValue)
                    Debug.WriteLine("Got a price...");
            });
    }

    public IObservable<string[]> GetIsins()
    {
        return Observable.Return(new []
        {
            "US9311421039",
            "TR9311421033",
            "UK3342130394"
        });
    }

    public IObservable<decimal?> GetPrice(string isin)
    {
        return Observable.Return((decimal?)100m);
    }
}
Upvotes: 0
Views: 53
Reputation: 43525
Assuming that I have understood the question correctly, you probably need to make the GetPrice
operation a side-effect of subscribing to the observable.
GetIsins()
    .SelectMany(isins => isins)
    .Select(isin => Observable.Defer(() =>
    {
        return GetPrice(isin);
    }))
    .Concat() // or .Merge(1)
    .Subscribe(price =>
    {
        if (price.HasValue)
            Debug.WriteLine("Got a price...");
    });
The Concat operator ensures that each IObservable<decimal?> in the sequence has to complete before the next one is subscribed to.
The Defer operator ensures that each IObservable<decimal?> does not start before it is subscribed to (in other words, it ensures that it is a cold observable).
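Note that the chain above, as written, still subscribes to every ISIN in turn, because nothing ends the sequence once a price arrives. One way to stop early is to filter out null prices and take a single element, which should dispose the Concat subscription before it moves on to the next ISIN. Below is a minimal sketch of that idea, not a definitive implementation. The FirstPriceSketch class, the Queried list and the stand-in GetPrice (which assumes only the second ISIN has a price) are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class FirstPriceSketch
{
    // Hypothetical helper: records which ISINs were actually queried.
    public static readonly List<string> Queried = new List<string>();

    // Hypothetical stand-in for GetPrice: assumes only the second ISIN has a price.
    public static IObservable<decimal?> GetPrice(string isin)
    {
        return Observable.Defer(() =>
        {
            Queried.Add(isin); // the side effect happens on subscription
            return Observable.Return(isin == "TR9311421033" ? (decimal?)100m : null);
        });
    }

    // For each array of ISINs, emits the first non-null price (or null if none has one).
    public static IObservable<decimal?> FirstPrice(IObservable<string[]> isinBatches)
    {
        return isinBatches
            .Select(isins => isins
                .Select(isin => Observable.Defer(() => GetPrice(isin)).Take(1))
                .Concat()                       // subscribe to one ISIN at a time, in order
                .Where(price => price.HasValue) // skip ISINs without a price
                .Take(1)                        // stop at the first price found
                .DefaultIfEmpty())              // emit null when no ISIN has a price
            .Concat();
    }
}
```

It could be used in place of the original Select, for example FirstPriceSketch.FirstPrice(GetIsins()).Subscribe(...). With the stand-in GetPrice above, the third ISIN should never be queried, which can be checked by inspecting Queried.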
Upvotes: 1