Samet S.
Samet S.

Reputation: 475

How to observe dependent events in Reactive Extensions (Rx)?

What is the best way to handle dependent events such as;
There is an object for which I need to test if connection is succeeded or failed. But the object first needs to pass the initialization step which I test for success or failure and then continue to connection step.

My code is below. Is there a better way to handle those dependent events because I'm subscribing for connection inside initialization subscription?

If I have more dependent events like this will I keep nesting the subscriptions?

public static void Test()
{
    const int maxValue = 501;

    var random = new Random(BitConverter.ToInt32(Guid.NewGuid().ToByteArray(), 0));

    var initOk = Observable.Interval(TimeSpan.FromMilliseconds(random.Next(maxValue))).Select(i => true);
    var initKo = Observable.Interval(TimeSpan.FromMilliseconds(random.Next(maxValue))).Select(i => false);

    var connectOk = Observable.Interval(TimeSpan.FromMilliseconds(random.Next(maxValue))).Select(i => true);
    var connectKo = Observable.Interval(TimeSpan.FromMilliseconds(random.Next(maxValue))).Select(i => false);

    var initResult = initOk.Amb(initKo).Take(1);
    var connectResult = connectOk.Amb(connectKo).Take(1);

    var id =
        initResult.Subscribe(ir =>
                                 {
                                     if (ir)
                                     {
                                         var cd =
                                             connectResult.Subscribe(cr =>
                                                                         {
                                                                             Console.WriteLine(cr
                                                                                                   ? "Connection succeeded."
                                                                                                   : "Connection failed.");
                                                                         });
                                     }
                                     else
                                     {
                                         Console.WriteLine("Initialization failed thus connection failed.");
                                     }
                                 });
}

Upvotes: 2

Views: 854

Answers (2)

Enigmativity
Enigmativity

Reputation: 117019

You could use this:

var finalResult =
    initResult
        .Select(ir =>
            Observable.If(() => ir, connectResult, Observable.Return(false)))
        .Merge();

To get your messages out, you could change it like this:

var initResultText =
    initResult
        .Select(ir =>
            ir ? (string)null : "Initialization failed thus connection failed.");

var connectResultText =
    connectResult
        .Select(cr => 
            String.Format("Connection {0}.", cr ? "succeeded" : "failed"));

var finalResult =
    initResultText
        .Select(irt =>
            Observable.If(() =>
                irt == null, connectResultText, Observable.Return(irt)))
        .Merge();

If you need to nest further than this you should consider making an extension method that wraps up the complexity and thus composition would be much easier.

Upvotes: 0

James Hay
James Hay

Reputation: 12700

You can normally avoid nesting by utilising a variety of the rx operators to chain calls up.

Your example could be tidied up in using:

initResult.SelectMany(ir =>
       {   
           if (ir != null)
           {
             return connectResult;
           }

           Console.WriteLine("Initialization failed thus connection failed.");

           return Observable.Throw(new Exception("Some Exception"));
       })
       .Subscribe(cr =>
           {
              Console.WriteLine(cr != null
                 ? "Connection succeeded." 
                 : "Connection failed.");
           })

Upvotes: 2

Related Questions