Reputation: 8145
I'm having trouble testing reactive code that's consuming a Task based service. In my class under test I consume the task and use ToObservable to do reactive-y things with it.
public void Method()
{
_svc.MyTaskServiceMethod().ToObservable().Select(....) //pipe it elsewhere and do interesting things.
}
Now in a unit test I'm testing some timing (using Moq for the service)
svcMock.Setup(x => x.MyTaskServiceMethod()).Returns(() =>
Observable.Return("VALUE", testScheduler)
.Delay(TimeSpan.FromMilliseconds(100), testScheduler)
.ToTask()
);
The problem is that despite using the test scheduler in the Return/Delay calls, the task itself is still completing on a separate thread. I'm seeing this by adding a couple of Console writes of the current managed thread id to the code.
svcMock.Setup(x => x.MyServiceMethod()).Returns(() =>
{
var task = Observable.Return("VALUE", testScheduler)
.Delay(TimeSpan.FromMilliseconds(1000), testScheduler)
.Do(x => { Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + " Obs"); })
.ToTask();
task.ContinueWith((_) =>
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + " Task");
});
return task;
});
The Do(..) executes on the primary testing thread, and happens exactly when I expect after a testSchduler.AdvanceBy(..) call.
The task continuation is still happening in a separate thread and basically doesn't execute until after the body of the unit test has finished. So in the body of my target, nothing ever really gets pushed through my task.ToObservable() observable.
Upvotes: 2
Views: 380
Reputation: 1652
Some time ago I co-authored a unit test library based on NUnit to help with precisely with Rx and TPL testing. For that we built a Test TPL scheduler to force all TPL tasks to run without concurrency. You can see the relevant code here: https://github.com/Testeroids/Testeroids/blob/master/solution/src/app/Testeroids/TplTestPlatformHelper.cs#L87
Upvotes: 1
Reputation: 29786
Task continuations will use a task pool thread by default, so your continuation escapes the control of the test scheduler. If you specify the option TaskContinuationOptions.ExecuteSynchronously
, it will use the same thread and the result will be posted to the observable as desired:
task.ContinueWith((_) =>
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + " Task");
}, TaskContinuationOptions.ExecuteSynchronously);
You may find this related discussion on the Rx site quite illuminating on the subject of concurrency in TPL -> Rx transitions, and ToObservable()
in particular.
Upvotes: 3