Reputation: 7746
If I say
void ChangeQuote(string symbol, double bid, double ask)
{
}
string[] TechETF = {"AAPL", "MSFT"};
var quotesSubscription = quotesObservable.Where(quote => (TechETF.Contains(quote.Symbol)));
quotesSubscription.Subscribe(quote => this.ChangeQuote(quote.Symbol, quote.Bid, quote.Ask));
If the underlying events are fired asynchronously, is the code inside Subscribe() also handling those callbacks asynchronously, each on its own thread (threadpool)? Or does it serialize (block on) the processing of those callbacks?
Upvotes: 4
Views: 171
Reputation: 16117
If two OnNext handlers for the same Observable ran in parallel, you could no longer guarantee that the values provided by the Observable are processed in the order they were produced. Serialized delivery is therefore part of the Rx contract, as Enigmativity says, and as you can see in Enigmativity's example, Rx serializes the notifications.
Using ObserveOn with a ThreadPool or NewThread scheduler doesn't mean that notifications run in parallel. It means that the notification handling code runs in series, in a different context than the producer of the notification.
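To illustrate, here is a minimal sketch, assuming the System.Reactive package is referenced. The source (Observable.Interval), the class name and the timings are made up for the illustration and are not from the question. The handler is deliberately slower than the producer, yet each "Start" should be followed by its own "End" before the next "Start" appears, even though the thread ID may change between notifications.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class ObserveOnDemo
{
    static void Main()
    {
        // Hypothetical source: five values, one every 100 ms.
        var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5);

        using (var done = new ManualResetEventSlim())
        {
            source
                // Move notification handling off the producer's context.
                .ObserveOn(TaskPoolScheduler.Default)
                .Subscribe(
                    x =>
                    {
                        Console.WriteLine("Start " + x + " on thread " + Thread.CurrentThread.ManagedThreadId);
                        Thread.Sleep(300); // handler is slower than the producer
                        Console.WriteLine("End " + x);
                    },
                    () => done.Set());

            // Keep the process alive until OnCompleted has been delivered.
            done.Wait();
        }
    }
}
```

ObserveOn queues the values that arrive while the handler is busy and hands them over one at a time, so ordering is preserved at the cost of the queue growing if the handler never catches up.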
If you truly want to handle notifications in parallel, and accept the race condition consequences of this, then you can always spawn new threadpool tasks or similar inside the notification handler. Generally this is not what you really want to do.
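As a rough sketch of that last option (again assuming System.Reactive, with a made-up source and class name), the handler below hands each value to Task.Run and returns immediately. Rx still calls OnNext in series, but the work itself now overlaps, so the "Start" and "End" lines may interleave and finish in any order.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

class ParallelHandlerDemo
{
    static void Main()
    {
        // Hypothetical source: the values 1..5.
        var source = Observable.Range(1, 5);

        using (var pending = new CountdownEvent(5))
        {
            source.Subscribe(x =>
                // OnNext returns as soon as the task is queued;
                // the body runs on the thread pool, possibly in parallel.
                Task.Run(() =>
                {
                    Console.WriteLine("Start " + x + " on thread " + Thread.CurrentThread.ManagedThreadId);
                    Thread.Sleep(200);
                    Console.WriteLine("End " + x);
                    pending.Signal();
                }));

            // Wait for all five tasks before exiting.
            pending.Wait();
        }
    }
}
```

Any state shared between these tasks needs its own synchronization, which is the race-condition cost mentioned above.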
Upvotes: 2
Reputation: 117175
Rx always serializes calls. It's part of the behaviour contract.
Here's an example to show that this is the case.
Start with two timers:
var timers = new []
{
    new System.Timers.Timer()
    {
        Interval = 200.0,
        AutoReset = true,
        Enabled = true,
    },
    new System.Timers.Timer()
    {
        Interval = 250.0,
        AutoReset = true,
        Enabled = true,
    },
};
Now build two observables from the timers:
var observables = new []
{
    Observable.FromEventPattern
        <System.Timers.ElapsedEventHandler, System.Timers.ElapsedEventArgs>
        (h => timers[0].Elapsed += h, h => timers[0].Elapsed -= h),
    Observable.FromEventPattern
        <System.Timers.ElapsedEventHandler, System.Timers.ElapsedEventArgs>
        (h => timers[1].Elapsed += h, h => timers[1].Elapsed -= h),
};
Then merge them into a single observable:
var observable = observables.Merge();
And finally subscribe to see what happens:
observable
    .Subscribe(ep =>
    {
        Console.WriteLine("Start: " + Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(2000);
        Console.WriteLine("End: " + Thread.CurrentThread.ManagedThreadId);
    });
The kind of result I get is this:
Start: 15
End: 15
Start: 11
End: 11
Start: 14
End: 14
Start: 10
End: 10
Start: 4
End: 4
Start: 13
End: 13
Start: 12
End: 12
Start: 3
End: 3
So even though my timers are firing asynchronously to each other, Rx is ensuring that the single subscription always handles each value in series even when they come in on different threads.
Upvotes: 4