Reputation: 25408
I'm teaching myself reactive programming by flailing my way through random problems and asking stupid newbie questions with no shame. While figuring out how thread scheduling works, I managed to stump myself. While I'm pretty sure this code doesn't make logical sense, I also can't understand what's going on. Figuring that out would probably help me out. Here's the code:
var testScheduler = new TestScheduler();
var newThreadScheduler = new NewThreadScheduler();
var emitter = new Subject<string>();
testScheduler.Schedule(TimeSpan.FromSeconds(0.1), () => emitter.OnNext("one"));
testScheduler.Schedule(TimeSpan.FromSeconds(0.2), () => emitter.OnCompleted());
var subscription = emitter.SubscribeOn(newThreadScheduler)
.Subscribe(
item => Console.WriteLine(item),
error => Console.WriteLine(error),
() => Console.WriteLine("Complete!")
);
testScheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);
Console.WriteLine("DONE.");
Console.ReadLine();
What I expected was maybe:
one
DONE.
Complete!
With possible interleaving, as I wasn't quite sure what SubscribeOn() would do. What I got instead was:
DONE.
Complete!
What exactly is happening here? Why didn't the item get produced before the completion? ObserveOn() works as I expected in this case, and I understand why: it's running the delegates on some other thread and they can interleave with "DONE." So what exactly is SubscribeOn() doing?
Upvotes: 3
Views: 319
Reputation: 10783
What you have here is simply a race condition.
If we rip back all the code to just
var emitter = new Subject<string>();
emitter.OnNext("one");
emitter.OnCompleted();
var subscription = emitter
.Subscribe(
item => Console.WriteLine(item),
error => Console.WriteLine(error),
() => Console.WriteLine("Complete!")
);
Console.WriteLine("DONE.");
Console.ReadLine();
We get the same result.
By using a Subject<T>
you will not get any caching behavior, with the exception for the the OnCompleted
notification.
The SubscribeOn
operator will schedule any subscription work to be done on the provided IScheduler
instance.
In the case of subscribing to a Subject<T>
there is almost not work to be done.
It is almost as simple as registering a callback to a list of callbacks.
Scheduling work on to a NewThreadScheduler
will create a new thread, and then create an internal event loop to process the scheduled work.
This pretty quick, but does require creating a new thread, an EventloopScheduler and performing the context switch to the new thread.
In your example, you schedule the OnNext
and OnCompleted
notifications on a TestScheduler
.
You then SubscribeOn
with a NewThreadScheduler
.
Following that, you start to process all the scheduled work for the TestScheduler
instance.
The processing of these virtually scheduled items, is just iterating over the scheduled items, execting the delegate and advancing a virtual clock.
This is incredibly fast.
To be more concrete, the code below is analogous to what you have written
var newThreadScheduler = new NewThreadScheduler();
var callbacks = new List<Action<string>>();
newThreadScheduler.Schedule(()=>callbacks.Add(str=>Console.WriteLine(str)));
foreach (var callback in callbacks)
{
callback("one");
}
Console.WriteLine("Done");
Here we simply have a list of callback actions (call them subscribers or observers). We then asynchronously schedule on a new thread the addition of one of these callbacks. And then immediately iterated the callbacks and send the string "one" to each of them. The result is
Done
The NewThreadScheduler
just isn't given enough time to start up a new thread, schedule the action, and then execute that action, before the main thread can iterate over the collection.
So there are a couple of guidelines that I think you are failing to follow:
1) Avoid Subjects ;-)
2) Don't mix threading and unit tests. I assume the presence of the TestScheduler
is because you are testing this. You could however use two instances of TestScheduler
e.g. background and foreground instances.
To be more helpful, I would offer the positive guidance of suggesting that you just remove the second scheduler from your test.
Use the TestScheduler
instance in your SubscribeOn
operator.
Next I would suggest replacing the use of subjects+scheduling with the use of the TestScheduler
's Observable sequence factory methods i.e. CreateColdObservable
.
Finally I dont know if the advancing to the speicifc time of 1s gains anything, over just using the Start
method.
I think that would reduce the noise and use of the magic value 1s.
var testScheduler = new TestScheduler();
var source = testScheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(TimeSpan.FromSeconds(0.1).Ticks, "one"),
ReactiveTest.OnCompleted<string>(TimeSpan.FromSeconds(0.2).Ticks));
var subscription = source.SubscribeOn(testScheduler)
.Subscribe(
item => Console.WriteLine(item),
error => Console.WriteLine(error),
() => Console.WriteLine("Complete!")
);
testScheduler.Start();
Console.WriteLine("DONE.");
Console.ReadLine();
The only issue now is that the SubscribeOn
call is pretty redundant.
FYI: Code for the NewThreadScheduler
-
https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs
Upvotes: 2