pietro

.NET MassTransit - State machine - The NotificationFinished event is not handled during the Final state

I'm using Azure Service Bus with the MassTransit library, and I ran into a strange error when using a state machine. The NewDataRequested and OldDataRequested messages are triggered at the same time and can take different amounts of time to complete. After that, NotificationRequested and then NotificationFinished are processed. Once all of these messages have been received, I switch to Finished using a composite event. This causes my DataFinished message to be published.

public class DataState : SagaStateMachineInstance
    {
        public Guid CorrelationId { get; set; }

        public int CurrentState { get; set; }

        public int ReadyEventStatus { get; set; }

        public Guid RequestId { get; set; }
        public bool IsError { get; set; }
    }

    public class DataStateMachine : MassTransitStateMachine<DataState>
    {
        public Event<NewDataRequested>? NewDataRequested { get; }
        public Event<Fault<NewDataRequested>>? NewDataRequestedFault { get; }

        public Event<NewDataFinished>? NewDataFinished { get; }
        public Event<Fault<NewDataFinished>>? NewDataFinishedFault { get; }

        public Event<OldDataRequested>? OldDataRequested { get; }
        public Event<Fault<OldDataRequested>>? OldDataRequestedFault { get; }

        public Event<OldDataFinished>? OldDataFinished { get; }
        public Event<Fault<OldDataFinished>>? OldDataFinishedFault { get; }

        public Event<NotificationRequested>? NotificationRequested { get; }
        public Event<Fault<NotificationRequested>>? NotificationRequestedFault { get; }

        public Event<NotificationFinished>? NotificationFinished { get; }
        public Event<Fault<NotificationFinished>>? NotificationFinishedFault { get; }

        public MassTransit.Event? Finished { get; }

        public DataStateMachine(ILogger<DataStateMachine> logger)
        {
            InstanceState(x => x.CurrentState);

            Event(() => NewDataRequested, x => x.CorrelateById(context => context.Message.RequestId));
            Event(() => NewDataRequestedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));

            Event(() => NewDataFinished, x => x.CorrelateById(context => context.Message.RequestId));
            Event(() => NewDataFinishedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));

            Event(() => OldDataRequested, x => x.CorrelateById(context => context.Message.RequestId));
            Event(() => OldDataRequestedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));

            Event(() => OldDataFinished, x => x.CorrelateById(context => context.Message.RequestId));
            Event(() => OldDataFinishedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));

            Event(() => NotificationRequested, x => x.CorrelateById(context => context.Message.RequestId));
            Event(() => NotificationRequestedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));

            Event(() => NotificationFinished, x => x.CorrelateById(context => context.Message.RequestId));
            Event(() => NotificationFinishedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));

            Initially(
                 When(NewDataRequested)
                    .Then(x =>
                    {
                        x.Saga.RequestId = x.Message.RequestId;
                    }),
                 When(NewDataRequestedFault)
                   .Then(x =>
                   {
                       if (!x.Saga.IsError)
                       {
                           x.Publish(Fault(x.Saga));
                           x.Saga.IsError = true;
                       }
                   }),

                 When(NewDataFinished),
                 When(NewDataFinishedFault)
                   .Then(x =>
                   {
                       if (!x.Saga.IsError)
                       {
                           x.Publish(Fault(x.Saga));
                           x.Saga.IsError = true;
                       }
                   }),

                 When(OldDataRequested),
                 When(OldDataRequestedFault)
                   .Then(x =>
                   {
                       if (!x.Saga.IsError)
                       {
                           x.Publish(Fault(x.Saga));
                           x.Saga.IsError = true;
                       }
                   }),

                 When(OldDataFinished),
                 When(OldDataFinishedFault)
                   .Then(x =>
                   {
                       if (!x.Saga.IsError)
                       {
                           x.Publish(Fault(x.Saga));
                           x.Saga.IsError = true;
                       }
                   }),

                When(NotificationRequested),
                When(NotificationRequestedFault)
                   .Then(x =>
                   {
                       if (!x.Saga.IsError)
                       {
                           x.Publish(Fault(x.Saga));
                           x.Saga.IsError = true;
                       }
                   }),

                When(NotificationFinished),
                When(NotificationFinishedFault)
                   .Then(x =>
                   {
                       if (!x.Saga.IsError)
                       {
                           x.Publish(Fault(x.Saga));
                           x.Saga.IsError = true;
                       }
                   })
            );

            CompositeEvent(() => Finished,
                x => x.ReadyEventStatus,
                CompositeEventOptions.IncludeInitial,
                NewDataRequested,
                NewDataFinished,
                OldDataRequested,
                OldDataFinished,
                NotificationRequested,
                NotificationFinished
            );

            During(Initial,
                When(Finished)
                .Then(x =>
                {
                    x.Publish(new DataFinished() { RequestId = x.Saga.RequestId });
                })
                .Finalize()
            );

            SetCompletedWhenFinalized();
        }

        private FaultMessage Fault(DataState runDataState)
        {
            return new FaultMessage() { RequestId = runDataState.RequestId };
        }
    }

The error occurs intermittently and not very often. When it does, the message ends up in the state machine's error queue with the following error and stack trace.

MT-Fault-ExceptionType - MassTransit.UnhandledEventException
MT-Fault-Message - The NotificationFinished event is not handled during the Final state for the DataStateMachine state machine
MT-Fault-StackTrace -
   at MassTransit.MassTransitStateMachine`1.DefaultUnhandledEventCallback(UnhandledEventContext`1 context) in /_/src/MassTransit/SagaStateMachine/MassTransitStateMachine.cs:line 222
   at MassTransit.MassTransitStateMachine`1.UnhandledEvent(BehaviorContext`1 context, State state) in /_/src/MassTransit/SagaStateMachine/MassTransitStateMachine.cs:line 1266
   at MassTransit.MassTransitStateMachine`1.<.ctor>b__19_2(BehaviorContext`1 context, State state) in /_/src/MassTransit/SagaStateMachine/MassTransitStateMachine.cs:line 55
   at MassTransit.MassTransitStateMachine`1.StateMachineState.MassTransit.State<TInstance>.Raise[T](BehaviorContext`2 context) in /_/src/MassTransit/SagaStateMachine/SagaStateMachine/StateMachineState.cs:line 165
   at MassTransit.MassTransitStateMachine`1.MassTransit.StateMachine<TInstance>.RaiseEvent[T](BehaviorContext`2 context) in /_/src/MassTransit/SagaStateMachine/MassTransitStateMachine.cs:line 135
   at MassTransit.Middleware.StateMachineSagaMessageFilter`2.Send(SagaConsumeContext`2 context, IPipe`1 next) in /_/src/MassTransit/Middleware/StateMachineSagaMessageFilter.cs:line 66

I don't fully understand what could be wrong. I tried searching for a possible solution and for similar scenarios. I found suggestions that the problem could be the order in which the messages arrive, but I don't think that is the cause, because the order shouldn't have any effect here.
