NeddySpaghetti

Reputation: 13495

Publish last value of event for new subscribers

I have a class Foo with an event that publishes a FooState enum. I want to turn this event into an observable that replays the last value for new subscribers.

Even if there are no subscribers, any new subscriber should get the last value.

public enum FooState
{
    Stopped = 0,
    Starting = 1,
    Running = 2,        
}

public delegate void FooEventHandler(object sender, FooEventArgs e);

public class FooEventArgs : EventArgs
{
    public FooEventArgs(FooState fooState)
    {
        this.State = fooState;
    }

    public FooState State {get; private set;}
}

public class Foo
{
    public event FooEventHandler FooEvent;

    public void OnFooEvent(FooState state)
    {
        var fooEvent = FooEvent;

        if(fooEvent != null)
        {
            fooEvent(this, new FooEventArgs(state));
        }
    }
}

My attempts so far have revolved around Publish, RefCount and Replay, but none of the combinations I tried work if I subscribe to the observable after the event has fired.

Replay(1).RefCount() works as long as there is already at least one subscription, but I need it to work for the first late subscription as well.

var foo = new Foo();

var obs = Observable.FromEventPattern<FooEventHandler, FooEventArgs>(
        h => foo.FooEvent += h,
        h => foo.FooEvent -= h)
    .DistinctUntilChanged()
    .Replay(1)
    .RefCount();

// Works if this line is uncommented.
//obs.Subscribe(x => Console.WriteLine("Early Subscriber = " + x.EventArgs.State));

foo.OnFooEvent(FooState.Running);

obs.Subscribe(x => Console.WriteLine("Late Subscriber = " + x.EventArgs.State));

Does anyone know how to do this with Rx?

Upvotes: 3

Views: 1162

Answers (2)

paulpdaniels

Reputation: 18663

RefCount connects only after the first subscription. If you want fine-grained control over when the connection occurs, use Replay + Connect.

So do this instead:

var publishedSource = eventSource.DistinctUntilChanged().Replay(1);

var connection = publishedSource.Connect();

// Subscribe to publishedSource to receive events, and dispose of
// connection when you are done.

Posted from my phone so apologies for any syntax errors in advance.
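
A fuller sketch of the Replay + Connect approach above, reusing the types from the question. It assumes the System.Reactive package is referenced. The Select projection onto FooState is an addition, not part of the answer: FooEventArgs does not override Equals, so DistinctUntilChanged on the raw event pattern would most likely never filter anything. ReplayConnectDemo and Run are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public enum FooState { Stopped = 0, Starting = 1, Running = 2 }

public delegate void FooEventHandler(object sender, FooEventArgs e);

public class FooEventArgs : EventArgs
{
    public FooEventArgs(FooState fooState) { State = fooState; }
    public FooState State { get; private set; }
}

public class Foo
{
    public event FooEventHandler FooEvent;

    public void OnFooEvent(FooState state)
    {
        var fooEvent = FooEvent;
        if (fooEvent != null)
        {
            fooEvent(this, new FooEventArgs(state));
        }
    }
}

// Hypothetical demo class, for illustration only.
public static class ReplayConnectDemo
{
    public static List<FooState> Run()
    {
        var foo = new Foo();

        // Replay(1) returns a connectable observable; nothing is hooked up
        // to foo.FooEvent until Connect() is called.
        var published = Observable
            .FromEventPattern<FooEventHandler, FooEventArgs>(
                h => foo.FooEvent += h,
                h => foo.FooEvent -= h)
            .Select(e => e.EventArgs.State) // assumption: compare states, not event objects
            .DistinctUntilChanged()
            .Replay(1);

        // Connect eagerly so events are buffered even with zero subscribers.
        using (published.Connect())
        {
            foo.OnFooEvent(FooState.Running);

            // The late subscriber receives the buffered value on subscription.
            var seen = new List<FooState>();
            using (published.Subscribe(s => seen.Add(s)))
            {
            }
            return seen;
        }
    }
}
```

Disposing the value returned by Connect detaches the handler from foo.FooEvent, so in a real application it would typically live as long as the Foo instance does.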

Upvotes: 3

H_Andr

Reputation: 179

Rx is doing the right thing by converting your event notifications into a stream and replaying them, but what you are really asking is: "Why don't I get the initial state when I subscribe to the event?"

Events don't work like that. If I do a += on foo.FooEvent, I don't get an immediate trigger with the current value. I only get notified when it changes. As you have noticed, 'Replay' will replay subsequent events, but not provide the state at the time of subscription.

To solve your problem, you'll need to ensure that the current value is put onto the stream before you hook up the stream for change notifications. Check out Observable.StartWith().

That is, add .StartWith(foo.State) before the .DistinctUntilChanged() call (immediately after .FromEventPattern).
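
A minimal sketch of the StartWith idea, under a few assumptions that go beyond the question's code: Foo is given a State property (the original class does not have one), the event stream is projected onto FooState so that StartWith type-checks, and the pipeline is wrapped in Observable.Defer so that foo.State is read at subscription time rather than when the pipeline is built. StartWithDemo and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public enum FooState { Stopped = 0, Starting = 1, Running = 2 }

public delegate void FooEventHandler(object sender, FooEventArgs e);

public class FooEventArgs : EventArgs
{
    public FooEventArgs(FooState fooState) { State = fooState; }
    public FooState State { get; private set; }
}

public class Foo
{
    public event FooEventHandler FooEvent;

    // Assumption: not in the original class. Tracks the last published state.
    public FooState State { get; private set; }

    public void OnFooEvent(FooState state)
    {
        State = state;
        var fooEvent = FooEvent;
        if (fooEvent != null)
        {
            fooEvent(this, new FooEventArgs(state));
        }
    }
}

// Hypothetical demo class, for illustration only.
public static class StartWithDemo
{
    public static List<FooState> Run()
    {
        var foo = new Foo();

        var changes = Observable
            .FromEventPattern<FooEventHandler, FooEventArgs>(
                h => foo.FooEvent += h,
                h => foo.FooEvent -= h)
            .Select(e => e.EventArgs.State);

        // Defer re-evaluates foo.State for every new subscriber.
        var states = Observable
            .Defer(() => changes.StartWith(foo.State))
            .DistinctUntilChanged();

        foo.OnFooEvent(FooState.Running); // fired before anyone subscribes

        var seen = new List<FooState>();
        using (states.Subscribe(s => seen.Add(s)))
        {
            foo.OnFooEvent(FooState.Running); // duplicate, filtered out
            foo.OnFooEvent(FooState.Stopped);
        }
        return seen; // Running (current state), then Stopped
    }
}
```

With this shape no Replay is needed, because the current value comes from Foo itself; the trade-off is that Foo has to expose its state.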

Upvotes: 1
