Chris Bednarski
Chris Bednarski

Reputation: 3414

How to use IObservable<T>.Subscribe(Action<T>) so that each observed value is actioned on a new thread?

I'm using the latest Rx2 from NuGet. In example code below, the subscription is processed on a new thread and doesn't block the generation of new values. That's all good. However, why are the values generated by the observable processed on the new thread sequentially?

static void Main(string[] args)
{
  printThreadId("Started");
  var observable = Observable.Generate<int, int>(
      0,
      x => {
        printThreadId("Comparing " + x);
        return x < 5; },
      x => {
        printThreadId("Incrementing " + x);
        return x + 1; },
      x => {
        printThreadId("Selecting " + x);
        return x; },
      NewThreadScheduler.Default
  );

  var disp = observable
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(state =>
    {
      printThreadId("Processing " + state);
      Thread.Sleep(1000);
    });

  Console.ReadLine();
  disp.Dispose();
}

static Action<string> printThreadId = (prefix) => Console.WriteLine("{0} on [Worker.{1}]", prefix, Thread.CurrentThread.ManagedThreadId);

The above code takes around 5 seconds to complete. I expected each action to be executed on a new thread and this is not happening.

Code below schedules a new task and the whole process completes in a little over 1 second.

var disp = observable
 .ObserveOn(NewThreadScheduler.Default)
 .Subscribe(state =>
 {
   printThreadId("Processing " + state);
   NewThreadScheduler.Default.Schedule(() =>
   {
      printThreadId("Long running task " + state);
      Thread.Sleep(1000);
   });
 });

Is there a better way to schedule long running tasks to run concurrently using ObserveOn and Subscribe?

Upvotes: 1

Views: 2076

Answers (1)

Enigmativity
Enigmativity

Reputation: 117027

The reactive framework specifically ensures that subsequent value are serialized with no overlap. This helps stop many concurrency issues. This behaviour is by design.

The NewThreadScheduler queues up actions, so if there are actions scheduled back-to-back the scheduler doesn't bother with creating a new thread - it just uses the currect thread. When there is a gap (something might be queued but it is for the future) then the current thread is allowed to end and a new one created when the action is due.

If you want the subscriptions to run in parallel you have to go out of your way to make it happen. But the good news is that it's not too hard.

If you keep your existing observable then you can try this:

Func<int, IObservable<Unit>> process = state =>
    Observable.Start(() =>
    {
        printThreadId("Processing " + state);
        Thread.Sleep(1000);
        return Unit.Default;
    });

var query =
    from x in observable
    select process(x);

var disp =
    query
        .Merge()
        .ObserveOn(NewThreadScheduler.Default)
        .Subscribe();

I got this result when I ran it:

Started on [Worker.23]
Comparing 0 on [Worker.26]
Selecting 0 on [Worker.26]
Processing 0 on [Worker.20]
Incrementing 0 on [Worker.26]
Comparing 1 on [Worker.26]
Selecting 1 on [Worker.26]
Incrementing 1 on [Worker.26]
Comparing 2 on [Worker.26]
Selecting 2 on [Worker.26]
Processing 1 on [Worker.18]
Incrementing 2 on [Worker.26]
Comparing 3 on [Worker.26]
Selecting 3 on [Worker.26]
Processing 3 on [Worker.13]
Incrementing 3 on [Worker.26]
Comparing 4 on [Worker.26]
Selecting 4 on [Worker.26]
Processing 4 on [Worker.12]
Incrementing 4 on [Worker.26]
Comparing 5 on [Worker.26]
Processing 2 on [Worker.8]

Upvotes: 3

Related Questions