therealsowho

Reputation: 139

MassTransit: TransitionTo doesn't update the Saga state in SQL Server (EntityFrameworkCoreSaga)

I recently started investigating how to add MassTransit to my project, and I've stumbled on this issue along the way. I'm not sure that I'm doing all of this right (I'm new to MassTransit), so any comment with an explanation would be awesome!

Basically, I have a REST API with an endpoint that, when hit, needs to trigger a chain of events on separate services (which communicate with each other over RMQ). I've decided to implement a Saga with persistent state (EntityFrameworkCore Saga), and here's what I did:

REST API endpoint:

[HttpPost]
    [SwaggerOperation(Summary = MethodDescriptions.StartExport)]
    [SwaggerResponse(StatusCodes.Status200OK, type: typeof(ResponseModel))]
    [SwaggerResponse(StatusCodes.Status400BadRequest, type: typeof(ResponseModel))]
    [SwaggerResponse(StatusCodes.Status500InternalServerError, type: typeof(ResponseModel))]
    //[ServiceFilter(typeof(ValidationFilterAttribute))]
    [Route("start-export")]
    public async Task<IActionResult> ExportProcessInitializeEvent([FromBody] ExportCommonRequest request)
    {
        try
        {
            if (!request.IsRequestResponsePattern)
            {
                await _publishEndpoint.Publish<ExportProcessInitializeEvent>(new { request.ExportId });
            }

            return StatusCode(StatusCodes.Status200OK, new ResponseModel
            {
                Content = new NoContentResult(),
                StatusCode = StatusCodes.Status200OK,
                Message = string.Empty
            });
        }
        catch(Exception ex)
        {
            _logger.LogError(ex, $"Something went wrong while trying to start export (ExportId: {request.ExportId}).");
            return StatusCode(StatusCodes.Status500InternalServerError, new ResponseModel
            {
                Content = new NoContentResult(),
                StatusCode = StatusCodes.Status500InternalServerError,
                Message = ex.Message
            });
        }
    }

It publishes an event to RMQ that is later consumed by a separate service. (Note: I'm not waiting for a response from those services, because the caller will be notified about the export status as the last step of the event chain.)

REST API MassTransit setup:

public static void ConfigureMassTransit(this IServiceCollection services, IConfiguration configuration)
    {
        var messageBrokerQueueSettings = configuration.GetSection("MessageBroker:QueueSettings").Get<MessageBrokerQueueSettings>();

        services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host(messageBrokerQueueSettings.HostName, messageBrokerQueueSettings.VirtualHost, h => {
                    h.Username(messageBrokerQueueSettings.UserName);
                    h.Password(messageBrokerQueueSettings.Password);
                });

                cfg.ConfigureEndpoints(context);
            });

            x.AddConsumer<ExportProcessInitializeEventConsumer>();
            x.AddRequestClient<ExportProcessInitializeEventConsumer>();
            x.AddConsumer<ExportProcessInitializeFaultEventConsumer>();
            x.AddRequestClient<ExportProcessInitializeFaultEventConsumer>();

            x.AddConsumer<DownloadMHTMLSEventConsumer>();
            x.AddRequestClient<DownloadMHTMLSEventConsumer>();
            x.AddConsumer<DownloadMHTMLSFaultEventConsumer>();
            x.AddRequestClient<DownloadMHTMLSFaultEventConsumer>();

            x.AddConsumer<TakeScreenshotsEventConsumer>();
            x.AddRequestClient<TakeScreenshotsEventConsumer>();
            x.AddConsumer<TakeScreenshotsFaultEventConsumer>();
            x.AddRequestClient<TakeScreenshotsFaultEventConsumer>();


            x.AddConsumer<CreatePPTsEventConsumer>();
            x.AddRequestClient<CreatePPTsEventConsumer>();
            x.AddConsumer<CreatePPTsFaultEventConsumer>();
            x.AddRequestClient<CreatePPTsFaultEventConsumer>();

            x.AddConsumer<CreateResultZIPFileEventConsumer>();
            x.AddRequestClient<CreateResultZIPFileEventConsumer>();
            x.AddConsumer<CreateResultZIPFileFaultEventConsumer>();
            x.AddRequestClient<CreateResultZIPFileFaultEventConsumer>();

            x.AddConsumer<NotifyWebsiteInstanceEventConsumer>();
            x.AddRequestClient<NotifyWebsiteInstanceEventConsumer>();
            x.AddConsumer<NotifyWebsiteInstanceFaultEventConsumer>();
            x.AddRequestClient<NotifyWebsiteInstanceFaultEventConsumer>();

            x.AddConsumer<ExportProcessFinalizedEventConsumer>();
            x.AddRequestClient<ExportProcessFinalizedEventConsumer>();
        });
    }

SAGA MassTransit setup:

public class ExportState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public DateTime ExportStartDate { get; set; }
    public DateTime? ExportEndDate { get; set; }
    public byte[] RowVersion { get; set; }
}

DbContext:

public class ExportStateMap : SagaClassMap<ExportState>
{
    protected override void Configure(EntityTypeBuilder<ExportState> entity, ModelBuilder model)
    {
        entity.Property(x => x.CurrentState).HasMaxLength(64);
        entity.Property(x => x.ExportStartDate);
        entity.Property(x => x.ExportEndDate);
        entity.Property(x => x.RowVersion).IsRowVersion();
    }
}

public class ExportStateDbContext : SagaDbContext
{
    public ExportStateDbContext(DbContextOptions options)
    : base(options)
    {
    }

    protected override IEnumerable<ISagaClassMap> Configurations
    {
        get { yield return new ExportStateMap(); }
    }
}

Program.cs

Host.CreateDefaultBuilder(args)
.ConfigureAppConfiguration(builder =>
{
    var configurationBuilder = new ConfigurationBuilder();
    var configuration = configurationBuilder.AddEnvironmentVariables().AddJsonFile("appsettings.json")
        .AddJsonFile($"appsettings.{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT")}.json")
        .Build();

    builder.Sources.Clear();
    builder.AddConfiguration(configuration);
})
.ConfigureServices((hostContext, services) =>
{
    var messageBrokerQueueSettings = hostContext.Configuration.GetSection("MessageBroker:QueueSettings").Get<MessageBrokerQueueSettings>();
    var messageBrokerPersistenceSettings = hostContext.Configuration.GetSection("MessageBroker:StateMachinePersistence").Get<MessageBrokerPersistenceSettings>();

    services.AddMassTransit(x =>
    {
        x.AddSagaStateMachine<ExportStateMachine, ExportState>().EntityFrameworkRepository(r =>
        {
            r.ConcurrencyMode = ConcurrencyMode.Optimistic;

            r.AddDbContext<DbContext, ExportStateDbContext>((provider, builder) =>
            {
                builder.UseSqlServer(messageBrokerPersistenceSettings.ConnectionString, m =>
                {
                    m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                    m.MigrationsHistoryTable($"__{nameof(ExportStateDbContext)}");
                });
            });
        });

        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host(messageBrokerQueueSettings.HostName, messageBrokerQueueSettings.VirtualHost, h =>
            {
                h.Username(messageBrokerQueueSettings.UserName);
                h.Password(messageBrokerQueueSettings.Password);
            });

            cfg.ConfigureEndpoints(context);
        });
    });
})
.Build()
.Run();

ExportStateMachine:

public class ExportStateMachine : MassTransitStateMachine<ExportState>
{
    public ExportStateMachine()
    {
        #region Events Definitions

        Event(() => ExportProcessInitializeEvent);
        Event(() => ExportProcessInitializeFaultEvent,
            x => x.CorrelateById(context => context.InitiatorId ?? context.Message.Message.ExportId));

        Event(() => DownloadMHTMLsEvent);
        Event(() => DownloadMHTMLsFaultEvent,
            x => x.CorrelateById(context => context.InitiatorId ?? context.Message.Message.ExportId));

        Event(() => TakeScreenshotsEvent);
        Event(() => TakeScreenshotsFaultEvent,
            x => x.CorrelateById(context => context.InitiatorId ?? context.Message.Message.ExportId));

        Event(() => CreatePPTsEvent);
        Event(() => CreatePPTsFaultEvent,
            x => x.CorrelateById(context => context.InitiatorId ?? context.Message.Message.ExportId));

        Event(() => CreateResultZIPFileEvent);
        Event(() => CreateResultZIPFileFaultEvent,
            x => x.CorrelateById(context => context.InitiatorId ?? context.Message.Message.ExportId));

        Event(() => NotifyWebsiteInstanceEvent);
        Event(() => NotifyWebsiteInstanceFaultEvent,
            x => x.CorrelateById(context => context.InitiatorId ?? context.Message.Message.ExportId));

        Event(() => ExportProcessFinalizedEvent);

        #endregion

        InstanceState(x => x.CurrentState);

        #region Flow

        During(Initial,
            When(ExportProcessInitializeEvent)
                .Then(x => x.Saga.ExportStartDate = DateTime.UtcNow)
                .TransitionTo(ExportProcessInitializedState));

        During(ExportProcessInitializedState,
            When(DownloadMHTMLsEvent)
                .TransitionTo(DownloadMHTMLsState));

        During(DownloadMHTMLsState,
            When(TakeScreenshotsEvent)
                .TransitionTo(TakeScreenshotsState));

        During(TakeScreenshotsState,
            When(CreatePPTsEvent)
                .TransitionTo(CreatePPTsState));

        During(CreatePPTsState,
            When(CreateResultZIPFileEvent)
                .TransitionTo(CreateResultZIPFileState));

        During(CreateResultZIPFileState,
            When(NotifyWebsiteInstanceEvent)
                .TransitionTo(NotifyWebsiteInstanceState));

        #endregion

        #region Fault-Compensate State

        // If the export fails at any step in the chain, it should fall through to the last event, which notifies the MW instance that the export failed.

        DuringAny(When(ExportProcessInitializeFaultEvent)
            .TransitionTo(ExportProcessInitializedFaultedState)
            .Then(context => context.Publish<NotifyWebsiteInstanceEvent>(new { context.Message })));

        DuringAny(When(DownloadMHTMLsFaultEvent)
            .TransitionTo(DownloadMHTMLsFaultedState)
            .Then(context => context.Publish<NotifyWebsiteInstanceEvent>(new { context.Message })));

        DuringAny(When(TakeScreenshotsFaultEvent)
            .TransitionTo(TakeScreenshotsFaultedState)
            .Then(context => context.Publish<NotifyWebsiteInstanceEvent>(new { context.Message })));

        DuringAny(When(CreatePPTsFaultEvent)
            .TransitionTo(CreatePPTsFaultedState)
            .Then(context => context.Publish<NotifyWebsiteInstanceEvent>(new { context.Message })));

        DuringAny(When(CreateResultZIPFileFaultEvent)
            .TransitionTo(CreateResultZIPFileFaultedState)
            .Then(context => context.Publish<NotifyWebsiteInstanceEvent>(new { context.Message })));

        DuringAny(When(ExportProcessFinalizedEvent)
                .Then(x => x.Saga.ExportEndDate = DateTime.UtcNow)
                .TransitionTo(ExportProcessFinalizedState));
        #endregion
    }

    #region Events

    public Event<ExportProcessInitializeEvent> ExportProcessInitializeEvent { get; }
    public Event<Fault<ExportProcessInitializeEvent>> ExportProcessInitializeFaultEvent { get; }

    public Event<DownloadMHTMLsEvent> DownloadMHTMLsEvent { get; }
    public Event<Fault<DownloadMHTMLsEvent>> DownloadMHTMLsFaultEvent { get; }

    public Event<TakeScreenshotsEvent> TakeScreenshotsEvent { get; }
    public Event<Fault<TakeScreenshotsEvent>> TakeScreenshotsFaultEvent { get; }

    public Event<CreatePPTsEvent> CreatePPTsEvent { get; }
    public Event<Fault<CreatePPTsEvent>> CreatePPTsFaultEvent { get; }

    public Event<CreateResultZIPFileEvent> CreateResultZIPFileEvent { get; }
    public Event<Fault<CreateResultZIPFileEvent>> CreateResultZIPFileFaultEvent { get; }

    public Event<NotifyWebsiteInstanceEvent> NotifyWebsiteInstanceEvent { get; }
    public Event<Fault<NotifyWebsiteInstanceEvent>> NotifyWebsiteInstanceFaultEvent { get; }

    public Event<ExportProcessFinalizedEvent> ExportProcessFinalizedEvent { get; }
    #endregion

    #region States

    public State ExportProcessInitializedState { get; private set; }
    public State ExportProcessInitializedFaultedState { get; private set; }

    public State DownloadMHTMLsState { get; private set; }
    public State DownloadMHTMLsFaultedState { get; private set; }

    public State TakeScreenshotsState { get; private set; }
    public State TakeScreenshotsFaultedState { get; private set; }

    public State CreatePPTsState { get; private set; }
    public State CreatePPTsFaultedState { get; private set; }

    public State CreateResultZIPFileState { get; private set; }
    public State CreateResultZIPFileFaultedState { get; private set; }

    public State NotifyWebsiteInstanceState { get; private set; }
    public State NotifyWebsiteInstanceFaultedState { get; private set; }

    public State ExportProcessFinalizedState { get; private set; }

    #endregion
}

Some consumers:

public class ExportProcessInitializeEventConsumer : ConsumerBase<ExportProcessInitializeEvent>
{
    protected override async Task ConsumeInternal(ConsumeContext<ExportProcessInitializeEvent> context)
    {
        Console.WriteLine($"Export initialized {context.Message.ExportId}");

        await context.Publish<DownloadMHTMLsEvent>(new { context.Message.ExportId });
    }
}

public class DownloadMHTMLSEventConsumer : ConsumerBase<DownloadMHTMLsEvent>
{
    protected override async Task ConsumeInternal(ConsumeContext<DownloadMHTMLsEvent> context)
    {
        Console.WriteLine($"All Export MHTML files downloaded successfully {context.Message.ExportId}");

        await context.Publish<TakeScreenshotsEvent>(new { context.Message.ExportId });
    }
}

.
.
.

public class NotifyWebsiteInstanceEventConsumer : ConsumerBase<NotifyWebsiteInstanceEvent>
    {
        protected override async Task ConsumeInternal(ConsumeContext<NotifyWebsiteInstanceEvent> context)
        {
            Console.WriteLine($"Website instance is notified about export status {context.Message.ExportId}");

            await context.Publish<ExportProcessFinalizedEvent>(new { context.Message.ExportId });
        }
    }

// and the last one in the chain
public class ExportProcessFinalizedEventConsumer : ConsumerBase<ExportProcessFinalizedEvent>
    {
        protected override Task ConsumeInternal(ConsumeContext<ExportProcessFinalizedEvent> context)
        {
            Console.WriteLine($"Export process finalized {context.Message.ExportId}");

            return Task.CompletedTask;
        }
    }

As you can see, it's a simple chain of Console.WriteLine calls, and when I send a POST request to the REST API, the chain is executed correctly (in the right order):

[Screenshot: console output of the consumer chain]

But the record in the database that should represent the current state of the Saga doesn't change:

[Screenshot: saga state record in the database]

It seems like the TransitionTo calls (possibly all During blocks except the initial one) aren't being executed at all. Am I overriding those somehow?

It would be great if someone could explain what I'm doing wrong here.

Upvotes: 0

Views: 662

Answers (1)

Chris Patterson

Reputation: 33268

  1. You need to remove every single one of the AddRequestClient calls from your configuration. They aren't doing anything for you since you're adding them specifying a consumer type instead of an actual message type. And since you aren't actually configuring the request client at all, leave it out. MassTransit already has a built-in generic request client.

  2. TransitionTo doesn't save anything to the database. The saga is only persisted once the event is finished being processed (meaning all state machine activities that make up the event behavior have completed). Since you didn't post your state machine, I really have no idea what it is doing. The state should change once the event has been consumed by the saga.
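
As a rough illustration of point 1, the REST API registration from the question could be reduced to something like the sketch below. This is only a configuration fragment under stated assumptions: the consumer types and MessageBrokerQueueSettings are the ones from the question, the class name MassTransitRegistration and the settings parameter are hypothetical, and the remaining consumers would be registered the same way.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical class name; the consumers and settings type come from the question.
public static class MassTransitRegistration
{
    public static void ConfigureMassTransit(this IServiceCollection services, MessageBrokerQueueSettings settings)
    {
        services.AddMassTransit(x =>
        {
            // Consumers are registered by consumer type.
            x.AddConsumer<ExportProcessInitializeEventConsumer>();
            x.AddConsumer<ExportProcessInitializeFaultEventConsumer>();
            x.AddConsumer<DownloadMHTMLSEventConsumer>();
            x.AddConsumer<DownloadMHTMLSFaultEventConsumer>();
            // ... remaining consumers from the question, registered the same way.

            // No AddRequestClient calls: a request client is registered by
            // message type, never by consumer type. If one were needed, it
            // would look like:
            // x.AddRequestClient<ExportProcessInitializeEvent>();

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host(settings.HostName, settings.VirtualHost, h =>
                {
                    h.Username(settings.UserName);
                    h.Password(settings.Password);
                });

                // Creates receive endpoints for the registered consumers.
                cfg.ConfigureEndpoints(context);
            });
        });
    }
}
```

This only tidies the registration; it does not by itself change how or when the saga instance is persisted, which is what point 2 describes.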

Upvotes: 1
