Jonas Follesø

Reputation: 6531

Converting event based API to Rx.Net

I'm trying to convert an existing event-based API to a reactive observable API. The concrete API I'm working with is NSNetServiceBrowser in Xamarin.iOS, which lets you browse for network devices using Zeroconf/Bonjour. However, the question applies to any API of this kind.

NSNetServiceBrowser offers several events of interest:
- FoundService
- NotSearched
- ServiceRemoved

The FoundService event is raised when a service is discovered, and the NotSearched event is raised when the search fails.

I would like to combine the FoundService and NotSearched events into a single observable of NSNetService.

My current implementation looks like this:

public IObservable<NSNetService> Search()
{
    var foundObservable = Observable
        .FromEventPattern<NSNetServiceEventArgs>(
            h => serviceBrowser.FoundService += h,
            h => serviceBrowser.FoundService -= h)
        .Select(x => x.EventArgs);

    var notSearchedObservable = Observable
        .FromEventPattern<NSNetServiceErrorEventArgs>(
            h => serviceBrowser.NotSearched += h,
            h => serviceBrowser.NotSearched -= h)
        .Select(x => x.EventArgs);

    var serviceObservable = Observable.Create(
        (IObserver<NSNetServiceEventArgs> observer) =>
        {
            notSearchedObservable.Subscribe(n =>
            {
                string errorMessage = $"Search for {serviceType} failed:";
                foreach (var kv in n.Errors)
                {
                    log.Error($"\t{kv.Key}: {kv.Value}");
                    errorMessage += $" ({kv.Key}, {kv.Value})";
                }
                observer.OnError(new Exception(errorMessage));
            });
            foundObservable.Subscribe(observer);
            return System.Reactive.Disposables.Disposable.Empty;
        }).Select(x => x.Service);

    serviceBrowser.SearchForServices(serviceType, domain);
    return serviceObservable;
}

The code looks clunky, and I have a gut feeling I'm not using System.Reactive correctly. Is there a more elegant way to combine a pair of events where one produces values and the other signals an error? This is a common pattern in existing event-based APIs in .NET.


Here is a small console app (depending only on System.Reactive) illustrating the type of API I want to Reactify:

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace ReactiveLearning
{
    class Program
    {
        static void Main(string[] args)
        {
            var browser = new ServiceBrowser();

            var observableFound =
                Observable.FromEventPattern<ServiceFoundEventArgs>(
                    h => browser.ServiceFound += h,
                    h => browser.ServiceFound -= h)
                .Select(e => e.EventArgs.Service);

            var observableError =
                Observable.FromEventPattern<ServiceSearchErrorEventArgs>(
                    h => browser.ServiceError += h,
                    h => browser.ServiceError -= h);

            var foundSub = observableFound.Subscribe(s =>
            {
                Console.WriteLine($"Found service: {s.Name}");
            }, () => 
            {
                Console.WriteLine("Found Completed");
            });

            var errorSub = observableError.Subscribe(e =>
            {
                Console.WriteLine("ERROR!");
            }, () => 
            {
                Console.WriteLine("Error Completed");
            });

            browser.Search();

            Console.ReadLine();

            foundSub.Dispose();
            errorSub.Dispose();

            Console.WriteLine();
        }
    }

    class ServiceBrowser
    {
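        // Declared as events (not public delegate fields) so callers can only += / -= handlers.
        public event EventHandler<ServiceFoundEventArgs> ServiceFound;
        public event EventHandler<ServiceSearchErrorEventArgs> ServiceError;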

        public void Search()
        {
            Task.Run(async () =>
            {
                for (var i = 0; i < 5; ++i)
                {
                    await Task.Delay(1000);
                    ServiceFound?.Invoke(this, new ServiceFoundEventArgs(new Service($"Service {i}")));
                }

                var r = new Random();
                if (r.NextDouble() > 0.5)
                {
                    ServiceError?.Invoke(this, new ServiceSearchErrorEventArgs());
                }
            });
        }
    }

    class ServiceFoundEventArgs : EventArgs
    {
        public Service Service { get; private set;  }
        public ServiceFoundEventArgs(Service service) => Service = service;
    }

    class ServiceSearchErrorEventArgs : EventArgs {}

    class Service
    {
        public event EventHandler<EventArgs> AddressResolved;
        public event EventHandler<EventArgs> ErrorResolvingAddress;
        public string Name { get; private set; }
        public string Address { get; private set; }
        public Service(string name) => Name = name;

        public void ResolveAddress()
        {
            Task.Run(async () =>
            {
                await Task.Delay(500);
                var r = new Random();
                if (r.NextDouble() > 0.5)
                {                    
                    Address = $"http://{Name}.com";
                    AddressResolved?.Invoke(this, EventArgs.Empty);
                }
                else
                {
                    ErrorResolvingAddress?.Invoke(this, EventArgs.Empty);
                }
            });
        }
    }
}

Upvotes: 1

Views: 198

Answers (1)

Enigmativity

Reputation: 117027

Thank you for the excellent sample code. You need to make use of the Materialize and Dematerialize operators. Here's how:

var observableFoundWithError =
    observableFound
        .Materialize()
        .Merge(
            observableError
                .Materialize()
                .Select(x =>
                    Notification
                        .CreateOnError<Service>(new Exception("Error"))))
        .Dematerialize()
        .Synchronize();

using (observableFoundWithError.Subscribe(
    s => Console.WriteLine($"Found service: {s.Name}"),
    ex => Console.WriteLine($"Found error: {ex.Message}"),
    () => Console.WriteLine("Found Completed")))
{
    browser.Search();
    Console.ReadLine();
}

The Materialize() operator turns an IObservable<T> into an IObservable<Notification<T>>, which lets the standard OnError and OnCompleted signals travel through OnNext as ordinary values. You can use Notification.CreateOnError<T>(new Exception("Error")) to construct error elements for that observable, and Dematerialize() then turns the whole thing back into an IObservable<T>.

I've thrown in Synchronize() to ensure that you end up with a valid observable, because Materialize() lets you construct observables that don't follow the regular observable contract. Part of what Synchronize() does is ensure there is only one OnError and only one OnCompleted, and drop any OnNext that comes after either of the two.
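
To make the mechanics easier to see in isolation, here is a minimal sketch of the same idea using two Subject instances in place of the real events. The names MaterializeSketch, WithErrorsFrom and Run are hypothetical helpers made up for this illustration, not part of System.Reactive. As a simplification, the error stream is mapped straight to error notifications without its own Materialize() call, and Synchronize() is left out because everything here runs on one thread.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MaterializeSketch
{
    // Hypothetical helper: merges a value stream with an error-signal stream.
    // The first element on `errors` ends the resulting sequence with OnError.
    public static IObservable<T> WithErrorsFrom<T, TError>(
        IObservable<T> values,
        IObservable<TError> errors,
        Func<TError, Exception> toException)
    {
        return values
            .Materialize()   // IObservable<Notification<T>>
            .Merge(errors.Select(e => Notification.CreateOnError<T>(toException(e))))
            .Dematerialize(); // back to IObservable<T>; an OnError notification becomes a real OnError
    }

    // Drives the helper with subjects standing in for the "found" and "failed" events
    // and records what the subscriber sees.
    public static List<string> Run()
    {
        var log = new List<string>();
        var found = new Subject<string>();
        var failed = new Subject<Unit>();

        using (WithErrorsFrom(found, failed, _ => new Exception("search failed"))
            .Subscribe(
                s => log.Add($"next:{s}"),
                ex => log.Add($"error:{ex.Message}"),
                () => log.Add("completed")))
        {
            found.OnNext("a");
            found.OnNext("b");
            failed.OnNext(Unit.Default); // turned into OnError for the subscriber
            found.OnNext("c");           // arrives after the error, so it is not delivered
        }

        return log;
    }
}
```

Calling MaterializeSketch.Run() should give the entries next:a, next:b and error:search failed. The value pushed after the error never reaches the subscriber, and no completion is reported.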


Try this as a way to do what you wanted in the comments:

static void Main(string[] args)
{
    var browser = new ServiceBrowser();

    var observableFound =
        Observable.FromEventPattern<ServiceFoundEventArgs>(
            h => browser.ServiceFound += h,
            h => browser.ServiceFound -= h)
        .Select(e => e.EventArgs.Service);

    var observableError =
        Observable.FromEventPattern<ServiceSearchErrorEventArgs>(
            h => browser.ServiceError += h,
            h => browser.ServiceError -= h);

    var observableFoundWithError = observableFound
        .Materialize()
        .Merge(
            observableError
                .Materialize()
                .Select(x => Notification.CreateOnError<Service>(new Exception("Error"))))
        .Dematerialize()
        .Synchronize();

    Func<Service, IObservable<Service>> resolveService = s =>
        Observable.Create<Service>(o =>
        {
            var observableResolved = Observable.FromEventPattern<EventArgs>(
                h => s.AddressResolved += h,
                h => s.AddressResolved -= h);

            var observableResolveError = Observable.FromEventPattern<EventArgs>(
                h => s.ErrorResolvingAddress += h,
                h => s.ErrorResolvingAddress -= h);

            var observableResolvedWithError =
                observableResolved
                    .Select(x => s)
                    .Materialize()
                    .Merge(
                        observableResolveError
                        .Do(e => Console.WriteLine($"Error resolving: {s.Name}"))
                        .Materialize()
                        .Select(x => Notification.CreateOnError<Service>(new Exception($"Error resolving address for service: {s.Name}"))))
                    .Dematerialize()
                    .Synchronize();

            s.ResolveAddress();

            return observableResolvedWithError.Subscribe(o);
        });

    using (
        observableFoundWithError
            .Select(s => resolveService(s))
            .Switch()
            .Subscribe(
                s => Console.WriteLine($"Found and resolved service: {s.Name} ({s.Address})"),
                ex => Console.WriteLine($"Found error: {ex.Message}"),
                () => Console.WriteLine("Found Completed")))
    {
        browser.Search();
        Console.ReadLine();
    }
}

public class ServiceBrowser
{
    public event EventHandler<ServiceFoundEventArgs> ServiceFound;
    public event EventHandler<ServiceSearchErrorEventArgs> ServiceError;

    public void Search() { }
}

public class Service
{
    public event EventHandler<EventArgs> AddressResolved;
    public event EventHandler<EventArgs> ErrorResolvingAddress;

    public string Name;
    public string Address;

    public void ResolveAddress() { }
}

public class ServiceFoundEventArgs : EventArgs
{
    public Service Service;
}

public class ServiceSearchErrorEventArgs : EventArgs
{

}

It might require a bit of tweaking, perhaps an Observable.Delay in there. Let me know if it works.

Upvotes: 2
