Gimly

Reputation: 6175

Poll a webservice using Reactive Extensions and bind the last x results

I'm trying to use Reactive Extensions (Rx) for a task where it seems to be a good fit: polling a web service at a specific interval and displaying its last x results.

I have a web service that sends me the status of an instrument I want to monitor. I would like to poll this instrument at a specific rate and display the last 20 statuses that have been polled in a list.

So my list would be like a "moving window" of the service result.

I'm developing a WPF app with Caliburn.Micro, but I don't think this is very relevant.

What I have so far is the following (just a sample app that I hacked together quickly; I'm not going to do this in the ShellViewModel in the real app):

public class ShellViewModel : Caliburn.Micro.PropertyChangedBase, IShell
{
    private ObservableCollection<string> times;
    private string currentTime;

    public ShellViewModel()
    {
        times = new ObservableCollection<string>();

        Observable
            .Interval(TimeSpan.FromSeconds(1))
            .SelectMany(x => this.GetCurrentDate().ToObservable())
            .ObserveOnDispatcher()
            .Subscribe(x =>
            {
                this.CurrentTime = x;
                this.times.Add(x);
            });
    }

    public IEnumerable<string> Times
    {
        get
        {
            return this.times;
        }
    }

    public string CurrentTime
    {
        get
        {
            return this.currentTime;
        }
        set
        {
            this.currentTime = value;
            this.NotifyOfPropertyChange(() => this.CurrentTime);
        }
    }

    private async Task<string> GetCurrentDate()
    {
        var client = new RestClient("http://www.timeapi.org");
        var request = new RestRequest("/utc/now.json");

        var response = await client.ExecuteGetTaskAsync(request);

        return response.Content;
    }
}

In the view I have just a label bound to the CurrentTime property and a list bound to the Times property.

The issue I have is:

Second edit:

The previous edit below was me being very confused: it triggers events continuously because Interval is a continuous sequence that never completes, so Repeat never gets a chance to resubscribe. Brandon's solution is correct and works as expected.

Edit:

Based on Brandon's example, I tried to do the following code in LinqPad:

Observable
    .Merge(Observable.Interval(TimeSpan.FromSeconds(2)), Observable.Interval(TimeSpan.FromSeconds(10)))
    .Repeat()
    .Scan(new List<double>(), (list, item) => { list.Add(item); return list; })
    .Subscribe(x => Console.Out.WriteLine(x))

And I can see that the write to the console occurs every 2 seconds, not every 10. So Repeat doesn't wait for both observables to complete before repeating.
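
For comparison, here is a minimal sketch of the same experiment with sources that do complete. It assumes the System.Reactive package is referenced; the class name `RepeatDemo`, the method `CollectGaps`, and the shortened 200 ms / 1000 ms delays are made up for illustration. `Observable.Timer` with a single due time emits one value and then completes, so `Merge` completes once both timers have fired and `Repeat` resubscribes only at that point.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class RepeatDemo
{
    // Hypothetical helper: returns the gaps (in milliseconds) between
    // the first `count` notifications of the repeated, merged timers.
    public static IList<double> CollectGaps(int count)
    {
        // Each timer emits a single value and then completes,
        // unlike Observable.Interval, which never completes.
        var shortTimer = Observable.Timer(TimeSpan.FromMilliseconds(200));
        var longTimer = Observable.Timer(TimeSpan.FromMilliseconds(1000));

        return Observable
            .Merge(shortTimer, longTimer) // completes after BOTH timers complete
            .Repeat()                     // resubscribes on each completion
            .TimeInterval()               // measures time since the previous value
            .Select(t => t.Interval.TotalMilliseconds)
            .Take(count)
            .ToList()
            .Wait();                      // blocking; acceptable in a console demo only
    }

    public static void Main()
    {
        foreach (var gap in CollectGaps(4))
        {
            Console.WriteLine("{0:F0} ms", gap);
        }
    }
}
```

The printed gaps should alternate between roughly 200 ms and roughly 800 ms, which shows that each new round starts only after the slower timer has completed.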

Upvotes: 7

Views: 1456

Answers (2)

Brandon

Reputation: 39182

Try this:

// timer that completes after 1 second
var intervalTimer = Observable
    .Empty<string>()
    .Delay(TimeSpan.FromSeconds(1));

// queries one time whenever subscribed
var query = Observable.FromAsync(GetCurrentDate);

// query + interval timer, which completes
// only after both the query and the timer
// have completed

var intervalQuery = Observable.Merge(query, intervalTimer);

// Re-issue the query whenever intervalQuery completes
var queryLoop = intervalQuery.Repeat();

// Keep the 20 most recent results.
// Note: use an immutable list for this
// (https://www.nuget.org/packages/microsoft.bcl.immutable),
// otherwise you will have problems with
// the list changing while an observer
// is still observing it.
var recentResults = queryLoop.Scan(
    ImmutableList.Create<string>(), // starts off empty
    (acc, item) =>
    {
        acc = acc.Add(item);
        if (acc.Count > 20)
        {
            acc = acc.RemoveAt(0);
        }

        return acc;
    });

// store the results
recentResults
    .ObserveOnDispatcher()
    .Subscribe(items =>
    {
        // new items are appended, so the most recent result is the last one
        this.CurrentTime = items[items.Count - 1];
        this.RecentItems = items;
    });
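
The accumulator passed to Scan above can also be pulled out into a small pure function, which makes the "keep the N most recent" rule easy to check on its own. This is only a sketch: the names `RecentWindow` and `Push` are hypothetical, and it assumes System.Collections.Immutable is available (built into current .NET, a NuGet package on .NET Framework).

```csharp
using System;
using System.Collections.Immutable;

public static class RecentWindow
{
    // Appends `item` and drops the oldest entries so that at most
    // `capacity` items remain. Returns a new list; the input is unchanged.
    public static ImmutableList<T> Push<T>(ImmutableList<T> window, T item, int capacity)
    {
        if (capacity <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(capacity));
        }

        var next = window.Add(item);
        return next.Count > capacity
            ? next.RemoveRange(0, next.Count - capacity)
            : next;
    }
}

public static class Program
{
    public static void Main()
    {
        var window = ImmutableList<int>.Empty;
        for (var i = 1; i <= 5; i++)
        {
            window = RecentWindow.Push(window, i, capacity: 3);
        }

        Console.WriteLine(string.Join(", ", window)); // prints "3, 4, 5"
    }
}
```

With such a helper, the Scan call would reduce to something like `queryLoop.Scan(ImmutableList<string>.Empty, (acc, item) => RecentWindow.Push(acc, item, 20))`.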

Upvotes: 7

codeworx

Reputation: 2745

This should skip the interval messages while a GetCurrentDate call is in progress.

Observable
    .Interval(TimeSpan.FromSeconds(1))
    .GroupByUntil(
        p => 1,
        p => GetCurrentDate().ToObservable().Do(x =>
        {
            this.CurrentTime = x;
            this.times.Add(x);
        }))
    .SelectMany(p => p.LastAsync())
    .Subscribe();

Upvotes: 0
