Andy

Reputation: 1193

RX ObserveOn issue

I am new to Rx and playing around with some samples. Any idea why the Console.WriteLine in the Subscribe is never invoked?

var obs = Observable.Create<double>(i =>
    {
        while (true)
        {
            Thread.Sleep(250);
            i.OnNext(2.0);
        }

        return () => { };
    })
    .SubscribeOn(Scheduler.TaskPool)
    .ObserveOn(Scheduler.CurrentThread)
    .Subscribe(i =>
    {
        Console.WriteLine("Inside Subscribe");
    });

If I remove the

.ObserveOn(Scheduler.CurrentThread) 

line, everything works as expected.

Thanks

Upvotes: 2

Views: 2775

Answers (1)

JerKimball

Reputation: 16894

As I mentioned, SubscribeOn and ObserveOn are not the best-named methods; I HIGHLY recommend reading this all the way through:

http://blogs.msdn.com/b/rxteam/archive/2009/11/21/observable-context-observable-subscribeon-and-observable-observeon.aspx

Basically, SubscribeOn tells the system what context to do the actual Subscribe/Unsubscribe wiring on, while ObserveOn tells the system what context to "execute as" when new values arrive in the source.

To start, allow me to tweak your example a bit:

Console.WriteLine("Start Thread ID:{0}", Thread.CurrentThread.ManagedThreadId);
var subscription = Observable.Create<double>(i => 
    {
        Console.WriteLine("Observable thread ID:{0}", Thread.CurrentThread.ManagedThreadId);
        while(true)
        {
            Console.WriteLine("Pushing values from thread {0}", Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(250);            
            i.OnNext(2.0);
        }
        return () => { };
    })        
    .SubscribeOn(Scheduler.TaskPool)   
    .ObserveOn(Scheduler.CurrentThread)
    .Subscribe(i =>
    {
        Console.WriteLine("Subscribable thread ID:{0}", Thread.CurrentThread.ManagedThreadId);
        Console.WriteLine("Inside Subscribe");
    });    
Console.ReadLine();
subscription.Dispose();

If you run this, you'll see something like:

Start Thread ID:21
Observable thread ID:23
Pushing values from thread 23
Pushing values from thread 23
Pushing values from thread 23
Pushing values from thread 23

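Notice that "Inside Subscribe" never appears. As far as I can tell, that is because Scheduler.CurrentThread queues work behind whatever is already running on the thread that calls OnNext, and here that is the while(true) loop on thread 23, which never returns.

Now let's swap the schedulers passed to SubscribeOn and ObserveOn: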

.SubscribeOn(Scheduler.CurrentThread)   
.ObserveOn(Scheduler.TaskPool)

Now the subscriber is invoked, on a different thread (27) from the one pushing the values (26):

Start Thread ID:26
Observable thread ID:26
Pushing values from thread 26
Pushing values from thread 26
Subscribable thread ID:27
Inside Subscribe
Pushing values from thread 26
Subscribable thread ID:27
Inside Subscribe
Pushing values from thread 26
Subscribable thread ID:27
Inside Subscribe
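
If all you want is a value every 250 ms, you may not need the blocking loop at all. Here is a minimal sketch, not necessarily the only fix, that uses Observable.Interval to produce the values on the task pool so nothing blocks inside Subscribe. It assumes the same Rx version as the question (Scheduler.TaskPool; I believe newer releases mark it obsolete in favor of TaskPoolScheduler.Default). The class name Program and the message text are just placeholders.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main()
    {
        Console.WriteLine("Start Thread ID:{0}", Thread.CurrentThread.ManagedThreadId);

        // Interval ticks on the given scheduler, so Subscribe returns right away
        // instead of getting stuck in a while(true) loop.
        // Assumption: Scheduler.TaskPool as used in the question.
        var subscription = Observable
            .Interval(TimeSpan.FromMilliseconds(250), Scheduler.TaskPool)
            .Select(_ => 2.0) // same constant value the original loop pushed
            .Subscribe(value =>
            {
                Console.WriteLine("Inside Subscribe, thread ID:{0}",
                    Thread.CurrentThread.ManagedThreadId);
            });

        // Keep the process alive until Enter is pressed, then stop the timer.
        Console.ReadLine();
        subscription.Dispose();
    }
}
```

Because the source no longer blocks the thread it was subscribed on, there is no need for SubscribeOn here, and you can add ObserveOn with whatever scheduler you want the handler to run on.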

Upvotes: 4
