Reputation: 367
I'm having trouble (probably trouble understanding) with Reactive Extensions. I have a WCF service that I connect to from a WinForms application, and from following tutorials this is what I've come up with. It was never working, so I changed it from the regular synchronous methods to the asynchronous ones, and I started to get syntax errors instead. Why does Task<T[]>.ToObservable() give me a T[] when I subscribe to it? I may have a lot more than that wrong. Thanks for any help!
var users =
    _chatServiceClient.GetAllUsersAsync()
        .ToObservable()
        .SubscribeOn(TaskPoolScheduler.Default)
        .ObserveOn(this)
        .Throttle(TimeSpan.FromMilliseconds(500))
        .DistinctUntilChanged();

var messages =
    _chatServiceClient.GetNewMessagesAsync(_me)
        .ToObservable()
        .SubscribeOn(TaskPoolScheduler.Default)
        .ObserveOn(this)
        .DistinctUntilChanged()
        .Throttle(TimeSpan.FromMilliseconds(100))
        .Subscribe(m => chatRichTextBox.AppendText(string.Format("{0}:{1}: {2}", m.Date, m.User, m.Content)));

userList.DataSource = users;
Upvotes: 1
Views: 107
Reputation: 117037
I think that this should be simple enough if you look at the ToObservable extension method for converting a task to an observable. It looks like this:
IObservable<TResult> ToObservable<TResult>(this Task<TResult> task)
So if you start with a Task<T[]> then you're going to get an IObservable<T[]> out. For example, this holds:
Task<int[]> task = Task.Run(() => new [] { 1, 2, 3 });
IObservable<int[]> observable = task.ToObservable();
Now if you want to get an IObservable<int> out, then you need to do this:
IObservable<int> observable2 = observable.SelectMany(x => x);
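To see the difference when subscribing, here is a minimal console sketch. It assumes the System.Reactive NuGet package is referenced and C# 7.1 or later for async Main; the class name Demo is purely illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;            // SelectMany, ToList, awaiting an IObservable<T>
using System.Reactive.Threading.Tasks; // Task<T>.ToObservable()
using System.Threading.Tasks;

public static class Demo
{
    public static async Task Main()
    {
        Task<int[]> task = Task.Run(() => new[] { 1, 2, 3 });

        // The task produces one value, so the observable produces
        // one notification that carries the whole array.
        IList<int[]> arrays = await task.ToObservable().ToList();
        Console.WriteLine(arrays.Count); // prints 1

        // SelectMany flattens the array into one notification per element.
        IList<int> items = await task.ToObservable().SelectMany(x => x).ToList();
        Console.WriteLine(string.Join(", ", items)); // prints 1, 2, 3
    }
}
```

Awaiting an observable returns its last element, which is why ToList is used here to collect every notification before the await completes.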
Upvotes: 1
Reputation: 18125
The reason you're getting an IObservable<T[]> is that ToObservable on a Task<T> returns IObservable<T>. It must do this so that any type of Task can be converted to an observable. A Task represents an asynchronous function which returns a single value. An observable represents an asynchronous function which can return multiple values. So, when we convert a Task to an observable, we go from an asynchronous function which can only return a single value to one which could return many values but will only return a single one.
Therefore, once we have an asynchronous function which returns only one value, and that value is an object which can contain multiple values, we must create another asynchronous function which returns all values contained in the object. To do so, we can use SelectMany.
Task<T[]> taskOfCollection = ...;
IObservable<T[]> obsOfCollection = taskOfCollection.ToObservable();
IObservable<T> obsOfItems = obsOfCollection.SelectMany(x => x.ToObservable());
However, since x is an array, it is an IEnumerable<T>, and SelectMany has an overload that accepts an IEnumerable<T>, so we can just return x.
IObservable<T> obsOfItems = obsOfCollection.SelectMany(x => x);
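The above is simply a shorter way of doing a Select followed by a Merge. Note that Merge flattens an observable of observables, so each array has to be projected to an observable first.
IObservable<T> obsOfItems = obsOfCollection.Select(x => x.ToObservable()).Merge();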
And, since there is only one item returned from obsOfCollection, Concat works too (Concat just means "Merge one thing at a time"). Like Merge, it needs each array projected to an observable first.
IObservable<T> obsOfItems = obsOfCollection.Select(x => x.ToObservable()).Concat();
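So, your final expression becomes one of these:
IObservable<T> obs = task.ToObservable().SelectMany(x => x);
// or
IObservable<T> obs = task.ToObservable().Select(x => x.ToObservable()).Merge();
// or
IObservable<T> obs = task.ToObservable().Select(x => x.ToObservable()).Merge(1);
// or
IObservable<T> obs = task.ToObservable().Select(x => x.ToObservable()).Concat();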
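As a rough check that these flattening options agree, here is a small sketch, again assuming the System.Reactive NuGet package and C# 7.1 or later. The class name FlattenDemo is illustrative. Each array is projected to an observable before Merge or Concat because, as far as I know, those operators only have overloads for sequences of observables (or tasks), not for an IObservable<T[]>.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;                     // SequenceEqual on IList<int>
using System.Reactive.Linq;            // Select, SelectMany, Merge, Concat, ToList
using System.Reactive.Threading.Tasks; // Task<T>.ToObservable()
using System.Threading.Tasks;

public static class FlattenDemo
{
    public static async Task Main()
    {
        Task<int[]> task = Task.FromResult(new[] { 1, 2, 3 });
        IObservable<int[]> source = task.ToObservable();

        // Three ways of turning IObservable<int[]> into IObservable<int>.
        IList<int> viaSelectMany = await source.SelectMany(x => x).ToList();
        IList<int> viaMerge = await source.Select(x => x.ToObservable()).Merge().ToList();
        IList<int> viaConcat = await source.Select(x => x.ToObservable()).Concat().ToList();

        // With a single inner sequence, all three yield the same items in order.
        Console.WriteLine(viaSelectMany.SequenceEqual(viaMerge));
        Console.WriteLine(viaSelectMany.SequenceEqual(viaConcat));
    }
}
```

For a single array, SelectMany(x => x) is the least ceremony. Merge and Concat only start to differ when there are several inner sequences producing values concurrently.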
The reason ToObservable on a Task<T[]> does not return an IObservable<T> is that there is no overload to do so. The reason there is no such overload is that it would prevent users from using an observable to await an array as a single value.
Upvotes: 4