Reputation: 6986
I am using Rx 2.2.5.
In the sample below, I expect the handler (the delegate passed to Subscribe()) to run on a new thread; however, when I run the app, all 10 numbers are consumed on the same thread, one after another.
Console.WriteLine("Main Thread: {0}", Thread.CurrentThread.ManagedThreadId);

var source = Observable.Range(1, 10, TaskPoolScheduler.Default)
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(n =>
    {
        Thread.Sleep(1000);
        Console.WriteLine(
            "Value: {0} Thread: {1} IsPool: {2}",
            n, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
    });
Output:
Main Thread: 1
Value: 1 Thread: 4 IsPool: False
Value: 2 Thread: 4 IsPool: False
Value: 3 Thread: 4 IsPool: False
Value: 4 Thread: 4 IsPool: False
Value: 5 Thread: 4 IsPool: False
Value: 6 Thread: 4 IsPool: False
Value: 7 Thread: 4 IsPool: False
Value: 8 Thread: 4 IsPool: False
Value: 9 Thread: 4 IsPool: False
Value: 10 Thread: 4 IsPool: False
The fact that they run sequentially is a mystery as well, since I am using the TaskPoolScheduler to generate the numbers.
Even if I replace the NewThreadScheduler with TaskPoolScheduler or ThreadPoolScheduler, I still get one thread, and the more interesting part is that in both of those cases Thread.CurrentThread.IsThreadPoolThread is False.
I cannot explain this behaviour, because when I look at the ThreadPoolScheduler source I see:
public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
    if (action == null)
        throw new ArgumentNullException("action");

    SingleAssignmentDisposable d = new SingleAssignmentDisposable();

    ThreadPool.QueueUserWorkItem((WaitCallback) (_ =>
    {
        if (d.IsDisposed)
            return;
        d.Disposable = action((IScheduler) this, state);
    }), (object) null);

    return (IDisposable) d;
}
I can clearly see the call to ThreadPool.QueueUserWorkItem..., so why is IsPool == False?
What am I missing here?
Upvotes: 3
Views: 1218
Reputation: 10783
As per my comment above, it looks like you are trying to use Rx (a library for querying and composing observable sequences of data) as a parallel computation library.
I think that to see the results you were expecting, you could hack your query to look like this:
Console.WriteLine("Main Thread: {0}", Thread.CurrentThread.ManagedThreadId);

var source = Observable.Range(1, 10, TaskPoolScheduler.Default)
    .SelectMany(i => Observable.Start(() => i, NewThreadScheduler.Default))
    //.ObserveOn(NewThreadScheduler.Default)
    .Subscribe(n =>
    {
        Thread.Sleep(1000);
        Console.WriteLine(
            "Value: {0} Thread: {1} IsPool: {2}",
            n, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
    });
Output:
Main Thread: 11
Value: 1 Thread: 15 IsPool: False
Value: 4 Thread: 21 IsPool: False
Value: 2 Thread: 14 IsPool: False
Value: 3 Thread: 13 IsPool: False
Value: 5 Thread: 21 IsPool: False
Value: 6 Thread: 21 IsPool: False
Value: 7 Thread: 21 IsPool: False
Value: 8 Thread: 21 IsPool: False
Value: 9 Thread: 21 IsPool: False
Value: 10 Thread: 21 IsPool: False
Note that we see multiple threads in play here, but also that we now get out-of-order values (naturally, since we are introducing concurrency that we no longer control). So it is a case of picking your poison (or picking the appropriate library).
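If you want the concurrency but also need the original order, one variation of the query above (a sketch, using the same System.Reactive operators) is to fork eagerly with Observable.Start but join with Concat instead of SelectMany, since Concat yields the inner results in sequence order:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main()
    {
        // Fork: Observable.Start begins each computation eagerly on its own
        // scheduler as soon as Select produces it.
        // Join: Concat subscribes to the inner sequences in order, so the
        // results come out 1..10 even though the work ran concurrently.
        Observable.Range(1, 10, TaskPoolScheduler.Default)
            .Select(i => Observable.Start(() =>
            {
                Thread.Sleep(1000); // simulate work
                return i;
            }, NewThreadScheduler.Default))
            .Concat()
            .Subscribe(n => Console.WriteLine(
                "Value: {0} Thread: {1}",
                n, Thread.CurrentThread.ManagedThreadId));

        Console.ReadLine();
    }
}
```

This trades the first-finished-first-delivered behaviour of SelectMany for deterministic ordering, at the cost of buffering results that finish early.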
Upvotes: 2
Reputation: 6986
Apparently this is by design! As Microsoft explains:
...Both the TaskPool and CLR ThreadPool schedulers support long-running operations. The former does so through TaskCreationOptions.LongRunning (which really - in today's implementation of the TPL - amounts to creating a new thread); the latter calls into NewThread to achieve the same effect. What matters more than the specific type of scheduler used is the achieved effect. In this particular case, one expects asynchrony caused by introduction of additional concurrency.
It also goes on to say:
In case you really want to have a thread pool thread under any circumstance (e.g. to enforce a global maximum degree of parallelism for your app - though keep in mind things like TaskCreationOptions.LongRunning are wildcards to bypass this mechanism), you'll have to apply the DisableOptimizations extension method to the ThreadPoolScheduler instance to make it fall back to recursive behavior. Notice you can pass in the interfaces you want to disable (specifying none means disabling all optimizations), in this case typeof(ISchedulerLongRunning) suffices.
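A minimal sketch of applying that advice, assuming Rx 2.x where DisableOptimizations is an extension method in System.Reactive.Concurrency:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main()
    {
        // Hide ISchedulerLongRunning from Rx so ObserveOn cannot take the
        // long-running (dedicated thread) fast path and falls back to
        // recursive scheduling on the CLR thread pool.
        var poolOnly = ThreadPoolScheduler.Instance
            .DisableOptimizations(typeof(ISchedulerLongRunning));

        Observable.Range(1, 10, TaskPoolScheduler.Default)
            .ObserveOn(poolOnly)
            .Subscribe(n => Console.WriteLine(
                "Value: {0} IsPool: {1}",
                n, Thread.CurrentThread.IsThreadPoolThread)); // expected True, per the quote above

        Console.ReadLine();
    }
}
```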
Upvotes: 1
Reputation: 117010
To start with, Rx has a behaviour contract: it will only push a single value through an observer at a time. If more than one observer is attached to an observable (usually only with hot observables), then a single value is run through each observer, one at a time, before the next value is produced.
That explains why your values are produced one after another.
Now secondly, as to why they run on the same thread. That's basically a result of optimizing performance based on the above behaviour contract. It takes time and resources to spin up new threads, so the Rx team try to reuse threads where possible to avoid the overhead.
It should be clear that an observable running on one thread might produce values faster than its observers can process them. If that's the case, they queue up. So, for any of the schedulers that spin up new threads to observe on, the logic is this: when the observer finishes processing a value, the currently running thread inspects the queue to see if there is another value to process, and if there is, it processes it - no new thread needed. If there isn't, the thread is ended or returned to the TaskPool etc., so when a new value becomes available that thread is gone and an alternative is needed. The NewThreadScheduler spins one up; the TaskPoolScheduler gets one from the TaskPool; and so on.
So, this boils down to a simple optimization to speed up processing when values are queued up to be processed sequentially.
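The reuse logic described above can be sketched outside of Rx, using only the BCL (this is an illustrative analogy, not Rx's actual implementation, and it ignores the wake-up race a real scheduler must handle):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

class DrainingWorker
{
    private readonly ConcurrentQueue<Action> _queue = new ConcurrentQueue<Action>();
    private int _running; // 0 = no worker thread alive

    public void Post(Action work)
    {
        _queue.Enqueue(work);
        // Spin up a thread only if none is currently draining the queue.
        if (Interlocked.CompareExchange(ref _running, 1, 0) == 0)
            new Thread(Drain) { IsBackground = true }.Start();
    }

    private void Drain()
    {
        Action work;
        // The same thread keeps processing for as long as values are queued up...
        while (_queue.TryDequeue(out work))
            work();
        // ...and ends once the queue is empty, so the next Post must start a
        // fresh thread - exactly the behaviour described for the Rx schedulers.
        Interlocked.Exchange(ref _running, 0);
    }
}
```

If values arrive faster than they are processed, every one of them runs on the single draining thread; only after a quiet period does a new thread appear.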
In my tests I couldn't create a single observable sample that uses a new thread for each new value when using the NewThreadScheduler, but then I found this in the NewThreadScheduler source:
public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
    if (action == null)
        throw new ArgumentNullException("action");

    EventLoopScheduler eventLoopScheduler = new EventLoopScheduler(this._threadFactory);
    eventLoopScheduler.ExitIfEmpty = true;
    return eventLoopScheduler.Schedule<TState>(state, dueTime, action);
}
So, under the hood it is creating a new EventLoopScheduler (a scheduler that runs everything on a single, non-changing thread) and handing the scheduling over to it. No wonder the thread doesn't change.
Upvotes: 1