Cel
Cel

Reputation: 6689

Observable that produces a single item per subscription - one event per handler

Let's say I would like to expose an observable that notifies either immediately, if an Internet connection is available right now, or if the device is not connected to the Internet the notification would be pushed out when it does become available:

IObservable<DateTime> InternetBecameAvailableSignalledOncePerSubscriber { get; }

Furthermore, there should be only one notification per subscription without requiring the subscriber to do .Take(1) or anything like that.

i.e. the client that depends on an Internet resource would use this observable to do something right now or as soon as the Internet becomes available, but would not do it more than once - there would be no more signalling to that subscriber if Internet became unavailable and available a second time..

How could this be implemented with Reactive Extensions (Rx) ?

Upvotes: 2

Views: 1327

Answers (1)

Lee Campbell
Lee Campbell

Reputation: 10783

This should be easily solved with Rx. The question is how will you know if the internet is available? Is it based on other consumers subscribing, is it based on another method (like a Connect() method) or some event that is being pushed to you (like a WCF Channel state changed event)?

Depending on this answer, it seems you just need to encapsulate your Take(1) and Replay(1).

public class IServiceClient
{
    IObservable<DateTime> LastConnnected { get; }
}

public class ServiceClient : IServiceClient, IDisposable
{
    private readonly IDisposable _connection;
    private readonly IObservable<DateTime> _lastConnnected;

    public ServiceClient()
    {
        //Question 1) where does the 'Connected' sequence come from i.e. what is it that tells you that you have internet connectivity?
        //Question 2) When should the subscription be made to 'Connected'? Here I cheat and do it in the ctor, not great.
        var connected = Connected.Replay(1)
                                .Where(isConnected=>isConnected)
                                .Take(1)
                                .Select(_=>DateTime.UtcNow);

        _lastConnnected = connected;
        _connection = connected.Connect();
    }

    public IObservable<DateTime> LastConnnected{ get {return _lastConnnected; } }

    public void Dispose()
    {
        _connection.Dispose();
    }
}

This does leave you with some other questions to answer e.g. what is the thing that tells you if you have internet connectivity and what is the resource management plan for this?

Updated code

public interface IServiceClient
{
    IObservable<DateTime> LastConnnected { get; }
}

public class ServiceClient : IServiceClient, IDisposable
{
    private readonly IDisposable _connection;
    private readonly IObservable<bool> _lastConnnected;

    public ServiceClient(IObservable<ConnectionState> connectedStates)
    {
        var cachedStates = connectedStates.Select(state=>state.IsConnected).Replay(1);
        _lastConnnected = cachedStates;
        _connection = cachedStates.Connect();
    }

    public IObservable<DateTime> LastConnnected
    { 
        get 
        {
            return _lastConnnected.StartWith(IsConnected())
                                  .Where(isConnected=>isConnected)
                                  .Take(1)
                                  .Select(_=>DateTime.UtcNow); 
        } 
    }

    //....

    public void Dispose()
    {
        _connection.Dispose();
    }
}

Upvotes: 1

Related Questions