Reputation: 3414
I'm using the latest Rx 2 from NuGet. In the example code below, the subscription is processed on a new thread and doesn't block the generation of new values. That's all good. However, why are the values generated by the observable processed sequentially on that new thread?
    static void Main(string[] args)
    {
        printThreadId("Started");
        var observable = Observable.Generate<int, int>(
            0,
            x =>
            {
                printThreadId("Comparing " + x);
                return x < 5;
            },
            x =>
            {
                printThreadId("Incrementing " + x);
                return x + 1;
            },
            x =>
            {
                printThreadId("Selecting " + x);
                return x;
            },
            NewThreadScheduler.Default
        );
        var disp = observable
            .ObserveOn(NewThreadScheduler.Default)
            .Subscribe(state =>
            {
                printThreadId("Processing " + state);
                Thread.Sleep(1000);
            });
        Console.ReadLine();
        disp.Dispose();
    }

    static Action<string> printThreadId = (prefix) =>
        Console.WriteLine("{0} on [Worker.{1}]", prefix, Thread.CurrentThread.ManagedThreadId);
The above code takes around 5 seconds to complete. I expected each action to be executed on a new thread and this is not happening.
The code below schedules a new task for each value, and the whole process completes in a little over 1 second.
    var disp = observable
        .ObserveOn(NewThreadScheduler.Default)
        .Subscribe(state =>
        {
            printThreadId("Processing " + state);
            NewThreadScheduler.Default.Schedule(() =>
            {
                printThreadId("Long running task " + state);
                Thread.Sleep(1000);
            });
        });
Is there a better way to schedule long-running tasks to run concurrently using ObserveOn and Subscribe?
Upvotes: 1
Views: 2076
Reputation: 117027
The reactive framework specifically ensures that successive values are delivered to an observer one at a time, with no overlap. This prevents many concurrency issues, and the behaviour is by design.
The NewThreadScheduler queues up actions, so if there are actions scheduled back-to-back the scheduler doesn't bother creating a new thread; it just reuses the current thread. When there is a gap (something might be queued, but for the future), the current thread is allowed to end and a new one is created when the action is due.
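The no-overlap guarantee can be checked with a small stand-alone program. This is only a sketch: it assumes the System.Reactive NuGet package is referenced, and the class name `SerializationDemo` and the counters `active` and `maxActive` are illustrative names that do not come from the question.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

static class SerializationDemo
{
    static void Main()
    {
        int active = 0;     // handlers currently inside OnNext
        int maxActive = 0;  // highest value of 'active' seen so far
        var gate = new object();
        var done = new ManualResetEventSlim();

        Observable.Range(0, 5)
            .ObserveOn(NewThreadScheduler.Default)
            .Subscribe(
                x =>
                {
                    int now = Interlocked.Increment(ref active);
                    lock (gate) { maxActive = Math.Max(maxActive, now); }
                    Thread.Sleep(100); // simulate work inside the handler
                    Interlocked.Decrement(ref active);
                },
                () => done.Set());

        done.Wait(); // block until OnCompleted has been delivered
        Console.WriteLine("Max concurrent handlers: {0}", maxActive);
    }
}
```

If the handlers really are serialized, the reported maximum should never exceed 1, however long each handler sleeps.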
If you want the subscriptions to run in parallel you have to go out of your way to make it happen. But the good news is that it's not too hard.
If you keep your existing observable then you can try this:
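    // Unit lives in the System.Reactive namespace.
    Func<int, IObservable<Unit>> process = state =>
        Observable.Start(() =>
        {
            printThreadId("Processing " + state);
            Thread.Sleep(1000);
            return Unit.Default;
        });

    // Project each value to an observable that does the work in the background.
    var query =
        from x in observable
        select process(x);

    // Merge subscribes to the inner observables as they arrive, so the work overlaps.
    var disp =
        query
            .Merge()
            .ObserveOn(NewThreadScheduler.Default)
            .Subscribe();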
I got this result when I ran it:
Started on [Worker.23]
Comparing 0 on [Worker.26]
Selecting 0 on [Worker.26]
Processing 0 on [Worker.20]
Incrementing 0 on [Worker.26]
Comparing 1 on [Worker.26]
Selecting 1 on [Worker.26]
Incrementing 1 on [Worker.26]
Comparing 2 on [Worker.26]
Selecting 2 on [Worker.26]
Processing 1 on [Worker.18]
Incrementing 2 on [Worker.26]
Comparing 3 on [Worker.26]
Selecting 3 on [Worker.26]
Processing 3 on [Worker.13]
Incrementing 3 on [Worker.26]
Comparing 4 on [Worker.26]
Selecting 4 on [Worker.26]
Processing 4 on [Worker.12]
Incrementing 4 on [Worker.26]
Comparing 5 on [Worker.26]
Processing 2 on [Worker.8]
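The same select-then-merge shape can also be written with `SelectMany`, which projects each value to an inner observable and merges the results in one step. The following is a minimal sketch rather than a drop-in replacement: it assumes the System.Reactive NuGet package, uses `Observable.Range` in place of the question's `Observable.Generate` source, and the class name `ParallelDemo` is made up for illustration.

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Threading;

static class ParallelDemo
{
    static void Main()
    {
        var stopwatch = Stopwatch.StartNew();

        var results = Observable.Range(0, 5)
            // Observable.Start runs the delegate in the background on Rx's
            // default scheduler; SelectMany merges the inner observables,
            // so the five sleeps are allowed to overlap.
            .SelectMany(x => Observable.Start(() =>
            {
                Console.WriteLine("Processing {0} on [Worker.{1}]",
                    x, Thread.CurrentThread.ManagedThreadId);
                Thread.Sleep(1000);
                return x;
            }))
            .ToList()
            .Wait(); // block until every inner observable has completed

        Console.WriteLine("Processed {0} values in {1} ms",
            results.Count, stopwatch.ElapsedMilliseconds);
    }
}
```

Because the inner observables finish in whatever order the background threads complete, the order of the results is not guaranteed, just as "Processing 2" appears last in the output above. The elapsed time depends on how quickly the scheduler makes threads available.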
Upvotes: 3