Anton K.

Reputation: 73

Processing values of a collection in parallel using ReactiveExtensions

I am new to Rx. According to the documentation, it looks like it is possible to run the OnNext handler for each value in parallel, on different threads. I just need to ObserveOn an appropriate scheduler implementation.

But this simple code runs synchronously:

var dict = new Dictionary<int, object> {{1, null}, {5, null}, {6, null}};

dict.ToObservable()
  .SubscribeOn(ThreadPoolScheduler.Default)
  .Subscribe(kv => {
     Console.WriteLine("Thread {0}, key {1}", Thread.CurrentThread.ManagedThreadId, kv.Key);
     Thread.Sleep(1000);
  });

On the screen I see the same thread ID for each iteration. What am I missing here?

Upvotes: 1

Views: 94

Answers (1)

James World

Reputation: 29786

Rx defines a grammar that specifically prohibits OnNext calls from overlapping from the standpoint of any given observer, so a single subscriber always receives its values one at a time, whichever scheduler you use. Multiple subscribers to the same stream can be called in parallel, but that depends on how the Rx operator is implemented.

Here we have two subscribers that handle OnNext at different rates from the same stream:

void Main()
{
    var stream = Observable.Interval(TimeSpan.FromSeconds(1));    

    var sub1 = stream.Subscribe(x => {
        Console.WriteLine("Sub1 handler start: " + Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(4000);
        Console.WriteLine("Sub1 handler end");
    });

    var sub2 = stream.Subscribe(x => {
        Console.WriteLine("Sub2 handler start: " + Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(2000);
        Console.WriteLine("Sub2 handler end");
    });

    Console.ReadLine();
}

Here is the output. Note how Sub2 races ahead of Sub1, and each runs on its own thread.

Sub2 handler start: 18
Sub1 handler start: 12
Sub2 handler end
Sub2 handler start: 18
Sub2 handler end
Sub2 handler start: 18
Sub1 handler end
Sub1 handler start: 12
Sub2 handler end
Sub2 handler start: 18
Sub2 handler end
Sub1 handler end
Sub2 handler start: 18
Sub1 handler start: 12

Note that nothing guarantees that each subscription gets its own thread; that is down to how the scheduler and operator are implemented. As long as they conform to the Rx grammar of OnNext* (OnError | OnCompleted), anything goes.
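To make the no-overlap rule concrete for a single observer, here is a minimal sketch that counts how many OnNext handlers are running at the same moment. The class and method names (SerialOnNextDemo, MaxConcurrentHandlers) are made up for illustration, and it assumes the System.Reactive package is referenced. Even with ObserveOn moving the handler onto the task pool, the count should never rise above 1.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical helper, not part of Rx: measures handler overlap for one observer.
static class SerialOnNextDemo
{
    // Returns the highest number of OnNext handlers that were running at once.
    public static int MaxConcurrentHandlers()
    {
        var gate = new object();
        int running = 0, maxRunning = 0;
        var done = new ManualResetEventSlim();

        Observable.Range(1, 5)
            .ObserveOn(TaskPoolScheduler.Default) // deliver notifications off the calling thread
            .Subscribe(
                x =>
                {
                    lock (gate) { running++; maxRunning = Math.Max(maxRunning, running); }
                    Thread.Sleep(100); // simulate slow work inside the handler
                    lock (gate) { running--; }
                },
                () => done.Set()); // OnCompleted: signal that the sequence has finished

        done.Wait(); // block until all five values have been handled
        return maxRunning;
    }
}
```

Calling SerialOnNextDemo.MaxConcurrentHandlers() from a Main method or from LINQPad should return 1: the scheduler changes where the handler runs, not whether calls to it may overlap.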

For your particular scenario, I would look into PLINQ / TPL - it feels like a better fit than Rx.
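As a rough idea of what the PLINQ route could look like for the dictionary in the question, here is a minimal sketch. PlinqSketch and ProcessInParallel are illustrative names, not an existing API, and the Thread.Sleep call stands in for whatever the real work is. How many threads actually get used is up to PLINQ and the machine, so no particular output is promised.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical helper for illustration only.
static class PlinqSketch
{
    // Handles every entry via PLINQ and records which thread handled each key.
    public static ConcurrentDictionary<int, int> ProcessInParallel(Dictionary<int, object> dict)
    {
        var threadByKey = new ConcurrentDictionary<int, int>();

        dict.AsParallel()
            .ForAll(kv =>
            {
                int threadId = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("Thread {0}, key {1}", threadId, kv.Key);
                Thread.Sleep(1000); // stand-in for the real per-item work
                threadByKey[kv.Key] = threadId;
            });

        // ForAll blocks until every element has been processed.
        return threadByKey;
    }
}
```

Usage would be along the lines of PlinqSketch.ProcessInParallel(new Dictionary<int, object> {{1, null}, {5, null}, {6, null}}). On a multi-core machine the printed thread IDs will typically differ between keys, although PLINQ is free to decide otherwise.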

By the way, Lee Campbell's www.introtorx.com is a good resource if you are just starting out.

Upvotes: 3
