Reputation: 637
I am trying to test / execute a State Machine Saga from MassTransit but I am receiving this error
Message:
MassTransit.ConfigurationException : Failed to create the state machine connector for Invoices.Features.EntryInvoices.ProcessEntryInvoiceSagaState
----> MassTransit.ConfigurationException : The state machine was not properly configured:
[Failure] ProcessInvoiceReceived was not specified
Have tried to search on the documentation but didn't find anything related. Don't know where I am missing, maybe saga configuration or something els.
here is my Saga:
using ...;
namespace Invoices.Features.EntryInvoices
{
public class ProcessEntryInvoiceSaga:MassTransitStateMachine<ProcessEntryInvoiceSagaState>
{
public State Started { get; private set; }
public State Processing { get; private set; }
public State Closed { get; private set; }
public Event<ProcessInvoice> ProcessInvoiceReceived { get; private set; }
public Event<ProductsAdded> ProductsAddedEvent { get; private set; }
public Event<DuplicatesRegistered> DuplicatesRegisteredEvent { get; private set; }
public ProcessEntryInvoiceSaga()
{
InstanceState(x => x.CurrentState);
Event(() => ProcessInvoiceReceived);
Event(() => ProductsAddedEvent);
Event(() => DuplicatesRegisteredEvent);
Initially(
When(ProcessInvoiceReceived)
.Then(x => x.Saga.InvoiceId = x.Message.InvoiceId)
.Activity(x=> x.OfType<ProcessEntryInvoiceStartActivity>())
.TransitionTo(Started)
);
During(Started,
When(ProductsAddedEvent)
.TransitionTo(Processing),
When(DuplicatesRegisteredEvent)
.TransitionTo(Processing));
During(Processing,
When(ProductsAddedEvent)
.TransitionTo(Closed),
When(DuplicatesRegisteredEvent)
.TransitionTo(Closed));
WhenEnter(Closed, binder => binder
.Activity(x=>x.OfType<ProcessEntryInvoiceFinishActivity>())
);
SetCompleted(async instance =>
{
State currentState = await this.GetState(instance);
return Closed.Equals(currentState);
});
}
}
public class ProcessEntryInvoiceSagaState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public long InvoiceId { get; set; }
}
}
And the test that I am trying to execute:
using ...;
namespace Invoices.Test.Features
{
public class ProcessEntryInvoiceSagaTest
{
Mock<IEntryInvoiceRepository> _entryInvoiceRepository;
ITestHarness _harness;
[SetUp]
public async Task Setup()
{
_entryInvoiceRepository = new Mock<IEntryInvoiceRepository>();
var provider = new ServiceCollection()
.AddSingleton(_entryInvoiceRepository.Object)
.AddSingleton<ISagaRepository<ProcessEntryInvoiceSagaState>, InMemorySagaRepository<ProcessEntryInvoiceSagaState>>()
.AddMassTransitTestHarness(cfg =>
{
cfg.AddSagaStateMachine<ProcessEntryInvoiceSaga, ProcessEntryInvoiceSagaState>().InMemoryRepository();
cfg.AddConsumer<MockSagaStepConsumers>();
})
.BuildServiceProvider(true);
_harness = provider.GetRequiredService<ITestHarness>();
await _harness.Start();
}
[Test]
public async Task TestSetNewProductItemWithSuccess()
{
var openedInvoice = new EntryInvoice() {...};
var form = new ProcessInvoice()
{
InvoiceId = 123
};
await _harness.Bus.Publish(form);
Assert.That(await _harness.Consumed.Any<ProcessInvoice>());
Assert.That(await _harness.Published.Any<AddProductsToInventory>());
Assert.That(await _harness.Published.Any<RegisterDuplicatesOfInvoice>());
Assert.That(await _harness.Consumed.Any<DuplicatesRegistered>());
Assert.That(await _harness.Consumed.Any<ProductsAdded>());
Assert.That(await _harness.Published.Any<EntryInvoiceClosed>());
_entryInvoiceRepository.Verify(i => i.Update(It.IsAny<EntryInvoice>()), Times.AtLeast(2));
_entryInvoiceRepository.Verify(i => i.SaveChanges(), Times.AtLeast(2));
openedInvoice.Status.Should().Be(InvoiceStatus.Closed);
}
}
public class MockSagaStepConsumers : IConsumer<AddProductsToInventory>, IConsumer<RegisterDuplicatesOfInvoice>
{
public async Task Consume(ConsumeContext<AddProductsToInventory> context)
{
await context.Publish(new ProductsAdded()
{
CorrelationId = context.MessageId ?? Guid.Empty
});
}
public async Task Consume(ConsumeContext<RegisterDuplicatesOfInvoice> context)
{
await context.Publish(new DuplicatesRegistered()
{
CorrelationId = context.MessageId ?? Guid.Empty,
InvoiceId = context.Message.InvoiceId
});
}
}
}
Upvotes: 1
Views: 830
Reputation: 637
Found the issue on
Event(() => ProcessInvoiceReceived);
Event(() => ProductsAddedEvent);
Event(() => DuplicatesRegisteredEvent);
The documentation says that the events are autoconfigured but only if they have CorrelationId
or CorrelatedBy<Guid>
interface.
I just added the interface CorrelatedBy<Guid>
to my messages and set a CorrelationID
to the first message and this allowed me to continue the process.
Upvotes: 3