Reputation: 41
foreach (...) {
Observable.FromAsync(GetData()/* an async function() */)
.DoWhile(() => true)
.Subscribe(data => {
DoSomething(); // I want all of my Subscribe() to call this on the same thread
});
}
The GetData()
will return data asynchronously. I created multiple Observable
s, subscribe to them, and want to get notified (call DoSomething()
) on the same thread whenever data is returned.
I have tried BlockingCollection<>
, but in vain... :(
Anybody knows how to do it?
PS: I don't want to use a message queue (e.g. Redis pub/sub) to achieve this.
Upvotes: 1
Views: 294
Reputation: 12667
You want to process every message on the same thread, and block the thread until the next message arrives - which is a very common paradigm called an Event Loop.
And as it turns out, you can run things in an event loop in Rx, with the appropriately named EventLoopScheduler
.
var values = Observable.Interval(TimeSpan.FromSeconds(0.1));
void DoSomething(long value) => Console.WriteLine($"Value: {value}, Thread: {Thread.CurrentThread.Name}");
var eventloop = new EventLoopScheduler();
values.ObserveOn(eventloop).Subscribe(DoSomething);
Output:
Value: 0, Thread: Event Loop 1
Value: 1, Thread: Event Loop 1
Value: 2, Thread: Event Loop 1
Value: 3, Thread: Event Loop 1
Value: 4, Thread: Event Loop 1
Value: 5, Thread: Event Loop 1
Value: 6, Thread: Event Loop 1
Value: 7, Thread: Event Loop 1
Upvotes: 2