Reputation: 4763
I'm experiencing Reactive Programming (Rx) and one of its interesting feature is subscribing and observing on different threads. But here somehow it blocks the UI thread. Technically I don't have any method returning Task (async method), so here I'm trying to mimic a long process with Thread.Sleep
:
IEnumerable<Item> _search(string searchText)
{
Thread.Sleep(3000);
//return result by querying ...
//...
return someResult;
}
I have a ViewModel class like this:
public class ViewModel {
public ViewModel(){
//this SubscribeOn may not be necessary but I just try it here for sure
SearchTextStream.SubscribeOn(NewThreadScheduler.Default)
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(searchText => {
var items = _search(searchText);
}, ex => {
//handle error
});
}
public string SearchText
{
get
{
return _searchText.FirstAsync().Wait();
}
set
{
_searchText.OnNext(value);
}
}
ISubject<string> _searchText = new BehaviorSubject<string>("");
public IObservable<string> SearchTextStream
{
get
{
return _searchText.AsObservable().DistinctUntilChanged();
}
}
}
Actually without using Thread.Sleep
, I can still see it blocks the UI but not very obvious, so I just use it to make it more obvious. As I said, the scenario here is that I have just a normal method without any task or async. It may be a long-running method. Using with RX, I don't know which should be done to make it behave like async (as when using a Task.Run
)?
I'm testing on a WPF
application if it matters.
Upvotes: 1
Views: 552
Reputation: 117010
You're calling _search(searchText)
on the DispatcherScheduler.Current
scheduler - and hence, with the Thread.Sleep
you're blocking the UI.
You really should make _search
return an observable.
IObservable<IEnumerable<Item>> _search(string searchText)
{
Thread.Sleep(3000);
//return result by querying ...
//...
return Observable.Return(new [] { new Item() });
}
Now the constructor should look like this:
public ViewModel()
{
SearchTextStream
.ObserveOn(System.Reactive.Concurrency.Scheduler.Default)
.SelectMany(searchText => _search(searchText))
.ObserveOnDispatcher()
.Subscribe(items =>
{
/* do something with `items` */
}, ex =>
{
//handle error
});
}
Upvotes: 1