NoPyGod
NoPyGod

Reputation: 5067

Observable for awaitable method C#

Observable.FromAsyncPattern can be used to make an observable from BeginX EndX style async methods.

Maybe I'm misunderstanding things, but is there a similar function to create an observable from the new async style methods - ie.. Stream.ReadAsync?

Upvotes: 3

Views: 1345

Answers (2)

NoPyGod
NoPyGod

Reputation: 5067

Note that Lee's answer is correct, but ultimately what I ended up doing was using Observable.Create to continually read from a stream, see below --

    public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader)
    {

       return Observable.Create<Command>(async (subject, token) =>
        {


            try
            {

                while (true)
                {

                    if (token.IsCancellationRequested)
                    {
                        subject.OnCompleted();
                        return;
                    }

                    Command cmd = await reader.ReadCommandAsync();

                    subject.OnNext(cmd);

                }

            }

            catch (Exception ex)
            {
                try
                {
                    subject.OnError(ex);
                }
                catch (Exception)
                {
                    Debug.WriteLine("An exception was thrown while trying to call OnError on the observable subject -- means you're not catching exceptions everywhere");
                    throw;
                }
            }

        }).Publish();

    }

Upvotes: 3

Lee
Lee

Reputation: 144126

You can create an IObservable<T> from a Task<T> using ToObservable:

using System.Reactive.Threading.Tasks;

Stream s = ...;
IObservable<int> o = s.ReadAsync(buffer, offset, count).ToObservable();

Upvotes: 6

Related Questions