MaYaN

Reputation: 6986

Why is RX not running handler on a new thread?

I am using RX 2.2.5

In the sample below, I expect the handler (the delegate passed to Subscribe()) to run on a new thread; however, when I run the app, all 10 numbers are consumed on the same thread, one after another.

Console.WriteLine("Main Thread: {0}", Thread.CurrentThread.ManagedThreadId);

var source = Observable.Range(1, 10, TaskPoolScheduler.Default)
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(n =>
    {
        Thread.Sleep(1000);
        Console.WriteLine(
            "Value: {0} Thread: {1} IsPool: {2}", 
            n, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
    });

Output:

Main Thread: 1
Value: 1 Thread: 4 IsPool: False
Value: 2 Thread: 4 IsPool: False
Value: 3 Thread: 4 IsPool: False
Value: 4 Thread: 4 IsPool: False
Value: 5 Thread: 4 IsPool: False
Value: 6 Thread: 4 IsPool: False
Value: 7 Thread: 4 IsPool: False
Value: 8 Thread: 4 IsPool: False
Value: 9 Thread: 4 IsPool: False
Value: 10 Thread: 4 IsPool: False

The fact that they run sequentially is a mystery as well, since I am using the TaskPoolScheduler to generate the numbers.

Even if I replace the NewThreadScheduler with TaskPoolScheduler or ThreadPoolScheduler, I still get a single thread, and the more interesting part is that in both cases Thread.CurrentThread.IsThreadPoolThread is False.

I cannot explain this behaviour, because when I look at the ThreadPoolScheduler source I see:

public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
  if (action == null)
    throw new ArgumentNullException("action");
  SingleAssignmentDisposable d = new SingleAssignmentDisposable();
  ThreadPool.QueueUserWorkItem((WaitCallback) (_ =>
  {
    if (d.IsDisposed)
      return;
    d.Disposable = action((IScheduler) this, state);
  }), (object) null);
  return (IDisposable) d;
}

I can clearly see ThreadPool.QueueUserWorkItem... so why is IsPool == False?

What am I missing here?

Upvotes: 3

Views: 1218

Answers (3)

Lee Campbell

Reputation: 10783

As per my comment above, it looks like you are trying to use Rx (a library for querying and composing observable sequences of data) as a parallel-computation library.

I think that to see the results you were expecting, you could rework your query like this:

Console.WriteLine("Main Thread: {0}", Thread.CurrentThread.ManagedThreadId);

var source = Observable.Range(1, 10, TaskPoolScheduler.Default)
    .SelectMany(i => Observable.Start(() => i, NewThreadScheduler.Default))
    //.ObserveOn(NewThreadScheduler.Default)
    .Subscribe(n =>
    {
        Thread.Sleep(1000);
        Console.WriteLine(
            "Value: {0} Thread: {1} IsPool: {2}",
            n, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
    });

Output:

Main Thread: 11
Value: 1 Thread: 15 IsPool: False
Value: 4 Thread: 21 IsPool: False
Value: 2 Thread: 14 IsPool: False
Value: 3 Thread: 13 IsPool: False
Value: 5 Thread: 21 IsPool: False
Value: 6 Thread: 21 IsPool: False
Value: 7 Thread: 21 IsPool: False
Value: 8 Thread: 21 IsPool: False
Value: 9 Thread: 21 IsPool: False
Value: 10 Thread: 21 IsPool: False

Note that we see multiple threads in play here, but also note that we now get out-of-order values (naturally, since we are introducing concurrency that we no longer control). So it is a case of picking your poison (or the appropriate library).

Upvotes: 2

MaYaN

Reputation: 6986

Apparently this is by design! As Microsoft explains:

...Both the TaskPool and CLR ThreadPool schedulers support long-running operations. The former does so through TaskCreationOptions.LongRunning (which really - in today's implementation of the TPL - amounts to creating a new thread); the latter calls into NewThread to achieve the same effect. What matters more than the specific type of scheduler used is the achieved effect. In this particular case, one expects asynchrony caused by introduction of additional concurrency.

It also goes on to say:

In case you really want to have a thread pool thread under any circumstance (e.g. to enforce a global maximum degree of parallelism for your app - though keep in mind things like TaskCreationOptions.LongRunning are wildcards to bypass this mechanism), you'll have to apply the DisableOptimizations extension method to the ThreadPoolScheduler instance to make it fall back to recursive behavior. Notice you can pass in the interfaces you want to disable (specifying none means disabling all optimizations), in this case typeof(ISchedulerLongRunning) suffices.
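A minimal sketch of that suggestion, assuming Rx 2.x, where DisableOptimizations is an extension method on IScheduler in System.Reactive.Concurrency:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main()
    {
        // Strip the ISchedulerLongRunning optimization so that
        // ThreadPoolScheduler cannot fall back to a dedicated
        // long-running thread and must queue work on the pool.
        var scheduler = ThreadPoolScheduler.Instance
            .DisableOptimizations(typeof(ISchedulerLongRunning));

        Observable.Range(1, 10, scheduler)
            .Subscribe(n => Console.WriteLine(
                "Value: {0} Thread: {1} IsPool: {2}",
                n,
                Thread.CurrentThread.ManagedThreadId,
                Thread.CurrentThread.IsThreadPoolThread));

        Console.ReadLine();
    }
}
```

With the optimization disabled, the values should be delivered via ThreadPool.QueueUserWorkItem, so IsPool should now report True.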

Upvotes: 1

Enigmativity

Reputation: 117010

To start with, Rx has a behaviour contract: it will only push a single value through an observer at a time. If more than one observer is attached to an observable (usually only hot observables), a single value is run through each observer, one at a time, before the next value is produced.

That explains why your values are produced one after another.

Secondly, as to why they run on the same thread: that is basically the result of a performance optimization built on the above behaviour contract. It takes time and resources to spin up new threads, so the Rx team reuse threads where possible to avoid the overhead.

It should be clear that an observable running on one thread might produce values faster than its observers can process them. If that happens, the values queue up. So, for any of the schedulers that spin up new threads to observe on, the logic is this: when an observer finishes processing a value, the currently running thread inspects the queue to see if there is another value to process, and if there is, it processes that value too, with no new thread needed. If there isn't, the thread ends or is returned to the task pool, so by the time a new value becomes available that thread is gone and an alternative is needed: the NewThreadScheduler spins one up, the TaskPoolScheduler gets one from the task pool, and so on.

So, this boils down to a simple optimization that speeds up processing when values are queued up to be processed sequentially.


In my tests I couldn't create a single-observable sample that used a new thread for each value with NewThreadScheduler, but then I found this in the NewThreadScheduler source:

public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
  if (action == null)
    throw new ArgumentNullException("action");
  EventLoopScheduler eventLoopScheduler = new EventLoopScheduler(this._threadFactory);
  eventLoopScheduler.ExitIfEmpty = true;
  return eventLoopScheduler.Schedule<TState>(state, dueTime, action);
}

So, under the hood it creates a new EventLoopScheduler (a scheduler that runs everything on a single, fixed thread) and hands the scheduling over to it. No wonder the thread doesn't change.
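One way to see fresh threads from NewThreadScheduler (a sketch of my own, not the question's code): schedule each item as its own top-level work item. Each top-level Schedule call gets its own EventLoopScheduler, and therefore its own thread; only recursive scheduling from inside a running action reuses the existing one.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Threading;

class Program
{
    static void Main()
    {
        for (int i = 0; i < 3; i++)
        {
            int n = i; // capture a copy for the closure

            // Each of these calls creates a new EventLoopScheduler
            // (with ExitIfEmpty = true) backed by its own thread.
            NewThreadScheduler.Default.Schedule(() =>
                Console.WriteLine("Item {0} on thread {1}",
                    n, Thread.CurrentThread.ManagedThreadId));
        }

        Console.ReadLine();
    }
}
```

Running this should print three different thread ids, in contrast to the single thread seen when all values flow through one subscription.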

Upvotes: 1
