Contango

Reputation: 80378

How to block until an event occurs in an RX stream?

With Microsoft Reactive Extensions (RX), I'm wondering if it's possible to block until an event occurs?

Something like this:

observableStream.BlockUntilTrue(o => o.MyProperty == true);

What I have tried

I have tried observableStream.TakeUntil(o => o.MyProperty == true); but this returns immediately instead of blocking.

Upvotes: 2

Views: 1999

Answers (2)

Martin Liversage

Reputation: 106926

I have rewritten my answer after reading your comments. In your case you can use First but it changes the asynchronous nature of RX into synchronous code that blocks. I guess that is what your question is about.

var firstValue = observableStream
  .Where(o => o.MyProperty)
  .First();

The call to First blocks the calling thread until the first matching value arrives from the observable sequence, which seems to be what you want.
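As far as I know, newer Rx releases mark the blocking First() as obsolete and suggest FirstAsync() instead, which can still be made blocking with Wait(). Here is a minimal sketch of that variant, with a Timeout so the call cannot hang forever. The Item class, the Subject used as a test source, and the 5 second timeout are assumptions for illustration only, not part of your code.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

// Hypothetical element type standing in for whatever the stream carries.
public class Item
{
    public bool MyProperty { get; set; }
}

public static class Demo
{
    public static void Main()
    {
        // Hypothetical source; any IObservable<Item> would do.
        var observableStream = new Subject<Item>();

        // Publish a non-matching value, then a matching one, from another thread.
        Task.Run(async () =>
        {
            await Task.Delay(200);
            observableStream.OnNext(new Item { MyProperty = false });
            await Task.Delay(200);
            observableStream.OnNext(new Item { MyProperty = true });
        });

        try
        {
            // Wait() blocks the calling thread until the sequence completes.
            // FirstAsync() completes after the first matching value.
            // Timeout() turns a too-long wait into a TimeoutException.
            var firstValue = observableStream
                .Where(o => o.MyProperty)
                .FirstAsync()
                .Timeout(TimeSpan.FromSeconds(5))
                .Wait();

            Console.WriteLine("Got a value with MyProperty = " + firstValue.MyProperty);
        }
        catch (TimeoutException)
        {
            Console.WriteLine("No matching value arrived in time.");
        }
    }
}
```

If the code around the call is already async, awaiting the FirstAsync() result is probably a better fit than Wait(), because it does not tie up a thread.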

Upvotes: 5

Contango

Reputation: 80378

This demo code works well. It adds an extension method that blocks until a single event occurs on the stream. If the Take() call on the BlockingCollection were replaced with TryTake() and a timeout, it would wait until either an event occurred or the timeout elapsed.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RX_2
{
    public static class Program
    {
        static void Main(string[] args)
        {
            Subject<bool> stream = new Subject<bool>();

            Task.Run(
                () =>
                {
                    for (int i = 0; i < 4; i++)
                    {
                        Thread.Sleep(TimeSpan.FromMilliseconds(500));
                        stream.OnNext(false);
                    }
                    stream.OnNext(true);
                });

            Console.Write("Start\n");
            stream.Where(o => o == true).BlockUntilEvent();
            Console.Write("Stop\n");
            Console.ReadKey();
        }

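        // Blocks the calling thread until the stream produces its first value.
        public static void BlockUntilEvent(this IObservable<bool> stream)
        {
            BlockingCollection<bool> blockingCollection = new BlockingCollection<bool>();
            // Dispose the subscription once the first value has been taken.
            using (stream.Subscribe(blockingCollection.Add))
            {
                blockingCollection.Take();
            }
        }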
    }
}
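The timeout variant mentioned above could look roughly like the following. This is only a sketch: the generic overload, the class name ObservableBlockingExtensions, and the bool return value are my own additions for illustration. It relies on BlockingCollection<T>.TryTake(out T, TimeSpan), which returns false if nothing arrives before the timeout.

```csharp
using System;
using System.Collections.Concurrent;

public static class ObservableBlockingExtensions
{
    // Hypothetical overload: blocks until the stream produces a value or the
    // timeout elapses. Returns true if a value arrived in time, false otherwise.
    public static bool BlockUntilEvent<T>(this IObservable<T> stream, TimeSpan timeout)
    {
        var blockingCollection = new BlockingCollection<T>();

        // The subscription is disposed on exit, whether or not a value arrived.
        using (stream.Subscribe(blockingCollection.Add))
        {
            T ignored;
            return blockingCollection.TryTake(out ignored, timeout);
        }
    }
}
```

In the demo it would be called as stream.Where(o => o).BlockUntilEvent(TimeSpan.FromSeconds(5)). Note that a stream that completes without ever producing a value simply runs into the timeout, since only OnNext is observed here.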

Upvotes: 0
