Medo42
Medo42

Reputation: 3821

How to implement an event using Reactive Extensions

The Reactive Extensions allow you to easily subscribe to an event using Observable.FromEventPattern, but I can't find anything on how you might implement an event when you have an IObservable.

My situation is this: I need to implement an interface which contains an event. That event is supposed to be called whenever a certain value of my object changes, and for thread safety reasons I need to call this event on a certain SynchronizationContext. I am also supposed to call each event handler with the current value on registration.

public interface IFooWatcher
{
    event FooChangedHandler FooChanged;
}

Getting an observable that does what I want is rather easy with Rx using BehaviorSubject:

public class FooWatcher
{
    private readonly BehaviorSubject<Foo> m_subject;
    private readonly IObservable<Foo> m_observable;

    public FooWatcher(SynchronizationContext synchronizationContext, Foo initialValue)
    {
        m_subject = new BehaviorSubject<Foo>(initialValue);
        m_observable = m_subject
            .DistinctUntilChanged()
            .ObserveOn(synchronizationContext);
    }

    public event FooChangedHandler FooChanged
    {
        add { /* ??? */ }
        remove { /* ??? */ }
    }
}

Now I am looking for an easy way to have the add and remove functions subscribe and unsubscribe the passed FooChangedHandler as an Observer<Foo> on m_observable. My current implementation looks similar to this:

    add
    {
        lock (m_lock)
        {
            IDisposable disp = m_observable.Subscribe(value);
            m_registeredObservers.Add(
                new KeyValuePair<FooChangedHandler, IDisposable>(
                    value, disp));
        }
    }

    remove
    {
        lock (m_lock)
        {
            KeyValuePair<FooChangedHandler, IDisposable> observerDisposable =
                m_registeredObservers
                    .First(pair => object.Equals(pair.Key, value));
            m_registeredObservers.Remove(observerDisposable);
            observerDisposable.Value.Dispose();
        }
    }

However, I hope to find an easier solution, because I need to implement several of these events (of differing handler types). I tried to roll my own generic solution but it creates some additional problems that need to be worked around (in particular, how you generically work with a delegate that takes a parameter of T), so I would prefer to find an existing solution that bridges the gap in this direction - just as FromEventPattern does the reverse.

Upvotes: 7

Views: 1121

Answers (2)

AlSki
AlSki

Reputation: 6961

Given that you are already mixing the boundaries between reactive and more normal code, you could do a less reactive version. To start simply declare a normal event pattern

public event FooChangedHandler FooChanged;

protected void OnFooChanged(Foo)
{
  var temp = FooChanged;
  if (temp != null)
  {  
    temp(new FooChangedEventArgs(Foo));
  }
}

and then simply connect the observable to it in the constructor

m_Observable.Subscribe(foo => OnFooChanged(foo));

It's not very Rx but it is incredibly simple.

Upvotes: 0

Richard Anthony Hein
Richard Anthony Hein

Reputation: 10650

You could do this:

public event FooChangedHandler FooChanged
{
    add { m_observable.ToEvent().OnNext += value; }
    remove { m_observable.ToEvent().OnNext -= value; }
}

However, on the remove, I think perhaps you just may want to dispose of the subscription ... or perhaps get the Action from ToEvent() and store that as a member. Untested.

EDIT: You'll have to use Action instead of a FooChangedHandler delegate, however.

EDIT 2: Here's a tested version. I suppose you need to use FooChangedHandler, however, since you have a bunch of these pre-existing handlers?

void Main()
{
    IObservable<Foo> foos = new [] { new Foo { X = 1 }, new Foo { X = 2 } }.ToObservable();
    var watcher = new FooWatcher(SynchronizationContext.Current, new Foo { X = 12 });
    watcher.FooChanged += o => o.X.Dump();  
    foos.Subscribe(watcher.Subject.OnNext); 
}

// Define other methods and classes here

//public delegate void FooChangedHandler(Foo foo);
public interface IFooWatcher
{
    event Action<Foo> FooChanged;
}

public class Foo {
    public int X { get; set; }
}
public class FooWatcher
{

    private readonly BehaviorSubject<Foo> m_subject;
    public BehaviorSubject<Foo> Subject { get { return m_subject; } }
    private readonly IObservable<Foo> m_observable;

    public FooWatcher(SynchronizationContext synchronizationContext, Foo initialValue)
    {
        m_subject = new BehaviorSubject<Foo>(initialValue);

        m_observable = m_subject
            .DistinctUntilChanged();
    }

    public event Action<Foo> FooChanged
    {
        add { m_observable.ToEvent().OnNext += value; }
        remove { m_observable.ToEvent().OnNext -= value; }
    }
}

Upvotes: 2

Related Questions