Reputation: 1560
I'm working on a project and everything is working, but I have one tightly coupled dependency that I don't understand how to invert/inject.
The problem is in my Consumer class, which receives a command message telling it to start a process. The consumer is part of a global message queue services project, e.g. MyCompany.MQ.Services, yet it has a tightly coupled dependency on the process that the command message tells it to start, e.g.:
public Task Consume(ConsumeContext<MyMessageInterface> context)
{
logger = new LoggerConfiguration()
.WriteTo.Console()
.CreateLogger();
try
{
TightCoupleProcess tcp = new TightCoupleProcess(context);
logger.Information("{Blah}, {Blah}, {Blah}", context.Message.exampleVar1, context.Message.exampleVar2, context.Message.exampleVar3);
tcp.StartProcess();
return Task.CompletedTask;
}
catch (Exception ex)
{
return Task.FromException(ex);
}
}
Task Consume is part of MassTransit, and I cannot modify its signature because it is the implementation of MassTransit's IConsumer interface.
I guess what I want is a way to invert/inject that dependency so that my global MQ.Services project doesn't depend on the project calling it. I think I have some misapprehension about inversion/injection, but I'm not sure how to articulate my shortcomings. Perhaps what I want isn't possible. I know I can't modify the interface implementation, but it'd be cool if something like the following worked. Since Consume is an implementation of a MassTransit interface, though, I don't think I can inject an anonymous function from my calling class:
public Task Consume(ConsumeContext<MyMessageInterface> context, AnonFunc() func)
{
try
{
func(context);
logger.Information("{Blah}, {Blah}, {Blah}", context.Message.exampleVar1, context.Message.exampleVar2, context.Message.exampleVar3);
return Task.CompletedTask;
}
catch (Exception ex)
{
return Task.FromException(ex);
}
}
I've managed to get around other dependencies, such as message type definitions, by using reflection and putting this logic in the MQ.Services project, which allows me to keep all of the TightCoupleProcess-related code outside of the MQ.Services project, e.g.:
public void PublishMessage(object msg)
{
MethodInfo method = this.GetType().GetMethod("InvokePublish");
MethodInfo generic = method.MakeGenericMethod(msg.GetType());
generic.Invoke(this, new object[] { msg });
}
public void InvokePublish<T>(object msg)
{
Task.Run(async () =>
{
await busControl.Publish(msg);
}).Wait();
}
But I can't apply a similar strategy to the Consumer because of the constraints I've already mentioned as well as, I'm sure, a healthy dose of ignorance.
If this is possible, will someone please point me in the right direction?
More Info:
Project: App.SubscriberConsole
-> references App.Services.Subscriber
Project: App.Services.Subscriber
-> references MyCompany.MQ.Services.Consumer, et al
Project: MyCompany.MQ.Services.Consumer
-> references MassTransit
-> implements MassTransit.IConsumer
Upvotes: 4
Views: 4218
Reputation: 19610
I am not sure why you are thinking about injecting into the Consume method. The method signature comes from the interface and you cannot change it.
You should be injecting through the consumer class constructor instead. Your idea of injecting a factory delegate is the right one.
public class MyMessageConsumer : IConsumer<MyMessage>
{
// The factory delegate is supplied by whoever constructs the consumer.
private readonly Func<ConsumeContext<MyMessage>, TightCoupleProcess> _factory;
public MyMessageConsumer(Func<ConsumeContext<MyMessage>, TightCoupleProcess> factory)
{
_factory = factory;
}
public Task Consume(ConsumeContext<MyMessage> context)
{
// Build the process from the incoming context, then start it.
var tcp = _factory(context);
tcp.StartProcess();
return Task.CompletedTask;
}
}
Then you configure it like this:
Func<ConsumeContext<MyMessage>, TightCoupleProcess> factory = c => new TightCoupleProcess(c);
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint(host, "customer_update_queue", e =>
{
e.Consumer<MyMessageConsumer>(() => new MyMessageConsumer(factory));
});
});
You can find more overloads of the consumer configuration method on the endpoint in the documentation.
One additional thing: you have a serious issue with how you use Serilog. You create a new logger configuration for every message you consume. This is not right. You should create the logger configuration once, in your application entry point.
Then you can either inject your logger, use the global Log object, or use the MassTransit.SerilogIntegration package and use MassTransit logging in your consumers.
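If the goal is to keep the shared MQ project free of any reference to TightCoupleProcess, one possible variation on the same constructor-injection idea is to inject a plain handler delegate rather than a factory that returns the concrete type. This is only a sketch: DelegateConsumer is a hypothetical name, not something MassTransit provides, and it assumes the MassTransit 3+ ConsumeContext<T> interface.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical generic consumer that could live in the shared MQ project.
// It only knows about a delegate, so it needs no reference to TightCoupleProcess.
public class DelegateConsumer<TMessage> : IConsumer<TMessage>
    where TMessage : class
{
    private readonly Func<ConsumeContext<TMessage>, Task> _handler;

    public DelegateConsumer(Func<ConsumeContext<TMessage>, Task> handler)
    {
        if (handler == null) throw new ArgumentNullException(nameof(handler));
        _handler = handler;
    }

    // Forward the incoming context to whatever the application supplied.
    public Task Consume(ConsumeContext<TMessage> context)
    {
        return _handler(context);
    }
}

// In the application project, which is allowed to reference TightCoupleProcess,
// the registration might look like this (inside cfg.ReceiveEndpoint):
//
// e.Consumer(() => new DelegateConsumer<MyMessage>(context =>
// {
//     new TightCoupleProcess(context).StartProcess();
//     return Task.CompletedTask;
// }));
```

With this shape the dependency points from the application project to the MQ project only, which appears to be what the question is asking for.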
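As a rough sketch of the "create once, then inject" option, the logger could be built at startup and passed to the consumer through its constructor alongside the process delegate. The MyMessage contract, the LoggingSetup helper, and the startProcess parameter are hypothetical stand-ins, and WriteTo.Console() assumes the Serilog console sink package used in the question is installed.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Serilog;

// Hypothetical message contract standing in for the real one.
public interface MyMessage
{
    string exampleVar1 { get; }
}

public class MyMessageConsumer : IConsumer<MyMessage>
{
    private readonly Serilog.ILogger _logger;
    private readonly Func<ConsumeContext<MyMessage>, Task> _startProcess;

    // Both the logger and the process delegate are injected; nothing is built per message.
    public MyMessageConsumer(Serilog.ILogger logger,
                             Func<ConsumeContext<MyMessage>, Task> startProcess)
    {
        _logger = logger;
        _startProcess = startProcess;
    }

    public Task Consume(ConsumeContext<MyMessage> context)
    {
        _logger.Information("Starting process for {ExampleVar1}", context.Message.exampleVar1);
        return _startProcess(context);
    }
}

public static class LoggingSetup
{
    // Call this once from the application entry point, not from Consume.
    public static Serilog.ILogger CreateLogger()
    {
        Log.Logger = new LoggerConfiguration()
            .WriteTo.Console()
            .CreateLogger();
        return Log.Logger;
    }
}
```

The consumer factory passed to e.Consumer would then close over the single logger instance, for example () => new MyMessageConsumer(logger, startProcess), and the application would call Log.CloseAndFlush() on shutdown.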
Upvotes: 6