Reputation: 986
I'm pretty new to MassTransit and don't understand what am I doing wrong to get the following exception: Messages types must not be System types
.
Here are my definitions:
[BsonIgnoreExtraElements]
public class ArcProcess : SagaStateMachineInstance, ISagaVersion
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public int Version { get; set; }
public Guid ActivationId { get; set; }
}
public static class MessageContracts
{
static bool _initialized;
public static void Initialize()
{
if (_initialized)
return;
GlobalTopology.Send.UseCorrelationId<StartProcessingMessage>(x => x.ActivationId);
GlobalTopology.Send.UseCorrelationId<ReconstructionFinishedMessage>(x => x.ActivationId);
GlobalTopology.Send.UseCorrelationId<ProcessingFinishedMessage>(x => x.ActivationId);
_initialized = true;
}
}
2 of my consumers are:
public class StartReconstructionConsumer : IConsumer<StartProcessingMessage>
{
readonly ILogger<StartReconstructionConsumer> _Logger;
private readonly int _DelaySeconds = 5;
public StartReconstructionConsumer(ILogger<StartReconstructionConsumer> logger)
{
_Logger = logger;
}
public async Task Consume(ConsumeContext<StartProcessingMessage> context)
{
var activationId = context.Message.ActivationId;
_Logger.LogInformation($"Received Scan: {activationId}");
await Task.Delay(_DelaySeconds * 1000);
_Logger.LogInformation($"Finish Scan: {activationId}");
await context.Publish<ReconstructionFinishedMessage>(new { ActivationId = activationId });
}
}
public class ProcessingFinishedConsumer : IConsumer<ProcessingFinishedMessage>
{
readonly ILogger<ProcessingFinishedConsumer> _Logger;
public ProcessingFinishedConsumer(ILogger<ProcessingFinishedConsumer> logger)
{
_Logger = logger;
}
public async Task Consume(ConsumeContext<ProcessingFinishedMessage> context)
{
_Logger.LogInformation($"Finish {context.Message.ActivationId}");
await Task.CompletedTask;
}
}
And here is the StateMachine definition:
public class ArcStateMachine: MassTransitStateMachine<ArcProcess>
{
static ArcStateMachine()
{
MessageContracts.Initialize();
}
public ArcStateMachine()
{
InstanceState(x => x.CurrentState);
Initially(
When(ProcessingStartedEvent)
.Then(context =>
{
Console.WriteLine(">> ProcessingStartedEvent");
context.Instance.ActivationId = context.Data.ActivationId;
})
.TransitionTo(ProcessingStartedState));
During(ProcessingStartedState,
When(ReconstructionFinishedEvent)
.Then(context =>
{
Console.WriteLine(">> ReconstructionFinishedEvent");
context.Instance.ActivationId = context.Data.ActivationId;
})
.Publish(context =>
{
return context.Init<ProcessingFinishedMessage>(new { ActivationId = context.Data.ActivationId });
})
.TransitionTo(ProcessingFinishedState)
.Finalize());
}
public State ProcessingStartedState { get; }
public State ReconstructionStartedState { get; }
public State ReconstructionFinishedState { get; }
public State ProcessingFinishedState { get; }
public Event<StartProcessingMessage> ProcessingStartedEvent { get; }
public Event<ReconstructionStartedMessage> ReconstructionStartedEvent { get; }
public Event<ReconstructionFinishedMessage> ReconstructionFinishedEvent { get; }
public Event<ProcessingFinishedMessage> ProcessingFinishedEvent { get; }
}
And the setup for MassTransit looks the following:
var rabbitHost = Configuration["RABBIT_MQ_HOST"];
if (rabbitHost.IsNotEmpty())
{
services.AddMassTransit(cnf =>
{
var connectionString = Configuration["MONGO_DB_CONNECTION_STRING"];
var machine = new ArcStateMachine();
var repository = MongoDbSagaRepository<ArcProcess>.Create(connectionString,
"mongoRepo", "WorkflowState");
cnf.AddConsumer(typeof(StartReconstructionConsumer));
cnf.AddConsumer(typeof(ProcessingFinishedConsumer));
cnf.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(rabbitHost), hst =>
{
hst.Username("guest");
hst.Password("guest");
});
cfg.ConfigureEndpoints(context);
cfg.ReceiveEndpoint(BusConstants.SagaQueue,
e => e.StateMachineSaga(machine, repository));
});
});
services.AddMassTransitHostedService();
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo { Title = "MyApp", Version = "v1" });
});
}
I have several questions about it:
When actually the event is published as a result of publishing a message? I.e. in my example await _BusInstance.Bus.Publish<StartProcessingMessage>(new { ActivationId = id });
is called from a WebApi which is consumed by StartReconstructionConsumer
but when actually the state machine starts to act with Initially(When(ProcessingStartedEvent)...
?
My processing should ensure I'm already in the ProcessingStartedState
state in order to During(ProcessingStartedState, When(ReconstructionFinishedEvent)...
to act correctly. So how do I ensure that my consumer that fires upon receive of StartProcessingMessage
can publish the ReconstructionFinishedMessage
that should initiate that During
? Am I building the messages exchange correctly?
Currently for the await context.Publish<ReconstructionFinishedMessage>(new { ActivationId = activationId });
I get an exception in the logs that states R-FAULT rabbitmq://localhost/saga.service d4070000-7b3b-704d-0f10-08d99942c959 Nanox.GC.Shared.AppCore.Messages.ReconstructionFinishedMessage ReconCaller.Saga.ArcProcess(00:00:04.1132604)
while that guid in the message is actually the MessageId
. And my message in the rabbitmq is routed to saga.service_error
with an exception Messages types must not be System types: System.Threading.Tasks.Task<Nanox.GC.Shared.AppCore.Messages.ProcessingFinishedMessage> (Parameter 'T')
.
It seems like I'm missing here really big..
My intent is to initiate processing that will have several stages processed by a few consumers sequentially. So here I tried to build a simple StateMachine that starts whenever someone called StartProcessing
, then each consumer will do its job and fire the FinishedStepX
which will promote the state machine to a new step and initiate the next consumer up until all the processing is done and the state machine will report ProcessingComplete
.
Thanks for any help n advance
Upvotes: 2
Views: 5012
Reputation: 33278
First, your bus configuration is a bit strange, so I've cleaned that up:
services.AddMassTransit(cnf =>
{
var connectionString = Configuration["MONGO_DB_CONNECTION_STRING"];
cfg.AddSagaStateMachine<ArcStateMachine, ArcProcess>()
.Endpoint(e => e.Name = BusConstants.SagaQueue)
.MongoDbRepository(connectionString, r =>
{
r.DatabaseName = "mongoRepo";
r.CollectionName = "WorkflowState";
});
cnf.AddConsumer<StartReconstructionConsumer>();
cnf.AddConsumer<ProcessingFinishedConsumer>();
cnf.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(rabbitHost), hst =>
{
hst.Username("guest");
hst.Password("guest");
});
cfg.ConfigureEndpoints(context);
});
});
And the publish issue is related to the method being used, only PublishAsync allows the use of message initializers:
During(ProcessingStartedState,
When(ReconstructionFinishedEvent)
.Then(context =>
{
Console.WriteLine(">> ReconstructionFinishedEvent");
context.Instance.ActivationId = context.Data.ActivationId;
})
.PublishAsync(context =>
{
return context.Init<ProcessingFinishedMessage>(new { ActivationId = context.Data.ActivationId });
})
.TransitionTo(ProcessingFinishedState)
.Finalize());
That should sort you out.
Upvotes: 1
Reputation: 986
With the generous help of @Chris Patterson the working solution would be:
Definitions:
[BsonIgnoreExtraElements]
public class ArcProcess : SagaStateMachineInstance, ISagaVersion
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public int Version { get; set; }
public Guid ActivationId { get; set; }
}
public interface StartProcessingMessage
{
Guid ActivationId { get; }
}
public interface ProcessingFinishedMessage
{
Guid ActivationId { get; }
}
public static class MessageContracts
{
static bool _initialized;
public static void Initialize()
{
if (_initialized)
return;
GlobalTopology.Send.UseCorrelationId<StartProcessingMessage>(x => x.ActivationId);
GlobalTopology.Send.UseCorrelationId<ProcessingFinishedMessage>(x => x.ActivationId);
_initialized = true;
}
}
Consumers:
public class StartProcessingConsumer : IConsumer<StartProcessingMessage>
{
readonly ILogger<StartProcessingConsumer> _Logger;
private readonly int _DelaySeconds = 5;
public StartProcessingConsumer(ILogger<StartProcessingConsumer> logger)
{
_Logger = logger;
}
public async Task Consume(ConsumeContext<StartProcessingMessage> context)
{
var activationId = context.Message.ActivationId;
_Logger.LogInformation($"Received Scan: {activationId}");
await Task.Delay(_DelaySeconds * 1000);
_Logger.LogInformation($"Finish Scan: {activationId}");
await context.Publish<ProcessingFinishedMessage>(new { ActivationId = activationId });
}
}
public class ProcessingFinishedConsumer : IConsumer<ProcessingFinishedMessage>
{
readonly ILogger<ProcessingFinishedConsumer> _Logger;
public ProcessingFinishedConsumer(ILogger<ProcessingFinishedConsumer> logger)
{
_Logger = logger;
}
public async Task Consume(ConsumeContext<ProcessingFinishedMessage> context)
{
_Logger.LogInformation($"Finish {context.Message.ActivationId}");
await Task.CompletedTask;
}
}
StateMachine definition:
public class ArcStateMachine: MassTransitStateMachine<ArcProcess>
{
static ArcStateMachine()
{
MessageContracts.Initialize();
}
public ArcStateMachine()
{
InstanceState(x => x.CurrentState);
Initially(
When(ProcessingStartedEvent)
.Then(context =>
{
context.Instance.ActivationId = context.Data.ActivationId;
})
.TransitionTo(ProcessingStartedState));
During(ProcessingStartedState,
When(ProcessingFinishedEvent)
.Then(context =>
{
context.Instance.ActivationId = context.Data.ActivationId;
})
.Finalize());
}
public State ProcessingStartedState { get; }
public State ProcessingFinishedState { get; }
public Event<StartProcessingMessage> ProcessingStartedEvent { get; }
public Event<ProcessingFinishedMessage> ProcessingFinishedEvent { get; }
}
MassTransit setup:
var rabbitHost = Configuration["RABBIT_MQ_HOST"];
if (rabbitHost.IsNotEmpty())
{
services.AddMassTransit(cnf =>
{
var connectionString = Configuration["MONGO_DB_CONNECTION_STRING"];
cnf.AddSagaStateMachine<ArcStateMachine, ArcProcess>()
.Endpoint(e => e.Name = BusConstants.SagaQueue)
.MongoDbRepository(connectionString, r =>
{
r.DatabaseName = "mongoRepo";
r.CollectionName = "WorkflowState";
});
cnf.AddConsumer(typeof(StartProcessingConsumer));
cnf.AddConsumer(typeof(ProcessingFinishedConsumer));
cnf.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(rabbitHost), hst =>
{
hst.Username("guest");
hst.Password("guest");
});
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo { Title = "MyApp", Version = "v1" });
});
}
This example helped me a lot in understanding how the basics of MassTrasit work.
Upvotes: 1