user12857692

Reputation: 41

Subscribe to multiple Observables and get notified on the same thread

foreach (...) {
    Observable.FromAsync(() => GetData()) // GetData() is an async function
        .DoWhile(() => true)
        .Subscribe(data => {
            DoSomething(); // I want all of my Subscribe() to call this on the same thread
        });
}

GetData() returns data asynchronously. I create multiple Observables, subscribe to each of them, and want to be notified (i.e. have DoSomething() called) on the same thread whenever data is returned. I have tried BlockingCollection<>, but in vain... :(

Does anybody know how to do it?

PS: I don't want to use a message queue (e.g. Redis pub/sub) to achieve this.

Upvotes: 1

Views: 294

Answers (1)

Asti

Reputation: 12667

You want to process every message on the same thread, and block that thread until the next message arrives. This is a very common paradigm called an event loop.

And as it turns out, you can run things in an event loop in Rx, with the appropriately named EventLoopScheduler.

    var values = Observable.Interval(TimeSpan.FromSeconds(0.1));

    void DoSomething(long value) => Console.WriteLine($"Value: {value}, Thread: {Thread.CurrentThread.Name}");

    var eventloop = new EventLoopScheduler();

    values.ObserveOn(eventloop).Subscribe(DoSomething);

Output:

Value: 0, Thread: Event Loop 1
Value: 1, Thread: Event Loop 1
Value: 2, Thread: Event Loop 1
Value: 3, Thread: Event Loop 1
Value: 4, Thread: Event Loop 1
Value: 5, Thread: Event Loop 1
Value: 6, Thread: Event Loop 1
Value: 7, Thread: Event Loop 1
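
Applied to the loop in the question, the same idea might look roughly like the sketch below: create one EventLoopScheduler and put ObserveOn(eventLoop) in front of every Subscribe, so all the callbacks land on that scheduler's single thread. This assumes the System.Reactive package and a C# version with top-level statements. GetData here is a hypothetical stand-in for the asker's function, and Repeat(2) replaces the endless DoWhile(() => true) only so the demo terminates.

```csharp
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's GetData(): completes after a short delay.
static async Task<string> GetData(int source)
{
    await Task.Delay(100 * (source + 1));
    return $"data from source {source}";
}

void DoSomething(string data) =>
    Console.WriteLine($"{data}, Thread: {Thread.CurrentThread.Name}");

// One scheduler, and therefore one dedicated thread, shared by every subscription.
var eventLoop = new EventLoopScheduler();
var done = new CountdownEvent(3); // one signal per source (demo-only bookkeeping)

foreach (var source in Enumerable.Range(0, 3))
{
    Observable.FromAsync(() => GetData(source)) // pass a delegate, not an already started Task
        .Repeat(2)                              // finite here so the demo terminates
        .ObserveOn(eventLoop)                   // marshal notifications onto the event loop thread
        .Subscribe(DoSomething, () => done.Signal());
}

done.Wait();         // block the main thread until all three sources complete
eventLoop.Dispose(); // stop the event loop thread
```

The order in which the lines are printed depends on timing, but every line should report the same thread name. Merging the sources first (Observable.Merge) and calling ObserveOn once on the merged stream would be an alternative if a single subscription is enough.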

Upvotes: 2
