Jensen

Reputation: 3538

Rx Retry() not working as expected

I am experimenting with Reactive Extensions to fetch a bunch of RSS items. I based my code on a blog post by Tim Greenfield: Silverlight Rx DataClient within MVVM.

I'm using it within a desktop application, but the code is similar.

The problem I'm having is in understanding the Retry() function. It doesn't seem to do what I expect, or to act on the thing I expect it to act on.

var items = new List<RssItem>();
WebHelper.DownloadXmlFileAsync<RssItem>(new Uri(URI), "item")
    .Retry(2)
    .Finally(PublishResults)
    .Subscribe(items.Add, ProcessError, () => ProcessCompleted(items));

When I pass in a valid URI, this works without any issues. When I make a typo in the URI it reports a 404 error through the ProcessError() function, as one would expect, but it's only reported once. I would have expected it to show this error twice.

So it seems that Retry() is not operating on my web request; it looks like it actually applies to the functions passed to Subscribe(). I could be wrong here, though.

How can I make sure the Retry() call applies to the web request?

Extra code:

public static class WebHelper
{
    public static HttpWebRequest CreateHttp(Uri uri)
    {
        return CreateHttp(uri, "GET");
    }

    public static HttpWebRequest CreateHttp(Uri uri, string method)
    {
        if (uri.Scheme != Uri.UriSchemeHttp && uri.Scheme != Uri.UriSchemeHttps)
        {
            throw new ArgumentException("The specified URI does not use HTTP or HTTPS.", "uri");
        }

        var request = (HttpWebRequest)WebRequest.Create(uri);
        request.Method = method;

        return request;
    }

    public static IObservable<T> DownloadXmlFileAsync<T>(Uri uri, string elementName) where T : class
    {
        return (from request in Observable.Return(CreateHttp(uri))
                from response in Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()
                let stream = response.GetResponseStream()
                where stream != null
                from item in XmlReader.Create(stream).GetXmlItem<T>(elementName).ToObservable()
                select item);
    }
}

public static class XmlExtensions
{
    public static IEnumerable<T> GetXmlItem<T>(this XmlReader reader, string elementName) where T : class
    {
        var serializer = new XmlSerializer(typeof (T));
        while (reader.GoToElement(elementName))
        {
            yield return serializer.Deserialize(reader) as T;
        }
    }

    public static bool GoToElement(this XmlReader reader, string elementName)
    {
        do
        {
            if (reader.NodeType == XmlNodeType.Element && reader.Name == elementName)
            {
                return true;
            }
        } while (reader.Read());

        return false;
    }
}

[XmlRoot("item")]
public class RssItem
{
    [XmlElement("description")]
    public string Description { get; set; }

    [XmlElement("link")]
    public string Link { get; set; }

    [XmlElement("pubDate")]
    public string PublishDate { get; set; }

    [XmlElement("title")]
    public string Title { get; set; }

    public override string ToString()
    {
        return string.Format("Title: {0}", Title);
    }
}

Upvotes: 5

Views: 4164

Answers (2)

Lee Campbell

Reputation: 10783

Asti's answer is spot on. I just wanted to add some additional information in case you wanted to know how to expose multiple Errors for a single logical sequence.

As Asti points out, you can only terminate a sequence once. This termination can be either an error or a completion (OnError|OnCompleted).

However, there is nothing stopping you from having nested observable sequences! If you did want to see multiple error messages, consider a scenario where you return an IObservable<IObservable<T>>. The inner sequence is the data sequence (the sequence you currently have). When this sequence errors it can no longer be used, so the outer sequence could produce a new inner data sequence.

This may seem a bit odd, but it is a supported concept in Rx, as operators like Merge and Switch already cater for these nested sequences. This style of Rx is touched on in my book, IntroToRx, in the Nested Sequences paragraph, and then again in more detail in the Sequences of Coincidence chapter.

I hope this helps you to see other possibilities for how to use Rx in the future.
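As a rough sketch of the nested idea (not taken from the book), the outer sequence below hands out one inner sequence per attempt, and each inner error is recorded before moving on to the next attempt. The names AsAttempts and CollectErrors are hypothetical, and the always-failing source is only a stand-in for a real cold observable such as the web request. Note that this simple version runs every attempt; it does not stop after the first success.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class NestedSequenceSketch
{
    // Hypothetical helper: exposes a cold source as an outer sequence of
    // `attempts` inner sequences. Each inner sequence is one independent
    // subscription to the source.
    public static IObservable<IObservable<T>> AsAttempts<T>(IObservable<T> source, int attempts)
    {
        return Enumerable.Repeat(source, attempts).ToObservable();
    }

    // Runs the attempts one after another and records the error of each one.
    public static List<string> CollectErrors(int attempts)
    {
        var errors = new List<string>();

        // Stand-in for a failing cold source (assumption for illustration).
        var failing = Observable.Throw<int>(new InvalidOperationException("simulated failure"));

        AsAttempts(failing, attempts)
            // Turn each inner error into a log entry plus an empty sequence,
            // so the outer sequence can carry on with the next attempt.
            .Select(inner => inner.Catch<int, Exception>(ex =>
            {
                errors.Add(ex.Message);
                return Observable.Empty<int>();
            }))
            .Concat()
            .Subscribe();

        return errors;
    }
}
```

With three attempts against a source that always fails, the list should end up with three entries, one per inner sequence.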

Upvotes: 4

Asti

Reputation: 12667

The Rx grammar for sequences is defined as:

OnNext* (OnError | OnCompleted)?

Receiving either an OnError or an OnCompleted signals the end of the sequence and subscriptions on the pipeline are expected to be torn down.

In the context of the operators:

observable.Retry(n) is: Subscribe to observable, and re-subscribe whenever an OnError is received, for up to n subscriptions in total.

observable.Finally(action) is: Execute action on receiving OnError|OnCompleted

Retry is meant to be used with cold observables (Lee Campbell has a good post on this) where subscription essentially causes the source to start.

Similarly Repeat is exactly like Retry except it resubscribes on receiving an OnCompleted.

To see this in action, we can create an observable which "fails" the first n times it is subscribed to, and then succeeds. Now for some code:

    private static IObservable<int> ErrorProducer(int i)
    {
        int count = 0;
        return Observable.Create<int>(observer =>
        {
            Console.WriteLine("Doing work");

            if (count++ < i)
            {
                Console.WriteLine("Failed");
                observer.OnError(new Exception());
            }
            else
            {
                Console.WriteLine("Done");
                observer.OnNext(count);
                observer.OnCompleted();                    
            }
            return Disposable.Empty;
        });
    }

For a producer which always fails:

print(ErrorProducer(3).Retry(2));

Gives:

Doing work <-- Subscription
Failed
Doing work <-- Resubscription
Failed
OnError(System.Exception)
Finally

For a producer which eventually succeeds:

print(ErrorProducer(2).Retry(3));

Gives:

Doing work
Failed
Doing work
Failed
Doing work
Done
OnNext(3) <-- Succeeded
OnCompleted()
Finally
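
The print helper used above is not shown. One plausible shape, assuming it simply attaches a Finally and logs every notification, might look like the sketch below. The class name PrintSketch and the write parameter are hypothetical; the parameter is only there so the output can go somewhere other than the console.

```csharp
using System;
using System.Reactive.Linq;

public static class PrintSketch
{
    // Hypothetical reconstruction of the print helper: logs each
    // notification, then logs "Finally" once the sequence has terminated.
    public static void Print<T>(IObservable<T> source, Action<string> write)
    {
        source
            .Finally(() => write("Finally"))
            .Subscribe(
                value => write(string.Format("OnNext({0})", value)),
                error => write(string.Format("OnError({0})", error.GetType().FullName)),
                () => write("OnCompleted()"));
    }
}
```

Usage would then be along the lines of PrintSketch.Print(ErrorProducer(3).Retry(2), Console.WriteLine);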

If you wanted your process error function to be called as many times as it retries, it should be placed before the Retry.

i.e., seq.Do(value => { }, exception => { }).Retry(n)
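
To make the difference concrete, here is a minimal sketch that counts how often each error handler runs. The class and method names are hypothetical, and the always-failing source stands in for the failing web request.

```csharp
using System;
using System.Reactive.Linq;

public static class RetryLoggingSketch
{
    // Returns (errors seen by Do before Retry, errors seen by Subscribe).
    public static Tuple<int, int> CountErrors(int attempts)
    {
        int perAttempt = 0;
        int final = 0;

        // Stand-in for a cold source that fails on every subscription.
        var failing = Observable.Throw<int>(new InvalidOperationException("simulated failure"));

        failing
            .Do(value => { }, exception => perAttempt++)      // sits before Retry: sees every failed attempt
            .Retry(attempts)
            .Subscribe(value => { }, exception => final++);   // sits after Retry: sees only the last failure

        return Tuple.Create(perAttempt, final);
    }
}
```

With Retry(3) against a source that always fails, the handler passed to Do should run three times, while the handler passed to Subscribe runs once.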

You can read up on using hot/cold observables, and using the async pattern with Rx to clarify your understanding.

Upvotes: 16
