I'm trying to convert an existing event-based API to a Reactive Observable API. The concrete API I'm working with is NSNetServiceBrowser in Xamarin.iOS, which lets you browse for network devices using Zeroconf/Bonjour. However, the question applies to any API of this kind.
NSNetServiceBrowser offers various events of interest:
- FoundService
- NotSearched
- ServiceRemoved
The FoundService event is raised when a service is discovered, and the NotSearched event is raised when the search fails.
I would like to combine the FoundService and NotSearched events into an observable of NSNetService.
My current implementation looks like this:
public IObservable<NSNetService> Search()
{
    var foundObservable = Observable
        .FromEventPattern<NSNetServiceEventArgs>(
            h => serviceBrowser.FoundService += h,
            h => serviceBrowser.FoundService -= h)
        .Select(x => x.EventArgs);
    var notSearchedObservable = Observable
        .FromEventPattern<NSNetServiceErrorEventArgs>(
            h => serviceBrowser.NotSearched += h,
            h => serviceBrowser.NotSearched -= h)
        .Select(x => x.EventArgs);
    var serviceObservable = Observable.Create(
        (IObserver<NSNetServiceEventArgs> observer) =>
        {
            notSearchedObservable.Subscribe(n =>
            {
                string errorMessage = $"Search for {serviceType} failed:";
                foreach (var kv in n.Errors)
                {
                    log.Error($"\t{kv.Key}: {kv.Value}");
                    errorMessage += $" ({kv.Key}, {kv.Value})";
                }
                observer.OnError(new Exception(errorMessage));
            });
            foundObservable.Subscribe(observer);
            return System.Reactive.Disposables.Disposable.Empty;
        }).Select(x => x.Service);
    serviceBrowser.SearchForServices(serviceType, domain);
    return serviceObservable;
}
The code looks clunky, and I have a gut feeling I'm not using System.Reactive correctly. Is there a more elegant way to combine event pairs where one produces values and the other signals an error? This is a common pattern in existing event-based APIs in .NET.
Here is a small console app (depending only on System.Reactive) illustrating the type of API I want to Reactify:
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace ReactiveLearning
{
    class Program
    {
        static void Main(string[] args)
        {
            var browser = new ServiceBrowser();

            // Stream of discovered services.
            var observableFound =
                Observable.FromEventPattern<ServiceFoundEventArgs>(
                    h => browser.ServiceFound += h,
                    h => browser.ServiceFound -= h)
                .Select(e => e.EventArgs.Service);

            // Stream of search failures, currently separate from the one above.
            var observableError =
                Observable.FromEventPattern<ServiceSearchErrorEventArgs>(
                    h => browser.ServiceError += h,
                    h => browser.ServiceError -= h);

            var foundSub = observableFound.Subscribe(s =>
            {
                Console.WriteLine($"Found service: {s.Name}");
            }, () =>
            {
                Console.WriteLine("Found Completed");
            });
            var errorSub = observableError.Subscribe(e =>
            {
                Console.WriteLine("ERROR!");
            }, () =>
            {
                Console.WriteLine("Error Completed");
            });

            browser.Search();
            Console.ReadLine();
            foundSub.Dispose();
            errorSub.Dispose();
            Console.WriteLine();
        }
    }

    class ServiceBrowser
    {
        // Declared as events so that outside code can only add or remove handlers.
        public event EventHandler<ServiceFoundEventArgs> ServiceFound;
        public event EventHandler<ServiceSearchErrorEventArgs> ServiceError;

        // Simulates a search: raises five ServiceFound events, then fails about half the time.
        public void Search()
        {
            Task.Run(async () =>
            {
                for (var i = 0; i < 5; ++i)
                {
                    await Task.Delay(1000);
                    ServiceFound?.Invoke(this, new ServiceFoundEventArgs(new Service($"Service {i}")));
                }
                var r = new Random();
                if (r.NextDouble() > 0.5)
                {
                    ServiceError?.Invoke(this, new ServiceSearchErrorEventArgs());
                }
            });
        }
    }

    class ServiceFoundEventArgs : EventArgs
    {
        public Service Service { get; private set; }
        public ServiceFoundEventArgs(Service service) => Service = service;
    }

    class ServiceSearchErrorEventArgs : EventArgs {}

    class Service
    {
        public event EventHandler<EventArgs> AddressResolved;
        public event EventHandler<EventArgs> ErrorResolvingAddress;
        public string Name { get; private set; }
        public string Address { get; private set; }
        public Service(string name) => Name = name;

        // Simulates address resolution: succeeds or fails at random after 500 ms.
        public void ResolveAddress()
        {
            Task.Run(async () =>
            {
                await Task.Delay(500);
                var r = new Random();
                if (r.NextDouble() > 0.5)
                {
                    Address = $"http://{Name}.com";
                    AddressResolved?.Invoke(this, EventArgs.Empty);
                }
                else
                {
                    ErrorResolvingAddress?.Invoke(this, EventArgs.Empty);
                }
            });
        }
    }
}
Upvotes: 1
Views: 198
Thank you for the excellent sample code. You need to make use of the Materialize and Dematerialize operators. Here's how:
var observableFoundWithError =
    observableFound
        .Materialize()
        .Merge(
            observableError
                .Materialize()
                .Select(x =>
                    Notification
                        .CreateOnError<Service>(new Exception("Error"))))
        .Dematerialize()
        .Synchronize();

using (observableFoundWithError.Subscribe(
    s => Console.WriteLine($"Found service: {s.Name}"),
    ex => Console.WriteLine($"Found error: {ex.Message}"),
    () => Console.WriteLine("Found Completed")))
{
    browser.Search();
    Console.ReadLine();
}
The Materialize() operator turns an IObservable<T> into an IObservable<Notification<T>>, which allows the standard OnError and OnCompleted signals to be emitted through the OnNext call. You can use Notification.CreateOnError<T>(new Exception("Error")) to construct elements of that observable, which you can then turn back into an IObservable<T> with Dematerialize().
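To make the round trip concrete, here is a minimal sketch that swaps the events for two Subject instances. The class name MaterializeDemo and the subjects values and failures are made up for illustration and merely stand in for observableFound and observableError; the sketch assumes only the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MaterializeDemo
{
    // Returns a trace of the notifications a subscriber receives.
    public static List<string> Run()
    {
        var trace = new List<string>();
        var values = new Subject<int>();      // stand-in for observableFound
        var failures = new Subject<string>(); // stand-in for observableError

        var combined = values
            .Materialize()                    // IObservable<Notification<int>>
            .Merge(failures.Select(msg =>
                Notification.CreateOnError<int>(new Exception(msg))))
            .Dematerialize();                 // back to IObservable<int>

        using (combined.Subscribe(
            v => trace.Add($"OnNext({v})"),
            ex => trace.Add($"OnError({ex.Message})"),
            () => trace.Add("OnCompleted")))
        {
            values.OnNext(1);
            values.OnNext(2);
            failures.OnNext("boom"); // surfaces as OnError on the combined stream
            values.OnNext(3);        // pushed after the error, so it is not delivered
        }

        return trace;
    }
}
```

Calling MaterializeDemo.Run() should yield OnNext(1), OnNext(2) and OnError(boom): the value pushed on the failures subject terminates the combined sequence.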
I've thrown in Synchronize() to ensure that you've created a valid observable. The use of Materialize() does let you construct observables that don't follow the regular observable contract. Part of what Synchronize() does is ensure that there is only one OnError and only one OnCompleted, and drop any OnNext that comes after either of the two.
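Since the question mentions that this value-event plus error-event pairing is common, the pattern above could be wrapped in a small extension method. This is only a sketch: FailWith is a hypothetical name, not an operator that ships with System.Reactive, and the toException parameter is an assumption about how you would want to map the error event to an exception.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class ObservableErrorExtensions
{
    // Hypothetical helper (not part of System.Reactive): forwards values from
    // 'source' and terminates with OnError as soon as 'errors' produces an item.
    public static IObservable<T> FailWith<T, TError>(
        this IObservable<T> source,
        IObservable<TError> errors,
        Func<TError, Exception> toException)
    {
        return source
            .Materialize()
            .Merge(errors.Select(e =>
                Notification.CreateOnError<T>(toException(e))))
            .Dematerialize()
            .Synchronize();
    }
}

// Possible usage with the sample from the question:
//   var observableFoundWithError =
//       observableFound.FailWith(observableError, e => new Exception("Error"));
```

Passing a mapping function keeps the helper independent of any particular EventArgs type, so the same method could be reused for the address-resolution events on Service.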
Try this as a way to do what you wanted in the comments:
static void Main(string[] args)
{
    var browser = new ServiceBrowser();

    var observableFound =
        Observable.FromEventPattern<ServiceFoundEventArgs>(
            h => browser.ServiceFound += h,
            h => browser.ServiceFound -= h)
        .Select(e => e.EventArgs.Service);

    var observableError =
        Observable.FromEventPattern<ServiceSearchErrorEventArgs>(
            h => browser.ServiceError += h,
            h => browser.ServiceError -= h);

    var observableFoundWithError = observableFound
        .Materialize()
        .Merge(
            observableError
                .Materialize()
                .Select(x => Notification.CreateOnError<Service>(new Exception("Error"))))
        .Dematerialize()
        .Synchronize();

    // Wraps the resolve/error event pair of a single service in one observable.
    Func<Service, IObservable<Service>> resolveService = s =>
        Observable.Create<Service>(o =>
        {
            var observableResolved = Observable.FromEventPattern<EventArgs>(
                h => s.AddressResolved += h,
                h => s.AddressResolved -= h);

            var observableResolveError = Observable.FromEventPattern<EventArgs>(
                h => s.ErrorResolvingAddress += h,
                h => s.ErrorResolvingAddress -= h);

            var observableResolvedWithError =
                observableResolved
                    .Select(x => s)
                    .Materialize()
                    .Merge(
                        observableResolveError
                            .Do(e => Console.WriteLine($"Error resolving: {s.Name}"))
                            .Materialize()
                            .Select(x => Notification.CreateOnError<Service>(new Exception($"Error resolving address for service: {s.Name}"))))
                    .Dematerialize()
                    .Synchronize();

            s.ResolveAddress();

            return observableResolvedWithError.Subscribe(o);
        });

    using (
        observableFoundWithError
            .Select(s => resolveService(s))
            .Switch()
            .Subscribe(
                s => Console.WriteLine($"Found and resolved service: {s.Name} ({s.Address})"),
                ex => Console.WriteLine($"Found error: {ex.Message}"),
                () => Console.WriteLine("Found Completed")))
    {
        browser.Search();
        Console.ReadLine();
    }
}

public class ServiceBrowser
{
    public event EventHandler<ServiceFoundEventArgs> ServiceFound;
    public event EventHandler<ServiceSearchErrorEventArgs> ServiceError;
    public void Search() { }
}

public class Service
{
    public event EventHandler<EventArgs> AddressResolved;
    public event EventHandler<EventArgs> ErrorResolvingAddress;
    public string Name;
    public string Address;
    public void ResolveAddress() { }
}

public class ServiceFoundEventArgs : EventArgs
{
    public Service Service;
}

public class ServiceSearchErrorEventArgs : EventArgs
{
}
It might require a bit of tweaking, perhaps an Observable.Delay in there. Let me know if it works.
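One tweak that may be worth checking: Switch() only stays subscribed to the most recent inner observable, so if a second service is found before the first has finished resolving, the first result is dropped. Merge() keeps every inner subscription alive instead. The sketch below illustrates the difference with plain subjects; the class name SwitchVersusMergeDemo and the resolutions dictionary are invented for the illustration and stand in for resolveService.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchVersusMergeDemo
{
    public static (List<string> Switched, List<string> Merged) Run()
    {
        var switched = new List<string>();
        var merged = new List<string>();

        var services = new Subject<string>();
        // One subject per service; each stands in for resolveService(s).
        var resolutions = new Dictionary<string, Subject<string>>
        {
            ["A"] = new Subject<string>(),
            ["B"] = new Subject<string>(),
        };

        var inner = services.Select(s => resolutions[s].AsObservable());

        using (inner.Switch().Subscribe(v => switched.Add(v)))
        using (inner.Merge().Subscribe(v => merged.Add(v)))
        {
            services.OnNext("A");
            services.OnNext("B");                  // B is found before A resolves
            resolutions["A"].OnNext("A resolved"); // Switch has already moved on to B
            resolutions["B"].OnNext("B resolved");
        }

        return (switched, merged);
    }
}
```

In this sketch the Switch() pipeline only sees "B resolved", while the Merge() pipeline sees both results. Note that with Merge() an error from any single resolution still terminates the whole merged sequence, so whether it fits depends on how resolution failures should be handled.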
Upvotes: 2