jlvaquero
jlvaquero

Reputation: 8785

Rx .NET: Filter observable until task is done

I´m learning Rx for .NET and a colleague send me a simple example to start with but there is something ugly I don't like.

The code:

using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;
using System.Windows.Forms;
using System.Collections.Generic;

namespace WindowsFormsApplication1
{
    public partial class Form1 : Form
    {
        public IObservable<Content> contentStream;
        public static bool isRunning = false;

        public Form1()
        {

            InitializeComponent();

            contentStream = Observable.FromEventPattern<ScrollEventArgs>(dataGridView1, "Scroll")  // create scroll event observable
                .Where(e => (dataGridView1.Rows.Count - e.EventArgs.NewValue < 50 && !isRunning)) //discart event if scroll is not down enough
                //or we are already retrieving items (isRunning)
                .Select(e => { isRunning = true; return 100; }) //transform to 100--100--100--> stream, discart next events until we finish 
                .Scan((x, y) => x + y) //get item index by accumulating stream items
                .StartWith(0) //start with 0 before event gets triggered
                .SelectMany(i => getContent(i).ToObservable());//create a stream with the result of an async function and merge them into just one stream

            contentStream.Subscribe(c => invokeUpdateList(c)); //just update the control every time a item is in the contentStream

        }

        async private Task<Content> getContent(int index)
        {

            await Task.Delay(1000);//request to a web api...
            return new Content(index);//mock the response
        }

        private void invokeUpdateList(Content c)
        {
            dataGridView1.Invoke((MethodInvoker)delegate
            {
                updateList(c);
            });
        }

        private void updateList(Content c)
        {
            foreach (var item in c.pageContent)
            {
                dataGridView1.Rows.Add(item);
            }
            isRunning = false; //unlocks event filter
        }

    }

    public class Content
    {
        public List<string> pageContent = new List<string>();
        public const string content_template = "This is the item {0}.";
        public Content()
        {
        }
        public Content(int index)
        {

            for (int i = index; i < index + 100; i++)
            {
                pageContent.Add(string.Format(content_template, i));
            }

        }
    }
}

What I don't like is the isRunning filter. Is there a better way to discart some event in the stream until the control is updated?

Although @Shlomo approach seems right it does not start populating on load:

 var index = new BehaviorSubject<int>(0);

      var source = Observable.FromEventPattern<ScrollEventArgs>(dataGridView2, "Scroll")
          .Where(e => dataGridView2.Rows.Count - e.EventArgs.NewValue < 50)
          .Select(_ => Unit.Default)
          .StartWith(Unit.Default)
          .Do(i => Console.WriteLine("Event triggered"));

      var fetchStream = source
          .WithLatestFrom(index, (u, i) => new {unit = u,index = i } )
          .Do(o => Console.WriteLine("Merge result" + o.unit + o.index ))
          .DistinctUntilChanged()
          .Do(o => Console.WriteLine("Merge changed" + o.unit + o.index))
          .SelectMany(i => getContent(i.index).ToObservable());

       var contentStream = fetchStream.WithLatestFrom(index, (c, i) => new { Content = c, Index = i })
          .ObserveOn(dataGridView2)
          .Subscribe(a =>
          {
            updateGrid(a.Content);
            index.OnNext(a.Index + 100);
          });

I can see "Event triggered" in output log but seems first source element (StartWith(Unit.Default)) is lost once I reach into WithLatestFrom.

Upvotes: 2

Views: 289

Answers (1)

Shlomo
Shlomo

Reputation: 14350

This looks like some sort of pagination auto-scroll implementation? Conceptually, it could help to split up your observable:

var index = new BehaviorSubject<int>(0);

var source = Observable.FromEventPattern<ScrollEventArgs>(dataGridView1, "Scroll") 
    .Where(e => dataGridView1.Rows.Count - e.EventArgs.NewValue < 50)
    .Select(_ => Unit.Default)
    .StartWith(Unit.Default);

var fetchStream = source
    .WithLatestFrom(index, (_, i) => i)
    .DistinctUntilChanged()
    .SelectMany(i => getContent(i).ToObservable());

So source is a series of Units, basically empty notifications that the user wants to initiate a list update. index represents the next index to downloaded. fetchstream merges source with index to make sure there's only one request for a given index, then it initiates the fetch.

Now that we have a stream of requests that are distinct, we need to subscribe and update the UI and index.

var contentStream =
    fetchStream .WithLatestFrom(index, (c, i) => new { Content = c, Index = i })
    .ObserveOn(dataGridView1)
    .Subscribe(a =>
        {
            updateList(a.Content);
            index.OnNext(a.Index + 100);
        });

Note ObserveOn(datagridView1) accomplishes the same thing as your InvokeUpdateList method but in a cleaner form (requires Nuget System.Reactive.Windows.Forms), so you could eliminate that method.

All of this can go in the constructor, so you can hide all the state changes in there.

Upvotes: 1

Related Questions