Reputation: 80378
With Microsoft Reavtive Extensions (RX), I'm wondering if its possible to block until an event occurs?
Somthing like this:
observableStream.BlockUntilTrue(o => o.MyProperty == true);
What I have tried
I have tried observableStream.TakeUntil(o => o.MyProperty == true);
, but this exits immediately.
Upvotes: 2
Views: 1999
Reputation: 106926
I have rewritten my answer after reading your comments. In your case you can use First
but it changes the asynchronous nature of RX into synchronous code that blocks. I guess that is what your question is about.
var firstValue = observableStream.
.Where(o => o.MyProperty)
.First();
The call to First
will block and wait for the first value to arrive from the observable sequence which seems to be what you want.
Upvotes: 5
Reputation: 80378
This demo code works well. It adds an extension method that blocks until a single event occurs on the stream. If a timeout was added to the Take() for the BlockingCollection, it would wait until either an event occurred, or a timeout occurred.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace RX_2
{
public static class Program
{
static void Main(string[] args)
{
Subject<bool> stream = new Subject<bool>();
Task.Run(
() =>
{
for (int i = 0; i < 4; i++)
{
Thread.Sleep(TimeSpan.FromMilliseconds(500));
stream.OnNext(false);
}
stream.OnNext(true);
});
Console.Write("Start\n");
stream.Where(o => o == true).BlockUntilEvent();
Console.Write("Stop\n");
Console.ReadKey();
}
public static void BlockUntilEvent(this IObservable<bool> stream)
{
BlockingCollection<bool> blockingCollection = new BlockingCollection<bool>();
stream.Subscribe(blockingCollection.Add);
var result = blockingCollection.Take();
}
}
}
Upvotes: 0