Reputation: 2063
I have the following different scenarios:
Scenario 1
RabbitMQ is up.
Consumer starts and subscribes correctly through MassTransit to RabbitMQ.
RabbitMQ goes down.
RabbitMQ goes up again.
If the consumer subscribed with busControl.StartAsync(), it does not reconnect.
If the consumer subscribed with busControl.Start(), it reconnects correctly.
I already find this a bit weird, but okay.
Scenario 2
RabbitMQ is down
Consumer starts and tries to subscribe with busControl.Start(), which fails because RabbitMQ is down.
RabbitMQ goes up
Consumer is not able to reconnect to RabbitMQ
I need this case to be covered somehow.
Registration:
private static void RegisterRabbitMQ(SimpleInjectorContainer container)
{
    container.AddMassTransit(x =>
    {
        x.AddBus(() => Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(ConfigurationValuesProvider.Current.Get("RabbitMQHostName"), hostConfigurator =>
            {
#if !DEBUG
                hostConfigurator.Username(ConfigurationValuesProvider.Current.Get("RabbitMQUsername"));
                hostConfigurator.Password(ConfigurationValuesProvider.Current.Get("RabbitMQPassword"));
                hostConfigurator.UseCluster(c =>
                {
                    string[] hostnames = ConfigurationValuesProvider.Current.Get("RabbitMQNodes").Split(';');
                    c.ClusterMembers = hostnames;
                });
#endif
            });
        }));
    });
}
Starting bus:
try
{
    RegisterRabbitMQ(container);
    var busControl = container.GetInstance<IBusControl>();
    busControl.Start();
}
catch (Exception ex)
{
    Log.Seq.Error("RabbitMQ broker is not reachable, BusControl cannot be started. ", ex);
}
Thanks in advance!
Upvotes: 1
Views: 2755
Reputation: 645
We've been using MassTransit in production for quite some time now. Once MassTransit connects to RabbitMQ, its connection management is really solid. The real problem, I believe, is the starting and stopping of VMs/containers, with RabbitMQ being down when your consumer services come up.
The way we approached this issue is to move the starting of the consumer onto a different thread of the service and implement a retry operation. This allows your service to start successfully, then go into a loop and wait until RabbitMQ comes up. Importantly, we log that the service cannot connect to RabbitMQ, so we have visibility of that.
Code:
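private async Task StartAsync()
{
    // Run the retry loop off the calling thread so the service itself can finish starting.
    await Task.Run(() => {
        bool isConnected = false;
        while(!isConnected)
        {
            try
            {
                // Build a fresh bus on every attempt; Start() throws while the broker is unreachable.
                ServiceBus = CreateServiceBus();
                ServiceBus?.Start();
                isConnected = true;
                _loggingService.Info("Messaging service bus started.");
            }
            catch(Exception ex)
            {
                // Log the failure for visibility, then wait before trying again.
                _loggingService.Error("Cannot connect to message broker host.", ex);
                Thread.Sleep(Defined_Time);
            }
        }
    });
}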
Configuring the bus is then exactly as you see in the demos:
private IBusControl CreateServiceBus()
{
    var url = string.Format(CultureInfo.InvariantCulture, "rabbitmq://{0}", _messagingServerName);
    var uri = new Uri(url);
    _loggingService.DebugFormat("Starting messaging service bus: \"{0}\". Username=\"{1}\".", url, _messagingUsername);
    return Bus.Factory.CreateUsingRabbitMq(sbc =>
    {
        Host = sbc.Host(uri, h =>
        {
            h.Username(_messagingUsername);
            h.Password(_messagingPassword);
        });
        // Configure endpoints
    });
}
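As a side note, the retry loop above holds a thread-pool thread in Thread.Sleep and has no way to stop if the service shuts down while RabbitMQ is still down. Below is a minimal sketch of the same retry idea using Task.Delay and a CancellationToken. RetryStarter and StartWithRetryAsync are hypothetical names of my own, not part of MassTransit, and the helper deliberately knows nothing about the bus: you pass in whatever delegate creates and starts it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not a MassTransit API): keeps invoking a start
// delegate until it completes without throwing, or until cancellation.
public static class RetryStarter
{
    // Returns the number of attempts that were needed.
    public static async Task<int> StartWithRetryAsync(
        Func<CancellationToken, Task> startAsync,
        TimeSpan delay,
        Action<Exception> onFailure,
        CancellationToken cancellationToken)
    {
        var attempts = 0;
        while (true)
        {
            // Stop retrying as soon as the caller cancels (e.g. service shutdown).
            cancellationToken.ThrowIfCancellationRequested();
            attempts++;
            try
            {
                await startAsync(cancellationToken);
                return attempts;
            }
            catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
            {
                // Report the failure (e.g. log it), then wait without blocking a thread.
                onFailure(ex);
                await Task.Delay(delay, cancellationToken);
            }
        }
    }
}
```

With the code from this answer, the delegate could be something like ct => Task.Run(() => { ServiceBus = CreateServiceBus(); ServiceBus.Start(); }, ct), and onFailure could call _loggingService.Error. That wiring is an assumption on my part and I have not run it against a real broker.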
Hope that helps.
Upvotes: 2