Reputation: 9076
I have a program that I'm currently refactoring to use the Membus message bus for event aggregation, and this message bus allows me to "observe" events on the bus by returning instances of IObservable
that I can subscribe to.
In my unit tests, I want to ensure that my components only publish particular messages on the bus when it's appropriate. The way I've tried to do this is by including the following sort of setup logic in my test specification classes:
private readonly IBus messageBus;
private readonly IObservable<Model> myObservable;
public ComponentModelGatewaySpec()
{
messageBus = TestHelper.DefaultMessageBus;
myObservable = messageBus.Observe<ModelPublishedEventMessage>().Select(it => it.Model);
}
Then in a test case, I'd like to do something like the following:
public async Task Only_Publish_Incomplete_Models_After_Receiving_Request()
{
var defaultTimeout = TimeSpan.FromMilliseconds(1000);
// GIVEN a component model gateway and an incomplete model update.
var modelUpdate = new Model { IntProperty = 1, BoolProperty = null };
Assert.False(modelUpdate.IsComplete);
var gateway = MockComponentModelGateway;
gateway.SetMessageBus(messageBus);
// EXPECT no current model is published after publishing the incomplete model update.
Task<bool> noModelPublished = myObservable.WithinWindow(defaultTimeout).NoEmissionsOccurred().ToTask();
messageBus.Publish(new ModelUpdateEventMessage(modelUpdate));
Assert.True(await noModelPublished);
// WHEN we publish a current model query.
Task<Model> publishedModel = myObservable.WithinWindow(defaultTimeout).FirstAsync().ToTask();
messageBus.Publish(new ModelQueryRequestedEventMessage());
// THEN the model should be published.
Assert.Equal(modelUpdate, await publishedModel);
}
What I'm essentially after is a way of testing either:
I'd like to be able to handle all this logic asynchronously, or else I'll have a bunch of test cases that end up blocking for 1 or more seconds.
It may be possible to use Timeout
for this, but Timeout
causes an exception to be thrown on timeouts which seems like a cludgy way of handling things when I'm expecting them to be thrown. I do use Timeout
in observable compositions, but only in cases where a timeout occurring means the test should fail.
Currently, I'm trying to use various combinations of Window
, Buffer
, FirstAsync
, etc. to accomplish this but I'm not getting the behaviour I expect in all of my test cases.
Edit
I've added my own solution but I'm treating it as a temporary measure until I can incorporate Lee Campbell's advice (see his answer below).
Upvotes: 1
Views: 1339
Reputation: 9076
Let me preface by saying that I developed this solution before acting on Lee Campbell's advice, and a solution based on his advice would probably be a lot better since he (literally) wrote the book on the subject. That said, the solution I came up with works well enough for my specific use case.
Using the same example test case that I used in the original post, I now have this instead:
[Theory]
[PairwiseData]
public async Task Adapted_Component_Model_Gateway_Should_Publish_Current_Model_When_Requested(
[CombinatorialValues(null, 1, 2)] int? intValue,
[CombinatorialValues(null, true, false)] bool? boolValue)
{
var model = new AllowStaleDetailsMockModel { IntProperty = intValue, BoolProperty = boolValue };
if (model.IsComplete) return;
// GIVEN an initialized adapted component model gateway and a given _INCOMPLETE_ current model.
var adaptedGateway = AdaptGateway(MockComponentModelGateway);
adaptedGateway.SetMessageBus(messageBus);
adaptedGateway.Initialize();
// EXPECT no current model is published after publishing the incomplete model update.
var messagePublished = allowStaleCurrentModelObservable.BufferEmissions().ContainsEvents();
messageBus.Publish(new CurrentModelUpdateReadyEventArgs<AllowStaleDetailsMockModel>(model));
Assert.False(await messagePublished);
// WHEN we publish a current model query.
var actualModel = allowStaleCurrentModelObservable.WaitForEmission();
messageBus.Publish(new CurrentModelQueryRequestedEventArgs());
// THEN the current model should be published.
Assert.Equal(model, await actualModel);
}
In a "test utility" class, I created the following:
public static class TestHelper
{
public static readonly TimeSpan DefaultTimeout = TimeSpan.FromMilliseconds(1000);
public static IBus DefaultMessageBus
{
get
{
return BusSetup.StartWith<Conservative>().Construct();
}
}
public static async Task<bool> ContainsEvents<T>(this Task<IList<T>> eventBufferTask)
{
return (await eventBufferTask).Any();
}
}
public static class ObservableExtensions
{
public static Task<T> WaitForEmission<T>(this IObservable<T> observable)
{
return observable.WaitForEmission(TestHelper.DefaultTimeout);
}
public static Task<T> WaitForEmission<T>(this IObservable<T> observable, TimeSpan timeout)
{
return observable.FirstAsync().Timeout(timeout).ToTask();
}
public static Task<IList<T>> BufferEmissions<T>(this IObservable<T> observable)
{
return observable.BufferEmissions(TestHelper.DefaultTimeout);
}
public static Task<IList<T>> BufferEmissions<T>(this IObservable<T> observable, TimeSpan bufferWindow)
{
return observable.Buffer(bufferWindow).FirstAsync().ToTask();
}
}
I use Observable.WaitForEmissions
in test cases right before doing something that I expect should publish a particular kind of message. This returns a task that will either:
I use Observable.BufferEmissions
in test cases where I'm either expecting multiple values to be published and I want to collect them all, or I want to check whether values were published in a given length of time or not without catching TimeoutException
errors (Task<IList<T>>.ContainsEvents
works great for this).
All of the test cases in my project perform as expected, and my ~600 odd test cases are discovered and executed in about 30 seconds, which I'm happy enough with.
Upvotes: 0
Reputation: 10783
You want to avoid concurrency (well multi threading) in your unit tests if you can. Concurrent unit tests can be non-deterministic and also run a lot slower i.e. have to run in real time. For example if you are trying to prove a timeout of 10seconds will throw an error, you will have to have your test run for 10 seconds. This is not a scalable practice.
Instead consider using the TestScheduler
.
This will mean that you will need to have seams where you can provide schedulers to your operators.
Hopefully the API that is exposing these Observable sequences is friendly to testing.
public async Task Only_Publish_Incomplete_Models_After_Receiving_Request()
{
var gateway = MockComponentModelGateway;
gateway.SetMessageBus(messageBus);
var defaultTimeout = TimeSpan.FromMilliseconds(1000);
var scheduler = new TestScheduler();
// GIVEN a component model gateway and an incomplete model update.
var modelUpdate = new Model { IntProperty = 1, BoolProperty = null };
Assert.False(modelUpdate.IsComplete);
scheduler.Schedule(TimeSpan.FromMilliseconds(100),() => {
messageBus.Publish(new ModelUpdateEventMessage(modelUpdate));
});
scheduler.Schedule(TimeSpan.FromMilliseconds(200),() =>
{
messageBus.Publish(new ModelQueryRequestedEventMessage());
});
var observer = scheduler.CreateObserver<Model>();
myObservable.Subscribe(observer);
scheduler.Start();
CollectionAssert.AreEqual(
new[]{
ReactiveTest.OnNext(TimeSpan.FromMilliseconds(200).Ticks, modelUpdate)
},
observer.Messages);
}
Here you dont have to test for absence (Assert.True(await noModelPublished);
), because you can see in the output that the value is not pushed until the point in virtual time (200ms) that the messageBus.Publish(new ModelQueryRequestedEventMessage());
was executed.
Now your tests should run synchronously but be able to verify an otherwise async flow.
Upvotes: 2