resp78

Reputation: 1534

Rx.Net - Publish method missing first few items when subscribing to Cold Observable

Inspired by Akavache, I am trying to create a solution that provides me with an IObservable<IArticle>. The method first tries to get all the articles that are present in the database, then fetches updated articles from the web service and, as the latest articles arrive, saves them back to the database.

Since the web service is essentially a cold observable and I don't want to subscribe twice, I used Publish to connect to it. My understanding is that I am using the correct overload of the Publish method; however, the method often misses the first couple of articles from GetNewsArticles. This was observed through the UI and also through the Trace calls added in the code below.

Apart from solving the problem, it would be great to also understand how to debug/test this code (apart from introducing DI to inject NewsService).

public IObservable<IArticle> GetContents(string newsUrl, IScheduler scheduler)
{
    var newsService = new NewsService(new HttpClient());
    scheduler = scheduler ?? TaskPoolScheduler.Default;

    var fetchObject = newsService
        .GetNewsArticles(newsUrl, scheduler)
        .Do(x => Trace.WriteLine($"Parsing Articles {x.Title}"));

    return fetchObject.Publish(fetchSubject =>
    {
        var updateObs = fetchSubject
            .Do(x =>
            {
                // Save to database, all sync calls
            })
            .Where(x => false)
            .Catch(Observable.Empty<Article>());

        var dbArticleObs = Observable.Create<IArticle>(o =>
        {
            return scheduler.ScheduleAsync(async (ctrl, ct) =>
            {
                using (var session = dataBase.GetSession())
                {
                    var articles = await session.GetArticlesAsync(newsUrl, ct);
                    foreach (var article in articles)
                    {
                        o.OnNext(article);
                    }
                }
                o.OnCompleted();
            });
        });

        return
            dbArticleObs                // First get all the articles from dataBase cache
                .Concat(fetchSubject    // Get the latest articles from web service 
                    .Catch(Observable.Empty<Article>())
                    .Merge(updateObs))  // Update the database with latest articles
                .Do(x => Trace.WriteLine($"Displaying {x.Title}"));
    });
}

UPDATE - Added GetArticles

public IObservable<IContent> GetArticles(string feedUrl, IScheduler scheduler)
{
    return Observable.Create<IContent>(o =>
    {
        scheduler = scheduler ?? DefaultScheduler.Instance;
        scheduler.ScheduleAsync(async (ctrl, ct) =>
        {
            try
            {
                using (var inputStream = await Client.GetStreamAsync(feedUrl))
                {
                    var settings = new XmlReaderSettings
                    {
                        IgnoreComments = true,
                        IgnoreProcessingInstructions = true,
                        IgnoreWhitespace = true,
                        Async = true
                    };

                    //var parsingState = ParsingState.Channel;
                    Article article = null;
                    Feed feed = null;

                    using (var reader = XmlReader.Create(inputStream, settings))
                    {
                        while (await reader.ReadAsync())
                        {
                            ct.ThrowIfCancellationRequested();
                            if (reader.IsStartElement())
                            {
                                switch (reader.LocalName)
                                {
                                    ...
                                    // parsing logic goes here
                                    ...
                                }
                            }
                            else if (reader.LocalName == "item" &&
                                     reader.NodeType == XmlNodeType.EndElement)
                            {
                                o.OnNext(article);
                            }
                        }
                    }

                    o.OnCompleted();
                }
            }
            catch (Exception e)
            {
                o.OnError(e);
            }

        });
        return Disposable.Empty;
    });
}

UPDATE 2

Sharing the link to source code here.

Upvotes: 0

Views: 89

Answers (1)

Enigmativity

Reputation: 117027

There are a few things I don't like about your code. I assume NewsService is an IDisposable as it takes an HttpClient (which is disposable). You're not doing a proper cleanup.

Also, you haven't provided a complete method - because you've tried cutting it down for the question - but that makes it hard to reason about how to rewrite the code.

That said, the one thing that sticks out to me as quite horrid looking is the Observable.Create. Can you please try this code instead and see if it helps things work for you?

    var dbArticleObs =
        Observable
            .Using(
                () => dataBase.GetSession(),
                session =>
                    from articles in Observable.FromAsync(ct => session.GetArticlesAsync(newsUrl, ct))
                    from article in articles
                    select article);

Now, if that helps, try rewriting fetchObject to use the same Observable.Using when newing up the NewsService.
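As a rough sketch of what that could look like: the real NewsService is not shown in the question, so FakeNewsService below is a hypothetical stand-in that is assumed to implement IDisposable, and the feed URL is a placeholder. It only illustrates how Observable.Using ties the lifetime of the service to the subscription.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical stand-in: the real NewsService is not shown in the question.
// It is assumed here to be disposable because it owns an HttpClient.
sealed class FakeNewsService : IDisposable
{
    public IObservable<string> GetNewsArticles(string newsUrl) =>
        new[] { "web-1", "web-2" }.ToObservable();

    public void Dispose() => Console.WriteLine("NewsService disposed");
}

static class Program
{
    static void Main()
    {
        // The service is created per subscription and disposed when the
        // sequence terminates or the subscription is disposed.
        var fetchObject = Observable.Using(
            () => new FakeNewsService(),
            service => service.GetNewsArticles("http://example.invalid/feed"));

        fetchObject.Subscribe(
            title => Console.WriteLine($"Parsing Articles {title}"),
            () => Console.WriteLine("Completed"));
    }
}
```

With this shape, nothing is created until someone subscribes, and each subscription gets its own service instance.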

In any case, it would be good if you could provide a complete implementation of GetContents, NewsService and your dataBase code in your question.
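One more thing that may be worth ruling out, offered as an assumption rather than a confirmed diagnosis: Publish with a selector connects to the source as soon as the result is subscribed, while Concat only subscribes to its second sequence after the first one completes. If the web service emits before the database query finishes, those items would have nowhere to go. The sketch below uses hypothetical stand-ins (a Subject for the database, a synchronous Observable.Create for the web service) to make that ordering deterministic.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class Program
{
    static void Main()
    {
        // Hypothetical stand-in for dbArticleObs: completes only when told to.
        var db = new Subject<string>();

        // Hypothetical cold source standing in for the web service. It emits
        // synchronously as soon as it is subscribed, i.e. when Publish connects.
        var web = Observable.Create<string>(o =>
        {
            o.OnNext("web-1");
            o.OnNext("web-2");
            o.OnCompleted();
            return Disposable.Empty;
        });

        // Same shape as the question: database first, then the shared source.
        var result = web.Publish(shared => db.Concat(shared));

        using (result.Subscribe(x => Console.WriteLine($"Displaying {x}")))
        {
            // The database finishes only after the web items were published,
            // so Concat subscribes to the shared source too late to see them.
            db.OnNext("cached-1");
            db.OnCompleted();
        }
    }
}
```

Run as written, I would expect only the cached item to be displayed. If that matches what you see in your traces, swapping Publish for the Replay overload that takes a selector, so that late subscribers still receive the web items, might be worth a try.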

Upvotes: 0
