twon33
twon33

Reputation: 533

Wrapping legacy object in IConnectableObservable

I have a legacy event-based object that seems like a perfect fit for RX: after being connected to a network source, it raises events when a message is received, and may terminate with either a single error (connection dies, etc.) or (rarely) an indication that there will be no more messages. This object also has a couple projections -- most users are interested in only a subset of the messages received, so there are alternate events raised only when well-known message subtypes show up.

So, in the process of learning more about reactive programming, I built the following wrapper:

class LegacyReactiveWrapper : IConnectableObservable<TopLevelMessage>
{
    private LegacyType _Legacy;
    private IConnectableObservable<TopLevelMessage> _Impl;
    public LegacyReactiveWrapper(LegacyType t) 
    {
        _Legacy = t;
        var observable = Observable.Create<TopLevelMessage>((observer) => 
        {
            LegacyTopLevelMessageHandler tlmHandler = (sender, tlm) => observer.OnNext(tlm);
            LegacyErrorHandler errHandler = (sender, err) => observer.OnError(new ApplicationException(err.Message));
            LegacyCompleteHandler doneHandler = (sender) => observer.OnCompleted();

            _Legacy.TopLevelMessage += tlmHandler;
            _Legacy.Error += errHandler;
            _Legacy.Complete += doneHandler;

            return Disposable.Create(() => 
            {
                _Legacy.TopLevelMessage -= tlmHandler;
                _Legacy.Error -= errHandler;
                _Legacy.Complete -= doneHandler;
            });
        });

        _Impl = observable.Publish();
    }

    public IDisposable Subscribe(IObserver<TopLevelMessage> observer)
    {
        return _Impl.RefCount().Subscribe(observer);
    }

    public IDisposable Connect()
    {
        _Legacy.ConnectToMessageSource();
        return Disposable.Create(() => _Legacy.DisconnectFromMessageSource());
    }

    public IObservable<SubMessageA> MessageA
    {
        get
        {
            // This is the moral equivalent of the projection behavior
            // that already exists in the legacy type. We don't hook
            // the LegacyType.MessageA event directly.
            return _Impl.RefCount()
                    .Where((tlm) => tlm.MessageType == MessageType.MessageA)
                    .Select((tlm) => tlm.SubMessageA);
        }
    }

    public IObservable<SubMessageB> MessageB
    {
        get
        {
            return _Impl.RefCount()
                    .Where((tlm) => tlm.MessageType == MessageType.MessageB)
                    .Select((tlm) => tlm.SubMessageB);
        }
    }
}

Something about how it's used elsewhere feels... off... somehow, though. Here's sample usage, which works but feels strange. The UI context for my test application is WinForms, but it doesn't really matter.

// in Program.Main... 

MainForm frm = new MainForm();

// Updates the UI based on a stream of SubMessageA's
IObserver<SubMessageA> uiManager = new MainFormUiManager(frm);

LegacyType lt = new LegacyType();
// ... setup lt...

var w = new LegacyReactiveWrapper(lt);

var uiUpdateSubscription = (from msgA in w.MessageA
                            where SomeCondition(msgA)
                            select msgA).ObserveOn(frm).Subscribe(uiManager);

var nonUiSubscription = (from msgB in w.MessageB
                         where msgB.SubType == MessageBType.SomeSubType
                         select msgB).Subscribe(
                             m => Console.WriteLine("Got MsgB: {0}", m),
                             ex => Console.WriteLine("MsgB error: {0}", ex.Message),
                             () => Console.WriteLine("MsgB complete")
                         );

IDisposable unsubscribeAtExit = null;
frm.Load += (sender, e) => 
{
    var connectionSubscription = w.Connect();
    unsubscribeAtExit = new CompositeDisposable(
                               uiUpdateSubscription,
                               nonUiSubscription,
                               connectionSubscription);
};

frm.FormClosing += (sender, e) => 
{
    if(unsubscribeAtExit != null) { unsubscribeAtExit.Dispose(); }
};


Application.Run(frm);

This WORKS -- The form launches, the UI updates, and when I close it the subscriptions get cleaned up and the process exits (which it won't do if the LegacyType's network connection is still connected). Strictly speaking, it's enough to dispose just connectionSubscription. However, the explicit Connect feels weird to me. Since RefCount is supposed to do that for you, I tried modifying the wrapper such that rather than using _Impl.RefCount in MessageA and MessageB and explicitly connecting later, I used this.RefCount instead and moved the calls to Subscribe to the Load handler. That had a different problem -- the second subscription triggered another call to LegacyReactiveWrapper.Connect. I thought the idea behind Publish/RefCount was "first-in triggers connection, last-out disposes connection."

I guess I have three questions:

  1. Do I fundamentally misunderstand Publish/RefCount?
  2. Is there some preferred way to implement IConnectableObservable<T> that doesn't involve delegation to one obtained via IObservable<T>.Publish? I know you're not supposed to implement IObservable<T> yourself, but I don't understand how to inject connection logic into the IConnectableObservable<T> that Observable.Create().Publish() gives you. Is Connect supposed to be idempotent?
  3. Would someone more familiar with RX/reactive programming look at the sample for how the wrapper is used and say "that's ugly and broken" or is this not as weird as it seems?

Upvotes: 1

Views: 216

Answers (2)

James World
James World

Reputation: 29776

I'm not sure that you need to expose Connect directly as you have. I would simplify as follows, using Publish().RefCount() as an encapsulated implementation detail; it would cause the legacy connection to be made only as required. Here the first subscriber in causes connection, and the last one out causes disconnection. Also note this correctly shares a single RefCount across all subscribers, whereas your implementation uses a RefCount per message type, which isn't probably what was intended. Users are not required to Connect explicitly:

public class LegacyReactiveWrapper
{
    private IObservable<TopLevelMessage> _legacyRx; 

    public LegacyReactiveWrapper(LegacyType legacy)
    {
        _legacyRx = WrapLegacy(legacy).Publish().RefCount();
    }

    private static IObservable<TopLevelMessage> WrapLegacy(LegacyType legacy)
    {
        return Observable.Create<TopLevelMessage>(observer =>
        {
            LegacyTopLevelMessageHandler tlmHandler = (sender, tlm) => observer.OnNext(tlm);
            LegacyErrorHandler errHandler = (sender, err) => observer.OnError(new ApplicationException(err.Message));
            LegacyCompleteHandler doneHandler = sender => observer.OnCompleted();

            legacy.TopLevelMessage += tlmHandler;
            legacy.Error += errHandler;
            legacy.Complete += doneHandler;
            legacy.ConnectToMessageSource();

            return Disposable.Create(() =>
            {
                legacy.DisconnectFromMessageSource();
                legacy.TopLevelMessage -= tlmHandler;
                legacy.Error -= errHandler;
                legacy.Complete -= doneHandler;
            });
        });
    }

    public IObservable<TopLevelMessage> TopLevelMessage
    {
        get
        {
            return _legacyRx;
        }
    }

    public IObservable<SubMessageA> MessageA
    {
        get
        {
            return _legacyRx.Where(tlm => tlm.MessageType == MessageType.MessageA)
                            .Select(tlm => tlm.SubMessageA);
        }
    }

    public IObservable<SubMessageB> MessageB
    {
        get
        {
            return _legacyRx.Where(tlm => tlm.MessageType == MessageType.MessageB)
                            .Select(tlm => tlm.SubMessageB);
        }
    }
}

An additional observation is that Publish().RefCount() will drop the underlying subscription when it's subscriber count reaches 0. Typically I only use Connect over this choice when I need to maintain a subscription even when the subscriber count on the published source drops to zero (and may go back up again later). It's rare to need to do this though - only when connecting is more expensive than holding on to the subscription resource when you might not need to.

Upvotes: 2

Gideon Engelberth
Gideon Engelberth

Reputation: 6155

  1. Your understanding is not entirely wrong, but you do appear to have some points of misunderstanding.

    You seem to be under the belief that multiple calls to RefCount on the same source IObservable will result in a shared reference count. They do not; each instance keeps its own count. As such, you are causing multiple subscriptions to _Impl, one per call to subscribe or call to the Message properties.

    You also may be expecting that making _Impl an IConnectableObservable somehow causes your Connect method to be called (since you seem surprised you needed to call Connect in your consuming code). All Publish does is cause subscribers to the published object (returned from the .Publish() call) to share a single subscription to the underlying source observable (in this case, the object made from your call to Observable.Create).

    Typically, I see Publish and RefCount used immediately together (eg as source.Publish().RefCount()) to get the shared subscription effect described above or to make a cold observable hot without needing to call Connect to start the subscription to the original source. However, this relies on using the same object returned from the .Publish().RefCount() for all subscribers (as noted above).

  2. Your implementation of Connect seems reasonable. I don't know of any recommendations for if Connect should be idempotent, but I would not personally expect it to be. If you wanted it to be, you would just need to track calls to it the disposal of its return value to get the right balance.

    I don't think you need to use Publish the way you are, unless there is some reason to avoid multiple event handlers being attached to the legacy object. If you do need to avoid that, I would recommend changing _Impl to a plain IObservable and follow the Publish with a RefCount.

  3. Your MessageA and MessageB properties have potential to be a source of confusion for users, since they return an IObservable, but still require a call to Connect on the base object to start receiving messages. I would either change them to IConnectableObservables that somehow delegate to the original Connect (at which point the idempotency discussion becomes more relevant) or drop the properties and just let the users make the (fairly simple) projections themselves when needed.

Upvotes: 1

Related Questions