Reputation: 4424
I have been creating a series of IObservable extension methods, which are domain-specific but basically all have the form of passing the existing sequence through another class (which implements ISubject), and providing the net output.
After I'd done a few of these, I twigged that there was a pattern, and managed to distill what I was doing right down to the following:
/// <summary>
/// Returns an observable sequence that is the output of subscribing the existing sequence to
/// a subject (the result of <paramref name="subjectFactory"/>) and consuming the result
/// </summary>
/// <returns>Kinda suspicious that this doesn't exist out of the box, now I've pared
/// it right down to what it basically is</returns>
public static IObservable<T> Decorate<T>(this IObservable<T> input, Func<ISubject<T>> subjectFactory)
{
return Observable.Create<T>(observer =>
{
var processor = subjectFactory();
var subscriptions = new[]
{
processor.Subscribe(observer),
input.Subscribe(processor),
};
return new CompositeDisposable(subscriptions);
});
}
Putting it like this reduced all the other extension methods to one-liners. But this is so generic that I was quite suspicious that this didn't exist already. But I couldn't find one.
So the question is: does this functionality exist in the base RX libraries anywhere? I'm looking for one that takes an ISubject<T>
, handles all the subscription ceremony and spits out the result (like the above)
--
Edit: yes, comments regarding avoiding Subject implementations and leaning on Create are well made. In this case I didn't want to expose the operations as methods for a couple of reasons:
IEnumerable<Func<IObservable<T>,IObservable<T>>>
and effectively created the subscriber chain that way - ie pass the input observable through each func in turn to get the output)The operations themselves lean on an internal Subject<T>
to do all the heavy lifting, so it's not like they're implementing ISubject<T>
from scratch, which I know is the road to hell...
Upvotes: 0
Views: 179
Reputation: 39222
Yes I think your function is just a variation of Multicast:
public static IObservable<T> Decorate<T>(this IObservable<T> input, Func<ISubject<T>> subjectFactory)
{
return input.Multicast(subjectFactory, t => t);
}
But to be honest, I agree with Dave's comments. Chances are you don't really need multicast functionality. The common pattern I find is that the domain specific classes should not be ISubject
implementations, but should just have methods that take an IObservable
as input and return a new IObservable
as output (with their domain logic applied). Something like:
IObservable<Bar> DoLogic(IObservable<Foo> foos)
{
return foos.Select(foo => ...);
}
Upvotes: 1