Peter Bons

Reputation: 29780

Execute OnNext in parallel but sync subscription with UI thread

Given a Subject like this:

var input = new Subject<int>();

and subscribers like this:

var observer1 = input
.Subscribe(ev =>
{
    Thread.Sleep(1000);
    listBox.Items.Add("o1: " + ev.ToString());
});

var observer2 = input
.Subscribe(ev =>
{
    Thread.Sleep(1000);
    listBox.Items.Add("o2: " + ev.ToString());
});

How can I tweak it so that when I do

Enumerable.Range(0, 1000).ToList().ForEach(i =>
{
    input.OnNext(i);
});

the OnNext calls are handled asynchronously, but the UI updates in the subscriptions are synchronized to the UI thread? (It's a WinForms environment.)

I have tried

var observer1 = input
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(ev =>
{
    Thread.Sleep(1000);
    listBox.Items.Add("o1: " + ev.ToString());
});

With this I can see that observer1 and observer2 start their subscribe actions immediately, without waiting, thanks to the ObserveOn. But now the app crashes because the ListBox is accessed from a non-UI thread. As soon as I sync the subscribe action with the UI thread, I lose the parallel processing of the OnNext calls.

Any idea how to fix it?

Upvotes: 1

Views: 1107

Answers (2)

James World

Reputation: 29786

You need to separate the job of retrieving the items from the job of updating the UI. Run the work on a background thread and then pass the results to the UI thread to update the ListBox.

Try this:

input.ObserveOn(Scheduler.Default) // transition to background thread
     .Select(evt => /* synchronous code returning item */)
     .ObserveOn(frm) // use rx-winforms to get this extension method
     .Subscribe(item => listBox.Items.Add(item));

ObserveOn has a number of overloads. With rx-winforms you can pass a WinForms control (which can be the host form or one of its constituent controls) and Rx will use it to transition to the UI thread. Another overload accepts a SynchronizationContext object.

Depending on the number and frequency of items you have, you might want to consider using a Buffer with a TimeSpan either before or after the Select to batch items into a List and then update the UI with several items at a time. This reduces the number of context switches and can keep the UI more responsive.
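As a rough sketch of the batching idea (not part of the original answer): the helper name ProcessInBatches, its parameters, and the "o1: " formatting are assumptions made for illustration. It does the per-item work on the supplied scheduler, groups the results into time windows with Buffer, and drops the empty batches that Buffer emits for idle windows.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class BatchingSketch
{
    // Hypothetical helper: names and parameters are illustrative only.
    public static IObservable<IList<string>> ProcessInBatches(
        IObservable<int> input, TimeSpan window, IScheduler scheduler)
    {
        return input
            .ObserveOn(scheduler)                 // move off the caller's thread
            .Select(ev => "o1: " + ev)            // the expensive per-item work would go here
            .Buffer(window, scheduler)            // collect results per time window
            .Where(batch => batch.Count > 0);     // skip windows with no items
    }
}
```

In the form, this could then be consumed with something like BatchingSketch.ProcessInBatches(input, TimeSpan.FromMilliseconds(250), Scheduler.Default).ObserveOn(frm).Subscribe(batch => listBox.Items.AddRange(batch.Cast<object>().ToArray())); so the UI thread is entered once per batch rather than once per item. The 250 ms window is an arbitrary choice.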

You may find it helpful to consult this: ObserveOn and SubscribeOn - where the work is being done

Note that this will process each item one at a time - which may not be what you want. In this case you can use SelectMany. Assuming your background work is encapsulated in an async method say, like this:

async Task<string> GetItem(int evt);

Then you can do:

input.SelectMany(evt => GetItem(evt))
     .ObserveOn(frm) // use rx-winforms to get this extension method
     .Subscribe(item => listBox.Items.Add(item));

Since GetItem is now async, it will already transition to a background thread - as long as this happens swiftly (i.e. you should have an await in your GetItem for the first long-running action), there will be no need for the first ObserveOn.

SelectMany projects each source event into an observable stream, and flattens the result. It has an overload that accepts an async method, and encapsulates it in an IObservable<T> that returns a single result and completes.

This means that events are processed concurrently, and it's now possible for events to arrive back out of order - so be sure this is acceptable. There are ways to address this, but it's really drifting beyond the scope of the question. (hint: Select+Concat)
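To make the Select+Concat hint concrete, here is a minimal sketch, assuming the work is exposed as a Task-returning method. The GetItem body below is a made-up stand-in that deliberately finishes later events sooner, and InOrder is a hypothetical helper name. Each task is started as soon as its event arrives, so the work still overlaps, but Concat only moves on to the next result once the previous one has been delivered.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class OrderedSketch
{
    // Hypothetical stand-in for the real background work.
    public static async Task<string> GetItem(int evt)
    {
        // Later events complete sooner, to provoke out-of-order completion.
        await Task.Delay(Math.Max(0, 50 - evt * 10));
        return "item " + evt;
    }

    public static IObservable<string> InOrder(IObservable<int> input)
    {
        return input
            .Select(evt => GetItem(evt).ToObservable()) // task starts immediately
            .Concat();                                  // results emitted in source order
    }
}
```

Appending .ObserveOn(frm).Subscribe(item => listBox.Items.Add(item)) to OrderedSketch.InOrder(input) would then update the ListBox in the original event order. The trade-off is that one slow item holds back every result queued behind it.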

Upvotes: 1

Peter Bons

Reputation: 29780

Just found out that if I do

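var observer1 = input
.Subscribe(async ev => // the lambda must be async to use await
{
    // Yield so that OnNext returns immediately. If OnNext was called on the
    // UI thread, the rest resumes there via the captured SynchronizationContext.
    await Task.Yield();
    Thread.Sleep(1000); // note: in that case this sleep still blocks the UI thread
    listBox.Items.Add("o1: " + ev.ToString());
});

var observer2 = input
.Subscribe(async ev =>
{
    await Task.Yield();
    Thread.Sleep(1000);
    listBox.Items.Add("o2: " + ev.ToString());
});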

it works. But is there not a cleaner, Rx-based solution? Basically I want a fire-and-forget action in the subscribe that can access the UI thread.

Upvotes: 0
