Reputation: 29780
Given a Subject like this:
var input = new Subject<int>();
and subscribers like this:
var observer1 = input
.Subscribe(ev =>
{
Thread.Sleep(1000);
listBox.Items.Add("o1: " + ev.ToString());
});
var observer2 = input
.Subscribe(ev =>
{
Thread.Sleep(1000);
listBox.Items.Add("o2: " + ev.ToString());
});
How can I tweak it so that when I do
Enumerable.Range(0, 1000).ToList().ForEach((i) =>
{
input.OnNext(i);
});
the OnNext calls are made asynchronously while the subscription handlers are synchronized to the UI thread? (It's a WinForms environment.)
I have tried
var observer1 = input
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(ev =>
{
Thread.Sleep(1000);
listBox.Items.Add("o1: " + ev.ToString());
});
And I can see that observer1 and observer2 immediately start their subscribe action without waiting, due to the ObserveOn. But now the app crashes because the ListBox is accessed from a non-UI thread. As soon as I sync the subscribe action with the UI thread, I lose the parallel processing of the OnNext calls.
Any idea how to fix it?
Upvotes: 1
Views: 1107
Reputation: 29786
You need to separate the job of retrieving the items from the job of updating the UI. Run the work on a background thread and then pass the results to the UI to update the ListBox.
Try this:
input.ObserveOn(Scheduler.Default) // transition to background thread
.Select(evt => /* synchronous code returning item */)
.ObserveOn(frm) // use rx-winforms to get this extension method
.Subscribe(item => listBox.Items.Add(item));
ObserveOn has a number of overloads. With rx-winforms you can pass a WinForms control (which can be the host form or a constituent control) and Rx will use it to transition to the UI thread. Another overload accepts SynchronizationContext objects.
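As a rough sketch of the SynchronizationContext overload mentioned above: the helper name Wire and the addToListBox callback are hypothetical, and uiContext is assumed to be SynchronizationContext.Current captured on the UI thread (for example in the form's constructor, after the controls exist).

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

static class ContextMarshalling
{
    // Hypothetical helper: uiContext is assumed to have been captured on the UI thread.
    public static IDisposable Wire(
        IObservable<int> input,
        SynchronizationContext uiContext,
        Action<string> addToListBox)
    {
        return input
            .ObserveOn(Scheduler.Default)   // do the work off the calling thread
            .Select(ev => "o1: " + ev)      // stand-in for the real background work
            .ObserveOn(uiContext)           // post each result to the captured context
            .Subscribe(addToListBox);       // e.g. item => listBox.Items.Add(item)
    }
}
```

In the form this might be called as ContextMarshalling.Wire(input, SynchronizationContext.Current, item => listBox.Items.Add(item)).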
Depending on the number and frequency of items you have, you might want to consider using a Buffer with a TimeSpan either before or after the Select to batch items into a List and then update the UI with several items at a time. This reduces the number of context switches and can keep the UI more responsive.
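A minimal sketch of that batching idea, with Buffer placed after the Select. The helper name Batch and the "o1: " projection are assumptions for illustration; the window length is whatever suits your item rate.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

static class Batching
{
    // Hypothetical helper: produces non-empty batches of processed items.
    public static IObservable<IList<string>> Batch(IObservable<int> input, TimeSpan window)
    {
        return input
            .ObserveOn(Scheduler.Default)       // transition to a background thread
            .Select(ev => "o1: " + ev)          // stand-in for the synchronous work
            .Buffer(window)                     // collect results per time window
            .Where(batch => batch.Count > 0);   // Buffer also emits empty windows; skip them
    }

    // Possible use from the form (needs rx-winforms and System.Linq for ToArray):
    //   Batching.Batch(input, TimeSpan.FromMilliseconds(250))
    //       .ObserveOn(frm)
    //       .Subscribe(batch => listBox.Items.AddRange(batch.ToArray()));
}
```

With this shape the UI thread is only entered once per non-empty window rather than once per item.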
You may find it helpful to consult this: ObserveOn and SubscribeOn - where the work is being done
Note that this will process each item one at a time, which may not be what you want. In this case you can use SelectMany. Assuming your background work is encapsulated in an async method, say, like this:
async Task<string> GetItem(int evt);
Then you can do:
input.SelectMany(evt => GetItem(evt))
.ObserveOn(frm) // use rx-winforms to get this extension method
.Subscribe(item => listBox.Items.Add(item));
Since GetItem is now async, it will already transition to a background thread - as long as this happens swiftly (i.e. you should have an await in your GetItem for the first long-running action), there will be no need for the first ObserveOn.
SelectMany projects each source event into an observable stream, and flattens the result. It has an overload that accepts an async method, and encapsulates it in an IObservable<T> that returns a single result and completes.
This means that events are processed concurrently, and it's now possible for events to arrive back out of order - so be sure this is acceptable. There are ways to address this, but it's really drifting beyond the scope of the question. (hint: Select + Concat)
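One possible reading of the Select + Concat hint, as a console-style sketch: Select starts each task as its event arrives, and the Concat overload for a sequence of tasks yields their results in source order. The GetItem body here is a made-up stand-in in which later events finish sooner, just to provoke reordering.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class OrderedResults
{
    // Hypothetical stand-in for GetItem: higher event numbers complete earlier.
    static async Task<string> GetItem(int evt)
    {
        await Task.Delay(50 - evt * 10);
        return "item " + evt;
    }

    public static IList<string> Run()
    {
        return Observable.Range(0, 5)
            .Select(evt => GetItem(evt))   // tasks start right away and run concurrently
            .Concat()                      // results are emitted in source order
            .ToList()
            .Wait();                       // blocking; for a console demo only
    }
}
```

In the WinForms case you would not block with Wait on the UI thread; the pipeline would instead end with .ObserveOn(frm).Subscribe(item => listBox.Items.Add(item)) as in the earlier snippets.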
Upvotes: 1
Reputation: 29780
Just found out that if I do
var observer1 = input
.Subscribe(async ev => // the lambda must be async to use await
{
await Task.Yield();
Thread.Sleep(1000);
listBox.Items.Add("o1: " + ev.ToString());
});
var observer2 = input
.Subscribe(async ev => // the lambda must be async to use await
{
await Task.Yield();
Thread.Sleep(1000);
listBox.Items.Add("o2: " + ev.ToString());
});
it works. But is there not a cleaner, Rx-based solution? Basically I want a fire-and-forget action in the subscribe that can access the UI thread.
Upvotes: 0