Reputation: 185
I'm using Azure Service Bus and the MassTransit library. I ran into a pretty strange error when using state machine. The NewDataRequested, OldDateRequested message are triggered at the same time and can time out differently. Then NotificationRequested -> Finished is called. Once all these messages run out, I switch to Finished using composite event. This will cause my message to be published.
public class DataState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public int CurrentState { get; set; }
public int ReadyEventStatus { get; set; }
public Guid RequestId { get; set; }
public bool IsError { get; set; }
}
public class DataStateMachine : MassTransitStateMachine<DataState>
{
public Event<NewDataRequested>? NewDataRequested { get; }
public Event<Fault<NewDataRequested>>? NewDataRequestedFault { get; }
public Event<NewDataFinished>? NewDataFinished { get; }
public Event<Fault<NewDataFinished>>? NewDataFinishedFault { get; }
public Event<OldDataRequested>? OldDataRequested { get; }
public Event<Fault<OldDataRequested>>? OldDataRequestedFault { get; }
public Event<OldDataFinished>? OldDataFinished { get; }
public Event<Fault<OldDataFinished>>? OldDataFinishedFault { get; }
public Event<NotificationRequested>? NotificationRequested { get; }
public Event<Fault<NotificationRequested>>? NotificationRequestedFault { get; }
public Event<NotificationFinished>? NotificationFinished { get; }
public Event<Fault<NotificationFinished>>? NotificationFinishedFault { get; }
public MassTransit.Event? Finished { get; }
public RunDataStateMachine(ILogger<RunDataStateMachine> logger)
{
InstanceState(x => x.CurrentState);
Event(() => NewDataRequested, x => x.CorrelateById(context => context.Message.RequestId));
Event(() => NewDataRequestedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));
Event(() => NewDataFinished, x => x.CorrelateById(context => context.Message.RequestId));
Event(() => NewDataFinishedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));
Event(() => OldDataRequested, x => x.CorrelateById(context => context.Message.RequestId));
Event(() => OldDataRequestedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));
Event(() => OldDataFinished, x => x.CorrelateById(context => context.Message.RequestId));
Event(() => OldDataFinishedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));
Event(() => NotificationRequested, x => x.CorrelateById(context => context.Message.RequestId));
Event(() => NotificationRequestedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));
Event(() => NotificationFinished, x => x.CorrelateById(context => context.Message.RequestId));
Event(() => NotificationFinishedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));
Initially(
When(NewDataRequested)
.Then(x =>
{
x.Saga.RequestId = x.Message.RequestId;
}),
When(NewDataRequestedFault)
.Then(x =>
{
if (!x.Saga.IsError)
{
x.Publish(Fault(x.Saga));
x.Saga.IsError = true;
}
}),
When(NewDataFinished),
When(NewDataFinishedFault)
.Then(x =>
{
if (!x.Saga.IsError)
{
x.Publish(Fault(x.Saga));
x.Saga.IsError = true;
}
}),
When(OldDataRequested),
When(OldDataRequestedFault)
.Then(x =>
{
if (!x.Saga.IsError)
{
x.Publish(Fault(x.Saga));
x.Saga.IsError = true;
}
}),
When(OldDataFinished),
When(OldDataFinishedFault)
.Then(x =>
{
if (!x.Saga.IsError)
{
x.Publish(Fault(x.Saga));
x.Saga.IsError = true;
}
}),
When(NotificationRequested),
When(NotificationRequestedFault)
.Then(x =>
{
if (!x.Saga.IsError)
{
x.Publish(Fault(x.Saga));
x.Saga.IsError = true;
}
}),
When(NotificationFinished),
When(NotificationFinishedFault)
.Then(x =>
{
if (!x.Saga.IsError)
{
x.Publish(Fault(x.Saga));
x.Saga.IsError = true;
}
}),
);
CompositeEvent(() => Finished,
x => x.ReadyEventStatus,
CompositeEventOptions.IncludeInitial,
NewDataRequested,
NewDataFinished,
OldDataRequested,
OldDataFinished,
NotificationRequested,
NotificationFinished
);
During(Initial,
When(Finished)
.Then(x =>
{
x.Publish(new DataFinished() { RequestId = x.Saga.RequestId });
})
.Finalize()
);
SetCompletedWhenFinalized();
}
private FaultMessage Fault(DataState runDataState)
{
return new FaultMessage() { RequestId = runDataState.RequestId };
}
}
The error occurs really differently and not very often. It stays seeded in the error queue state machine with the following error and stack trace.
MT-Fault-ExceptionType - MassTransit.UnhandledEventException
MT-Fault-Message - The NotificationFinished event is not handled during the Final state for the DataStateMachine state machine
MT-Fault-StackTrace - at MassTransit.MassTransitStateMachine`1.DefaultUnhandledEventCallback(UnhandledEventContext`1 context) in /_/src/MassTransit/SagaStateMachine/MassTransitStateMachine.cs:line 222 at MassTransit.MassTransitStateMachine`1.UnhandledEvent(BehaviorContext`1 context, State state) in /_/src/MassTransit/SagaStateMachine/MassTransitStateMachine.cs:line 1266 at MassTransit.MassTransitStateMachine`1.<.ctor>b__19_2(BehaviorContext`1 context, State state) in /_/src/MassTransit/SagaStateMachine/MassTransitStateMachine.cs:line 55 at MassTransit.MassTransitStateMachine`1.StateMachineState.MassTransit.State<TInstance>.Raise[T](BehaviorContext`2 context) in /_/src/MassTransit/SagaStateMachine/SagaStateMachine/StateMachineState.cs:line 165 at MassTransit.MassTransitStateMachine`1.MassTransit.StateMachine<TInstance>.RaiseEvent[T](BehaviorContext`2 context) in /_/src/MassTransit/SagaStateMachine/MassTransitStateMachine.cs:line 135 at MassTransit.Middleware.StateMachineSagaMessageFilter`2.Send(SagaConsumeContext`2 context, IPipe`1 next) in /_/src/MassTransit/Middleware/StateMachineSagaMessageFilter.cs:line 66
I don't fully understand what could be wrong. I tried searching for a monge solution and a similar scenario. I found that the problem could be in the order, but I don't think so because it shouldn't have an effect here.
Upvotes: 0
Views: 144