Reputation: 21271
I have a typed message broker similar to what Caliburn provides:
public interface IMessageBroker
{
void Publish<T>(T message);
IDisposable Subscribe<T>(Action<T> subscriber);
}
How can I convert subscriptions to an IObservable?
I want an extension method, something like this:
public static IObservable<T> Subscribe<T>(this IMessageBroker messageBroker)
{
var subject = new Subject<T>();
messageBroker.Subscribe<T>(subject.OnNext);
return subject;
}
The problem with this implementation is that I can't unsubscribe: the IDisposable returned by messageBroker.Subscribe is discarded, so the handler leaks.
A better name for the Subscribe method is also welcome.
Upvotes: 0
Views: 353
Reputation: 2431
Try this (Tested)
How can I convert subscriptions to an IObservable?
You can use Observable.Create in the following extension method:
public static IObservable<T> AsObservable<T>(this IMessageBroker messageBroker)
{
return Observable.Create<T>(observer => messageBroker.Subscribe<T>(observer.OnNext));
}
Note: the System.Reactive NuGet package doesn't have Observable.CreateWithDisposable; Observable.Create accepts a delegate that returns an IDisposable instead.
Or without Rx (for example, to avoid the extra dependency):
public static IObservable<T> AsObservable<T>(this IMessageBroker messageBroker)
{
return new DelegateObservable<T>(observer => messageBroker.Subscribe<T>(observer.OnNext));
}
public class DelegateObservable<T> : IObservable<T>
{
private Func<IObserver<T>, IDisposable> subscriber;
public DelegateObservable(Func<IObserver<T>, IDisposable> subscriber)
{
this.subscriber = subscriber;
}
public IDisposable Subscribe(IObserver<T> observer)
{
return this.subscriber(observer);
}
}
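To check that disposing the subscription really detaches the handler from the broker, here is a minimal self-contained sketch of the Rx-free variant. SimpleBroker, PrintObserver, HandlerCount and the Program class are hypothetical names made up for this illustration; they are not part of Caliburn or Rx, and a real broker will differ.

```csharp
using System;
using System.Collections.Generic;

public interface IMessageBroker
{
    void Publish<T>(T message);
    IDisposable Subscribe<T>(Action<T> subscriber);
}

// Hypothetical in-memory broker, for illustration only.
public sealed class SimpleBroker : IMessageBroker
{
    private readonly List<Delegate> handlers = new List<Delegate>();

    // Exposed only so the example can show that nothing leaks.
    public int HandlerCount => handlers.Count;

    public void Publish<T>(T message)
    {
        // Iterate over a copy so a handler may unsubscribe while being invoked.
        foreach (var handler in handlers.ToArray())
        {
            if (handler is Action<T> action) action(message);
        }
    }

    public IDisposable Subscribe<T>(Action<T> subscriber)
    {
        handlers.Add(subscriber);
        return new Unsubscriber(() => handlers.Remove(subscriber));
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action onDispose;
        public Unsubscriber(Action onDispose) { this.onDispose = onDispose; }
        public void Dispose() { onDispose?.Invoke(); onDispose = null; }
    }
}

public sealed class DelegateObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> subscriber;
    public DelegateObservable(Func<IObserver<T>, IDisposable> subscriber) { this.subscriber = subscriber; }
    public IDisposable Subscribe(IObserver<T> observer) => subscriber(observer);
}

public static class MessageBrokerExtensions
{
    public static IObservable<T> AsObservable<T>(this IMessageBroker messageBroker)
    {
        // The broker's IDisposable is handed straight back to the observer's caller.
        return new DelegateObservable<T>(observer => messageBroker.Subscribe<T>(observer.OnNext));
    }
}

// Hypothetical observer that just prints what it receives.
public sealed class PrintObserver<T> : IObserver<T>
{
    public void OnNext(T value) => Console.WriteLine($"received {value}");
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class Program
{
    public static void Main()
    {
        var broker = new SimpleBroker();
        IDisposable subscription = broker.AsObservable<int>().Subscribe(new PrintObserver<int>());

        broker.Publish(1);        // prints "received 1"
        subscription.Dispose();   // removes the handler from the broker
        broker.Publish(2);        // prints nothing
        Console.WriteLine(broker.HandlerCount); // prints "0"
    }
}
```

Because the broker is only subscribed to when an observer subscribes, and the broker's own IDisposable is what the observer gets back, each subscription can be torn down individually, which is what the Subject-based version in the question could not do.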
Upvotes: 0
Reputation: 66531
Try this (untested):
public static IObservable<T> ToObservable<T>(this IMessageBroker messageBroker)
{
IObservable<T> observable = Observable.CreateWithDisposable<T>(o =>
{
return messageBroker.Subscribe<T>(o.OnNext);
});
return observable;
}
Which you should be able to use like this:
var observableBroker = messageBroker.ToObservable<int>();
var subject = new Subject<int>();
observableBroker.Subscribe(subject.OnNext);
// Alternatively, there are overloads of Observable.Subscribe which take lambdas:
observableBroker.Subscribe(t => DoSomethingWith(t));
Upvotes: 3