Reputation: 21
This is not about delayed message delivery. Lately I've noticed that my endpoint delays the handling of a message, measured from the moment the message is taken from the RabbitMQ queue.
I was wondering if there is some configuration option I’m overlooking that has an impact on the delay between the receipt of the message from RabbitMQ by NServiceBus and the actual handling of the message (command) in the IHandleMessages<> handler.
I’m seeing a consistent delay when comparing the NServiceBus logfile and the logfile of the handler. It does vary a bit between client and server but in a given session it’s always constant. Now I’m running through all the options that I can find but there’s no real pointer for me out in the wild.
The thing is I’m running a workflow solution that uses NServiceBus to do all the messaging between the workflow server (based on MS WF) and 50 to 60 clients. The server runs on NServiceBus 7.6.0 but all clients still use NServiceBus 5.2.26. I decided to write a small test client that also runs on NServiceBus 7.6.0, but that also shows the delay, albeit a smaller one. The delay is constant at 6.5 seconds for every command in the test client. So the message is retrieved by NSB from the queue and 6.5 seconds later the IHandleMessages<> handler is invoked.
I’ve tried a couple of things (direct routing vs conventional routing, and making sure license files were distributed properly, for example). The only thing I’m able to come up with right now is building a new test server from the workflow server I’m using that only accepts 1 command, generates a reply and nothing more, and seeing if that makes a difference.
Configuration-wise, the solution runs on a Windows Server 2016 machine with 4 cores, RabbitMQ 3.7.21 and NServiceBus 7.6.0. Clients run on Windows 10 machines using NServiceBus 5.2.26, but the test client uses NServiceBus 7.6.0.
Now for the logs. This is the NServiceBus log when it receives the GetVersionCommand:
2022-01-20 16:09:58.104 DEBUG Processing message type: HyFlo.Service.Messages.Commands.Info.GetVersionCommand
Message headers:
CorrelationId : 619d678d-25bf-46c0-b5c0-57c624e3d557
NServiceBus.MessageId : 548813d5-2465-4ff5-904d-ae2300f9ee23
NServiceBus.MessageIntent : Send
NServiceBus.ConversationId : 367575b0-7bee-40d0-90ca-ae2300f9ee24
NServiceBus.CorrelationId : 548813d5-2465-4ff5-904d-ae2300f9ee23
NServiceBus.TimeToBeReceived : 00:10:00
NServiceBus.ReplyToAddress : Hyflo.Client.Test
NServiceBus.OriginatingMachine : DYP297
NServiceBus.OriginatingEndpoint : Hyflo.Client.Test
$.diagnostics.originating.hostid : 3a3bbd6661214b655aa6dbcd95d112a2
NServiceBus.ContentType : text/xml
NServiceBus.EnclosedMessageTypes : HyFlo.Service.Messages.Commands.Info.GetVersionCommand, HyFlo.Service.Messages, Version=4.10.3.26734, Culture=neutral, PublicKeyToken=null;HyFlo.Types.Interfaces.IInfoCommand, HyFlo.Types, Version=4.10.8.20782, Culture=neutral, PublicKeyToken=null
NServiceBus.Version : 7.6.0
NServiceBus.TimeSent : 2022-01-20 15:09:58:107461 Z
NServiceBus.NonDurableMessage : False
Handlers to invoke:
HyFlo.Service.CommandHandlers.Info.GetVersionHandler
Then the custom log of the GetVersionHandler:
2022-01-20 16:10:04.6516 TRACE = New Message =====================================================================================================================================
2022-01-20 16:10:04.6516 TRACE CommandEventId:9125b036-c315-484e-9948-889ed1e56587: New message handled by handler of type 'HyFlo.Service.CommandHandlers.Info.GetVersionHandler' with CorrelationId '619d678d-25bf-46c0-b5c0-57c624e3d557' ..
2022-01-20 16:10:04.6516 TRACE ===================================================================================================================================================
2022-01-20 16:10:04.6516 INFO Cache contains a systemstatus ..
2022-01-20 16:10:04.6516 INFO Retrieved systemstatus from cache is: 0
2022-01-20 16:10:04.6516 INFO Running the injected code ..
2022-01-20 16:10:04.6672 TRACE ===================================================================================================================================================
2022-01-20 16:10:04.6672 TRACE CommandEventId:9125b036-c315-484e-9948-889ed1e56587: Message processed by handler of type 'HyFlo.Service.CommandHandlers.Info.GetVersionHandler' with CorrelationId '619d678d-25bf-46c0-b5c0-57c624e3d557' ..
2022-01-20 16:10:04.6672 TRACE Elapsed time: 00:00:00.0156159 ..
2022-01-20 16:10:04.6672 TRACE = End of Message ==================================================================================================================================
The command is received at 2022-01-20 16:09:58.104 but only reaches the handler at 2022-01-20 16:10:04.6516, about 6.5 seconds later, and this delay is quite consistent between this and other commands. I just can't get my head around why this delay is there.
The endpoint configuration is the following:
internal class ProgramService : ServiceBase
{
private IEndpointInstance _endpointInstance = null;
/// <summary>
/// Determines whether the application is run as a service or just as a console application
/// </summary>
/// <param name="name"></param>
/// <returns>true if it's a service</returns>
private static bool IsService(string name)
{
if (!Environment.UserInteractive) return true;
var sc = new ServiceController(name);
try
{
return sc.Status == ServiceControllerStatus.StartPending;
}
catch (InvalidOperationException)
{
return false;
}
}
/// <summary>
/// Main entry
/// </summary>
private static void Main()
{
using (var service = new ProgramService())
{
// so we can run interactive from Visual Studio or as a windows service
if (!IsService("HyFlo.Service"))
{
Console.CancelKeyPress += (sender, e) => { service.OnStop(); };
service.OnStart(null);
Console.WriteLine("\r\nPress enter key to stop program\r\n");
Console.Read();
service.OnStop();
return;
}
Run(service);
}
}
/// <summary>
/// On critical errors bail out
/// </summary>
/// <param name="context">Critical error context supplied by NServiceBus (error text and exception)</param>
async Task OnCriticalError(ICriticalErrorContext context)
{
var fatalMessage = $"The following critical error was encountered:\n{context.Error}\nProcess is shutting down.";
try
{
Console.WriteLine("fatalMessage: " + fatalMessage);
TraceWriter.Error(fatalMessage);
if (context.Exception.InnerException != null)
{
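// Use a regular string so "\n" is a real newline; the verbatim @"\n" would log a literal backslash-n
TraceWriter.Error("innerException message: " + context.Exception.InnerException.Message + "\n" +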
context.Exception.InnerException.StackTrace);
}
await context.Stop().ConfigureAwait(false);
}
finally
{
Environment.FailFast(fatalMessage, context.Exception);
}
}
/// <summary>
/// Starting the service
/// </summary>
/// <returns>A task that completes once the endpoint has been started</returns>
private async Task AsyncOnStart()
{
TraceWriter.Trace("AsyncOnStart() running .. ");
try
{
TraceWriter.Info("Running configuration management ..");
var config =
ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
var section =
config.GetSection("connectionStrings");
if (!section.SectionInformation.IsProtected)
{
section.SectionInformation.ProtectSection("DPAPIProtection");
section.SectionInformation.ForceSave = true;
config.Save(ConfigurationSaveMode.Modified);
}
var endpointConfig = new EndpointConfiguration("hyflo.service");
endpointConfig.EnableInstallers();
endpointConfig.SendFailedMessagesTo("hyflo.service.errors");
endpointConfig.AuditProcessedMessagesTo("hyflo.service.audit");
endpointConfig.UseSerialization<XmlSerializer>();
endpointConfig.DefineCriticalErrorAction(OnCriticalError);
endpointConfig.LimitMessageProcessingConcurrencyTo(8);
var persistenceMethod = ConfigurationManager.AppSettings["NServicebusPersistence"];
if (persistenceMethod.ToUpper() == "RAVENDB")
{
string connectionString = ConfigurationManager.ConnectionStrings["NServiceBus/Persistence"].ConnectionString;
TraceWriter.Info($"Setting persistence to RavenDBPersistence based on connectionString '{connectionString}' .. ");
var documentStore = new Raven.Client.Document.DocumentStore
{
ConnectionStringName = "NServiceBus/Persistence",
DefaultDatabase = "HyFlo.Service"
};
documentStore.Initialize();
endpointConfig.UsePersistence<RavenDBPersistence>().SetDefaultDocumentStore(documentStore);
}
else
{
TraceWriter.Info("Setting persistence to InMemoryPersistence .. ");
endpointConfig.UsePersistence<InMemoryPersistence>();
}
var transportConnection = ConfigurationManager.ConnectionStrings[@"NServiceBus/Transport"];
string transportConnectionString = transportConnection.ConnectionString;
if (String.IsNullOrEmpty(transportConnectionString))
{
transportConnectionString = "host=localhost";
}
TraceWriter.Info($"Configuring RabbitMQTransport for connection '{transportConnectionString}' .. ");
var transport = endpointConfig.UseTransport<RabbitMQTransport>();
transport.ConnectionString(transportConnectionString);
transport.UseConventionalRoutingTopology();
string databusBasePath = ConfigurationManager.AppSettings["DataBusBasePath"] ?? "";
TraceWriter.Info($"Setting Databus's basepath to '{databusBasePath}' ..");
endpointConfig.UseDataBus<FileShareDataBus>().BasePath(databusBasePath);
TraceWriter.Info("Scanning for Hyflo assemblies .. ");
List<string> hyfloAssemblies =
Directory.GetFiles(Directory.GetCurrentDirectory(), "HyFlo.*dll", SearchOption.TopDirectoryOnly).ToList();
TraceWriter.Info("Initializing Autofac with assemblies .. ");
foreach (string assemblyName in hyfloAssemblies)
{
TraceWriter.Info($"Scanning '{assemblyName}' for an Autofac module .. ");
}
try
{
var containerSettings = endpointConfig.UseContainer(new AutofacServiceProviderFactory());
containerSettings.ConfigureContainer(containerBuilder =>
{
var loadedAssemblies = hyfloAssemblies.Select(Assembly.LoadFile).ToList();
containerBuilder.RegisterAssemblyModules(loadedAssemblies.ToArray());
});
}
catch (Exception ex)
{
TraceWriter.Error($"{ex.Message}\n{ex.StackTrace}\n{ex.InnerException?.Source}");
await OnCriticalError(new CriticalErrorContext(null, $"Exception occurred during initialization. Exception is: {ex.Message}\n{ex.StackTrace}\n{ex.InnerException?.Source}", ex));
}
TraceWriter.Info("Setting up default message conventions ..");
var conventions = endpointConfig.Conventions();
conventions.DefiningTimeToBeReceivedAs(type => TimeSpan.FromMinutes(10));
var logFactory = LogManager.Use<DefaultFactory>();
logFactory.Level(LogLevel.Debug);
endpointConfig.EnableFeature<HyfloInitializationFeature>();
TraceWriter.Info("Initialized! Now starting Hyflo ..");
_endpointInstance = await Endpoint.Start(endpointConfig);
TraceWriter.Info("Hyflo started ..");
}
catch (Exception exception)
{
TraceWriter.Error($"{exception.Message}\n{exception.StackTrace}\n{exception.InnerException?.Source}");
await OnCriticalError(new CriticalErrorContext(null, "Failed to start the bus.", exception));
}
}
/// <summary>
/// Stopping the service
/// </summary>
/// <returns></returns>
private Task AsyncOnStop()
{
TraceWriter.Info("Shutting down Hyflo ..");
if (_endpointInstance != null)
{
return _endpointInstance.Stop();
}
return Task.CompletedTask;
}
/// <summary>
/// Starting the service
/// </summary>
/// <param name="args"></param>
protected override void OnStart(string[] args)
{
TraceWriter.Trace("ProgramService.OnStart() ..");
AsyncOnStart().GetAwaiter().GetResult();
}
/// <summary>
/// Stopping the service
/// </summary>
protected override void OnStop()
{
TraceWriter.Trace("ProgramService.OnStop() ..");
AsyncOnStop().GetAwaiter().GetResult();
}
}
The HyfloInitializationFeature is a startup task that sends a list of all current workflows to the connected clients.
using HyFlo.Tasks.Interfaces;
using NServiceBus.Features;
namespace HyFlo.Service
{
public class HyfloInitializationFeature : Feature
{
protected override void Setup(FeatureConfigurationContext context)
{
context.RegisterStartupTask(c => new SystemStartupTask(c.Build<IGetInfoOnAllWorkflowsTask>(), c.Build<ISystemTestTask>()));
}
}
}
This is the SystemStartupTask:
namespace HyFlo.Service
{
public class SystemStartupTask : FeatureStartupTask, IDisposable
{
private readonly IGetInfoOnAllWorkflowsTask _getInfoOnAllWorkflowsTask;
private readonly ISystemTestTask _systemTestTask;
public SystemStartupTask(IGetInfoOnAllWorkflowsTask getInfoOnAllWorkflowsTask, ISystemTestTask systemTestTask)
{
_getInfoOnAllWorkflowsTask = getInfoOnAllWorkflowsTask;
_systemTestTask = systemTestTask;
}
public void Dispose()
{
}
/// <summary>
/// OnStart
/// </summary>
/// <param name="session"></param>
/// <returns></returns>
protected override async Task OnStart(IMessageSession session)
{
TraceWriter.Trace($"==> Running SystemStartupTask.OnStart() ..");
DeleteHeartbeatIndicator();
ObjectCache cache = MemoryCache.Default;
var policy = new CacheItemPolicy { SlidingExpiration = (TimeSpan.FromSeconds(15)) };
bool systemOk = false;
if (_systemTestTask != null)
{
_systemTestTask.ContextWrapper = new MessageContextWrapper { Session = session };
_systemTestTask.QuickTest = false;
_systemTestTask.TestComponentsMask = 31;
_systemTestTask.SystemStartup = true;
if (_systemTestTask.Run())
{
var systemResultTexts = new List<string>();
if (int.TryParse(_systemTestTask.Results.ToString(), out int systemsResult))
{
if ((systemsResult & 1) == 1)
{
systemResultTexts.Add("HSB offline");
}
if ((systemsResult & 2) == 2)
{
systemResultTexts.Add("HDM offline");
}
if ((systemsResult & 4) == 4)
{
systemResultTexts.Add("SqlServer offline");
}
if ((systemsResult & 8) == 8)
{
systemResultTexts.Add("Workflow Queueing offline");
}
}
TraceWriter.Trace(
$"SystemStartupTask: Results returned by systemtest: '{(!systemResultTexts.Any() ? "All online" : String.Join(",", systemResultTexts))}' ..");
if (!_systemTestTask.Failure && !systemResultTexts.Any())
{
TraceWriter.Info("HyFlo dependencies all up and running ..");
systemOk = true;
// for caching purposes
if (_getInfoOnAllWorkflowsTask != null)
{
_getInfoOnAllWorkflowsTask.UserId = "";
_getInfoOnAllWorkflowsTask.Run();
}
}
else
{
TraceWriter.Warn("HyFlo can't be started. One or more of its dependencies returned a failure!");
}
}
}
else
{
TraceWriter.Warn("A general failure occurred during the Hyflo systemcheck!");
}
var allworkflowStates = new WorkflowState[0];
if (_getInfoOnAllWorkflowsTask != null)
{
if (!_getInfoOnAllWorkflowsTask.Failure && _getInfoOnAllWorkflowsTask.Results is List<WorkflowState>)
{
allworkflowStates = (_getInfoOnAllWorkflowsTask.Results as List<WorkflowState>).ToArray();
}
TraceWriter.Info(
$"Workflowstate retrieval completed. Failure: '{_getInfoOnAllWorkflowsTask.Failure}', Results: '{allworkflowStates.Count()}' workflow{(allworkflowStates.Count() == 1 ? "" : "s")} found ..");
var timeStamp = DateTime.Now;
TraceWriter.Trace("Sending IStartedHyflo @ GUI with the following parameters:\n" +
$"\tErrorCode: '{Convert.ToInt32(_systemTestTask?.Results)}'\n" +
$"\tFailure: '{!systemOk}'\n" + $"\tTimeStamp: '{timeStamp}'\n" +
$"\tResulting workflowcount: '{allworkflowStates.Length}'\n");
await session.Publish<IStartedHyflo>(evt =>
{
evt.ErrorCode = Convert.ToInt32(_systemTestTask?.Results);
evt.Failure = !systemOk;
evt.TimeStamp = timeStamp;
evt.Result = new DataBusProperty<WorkflowState[]>(allworkflowStates);
}).ConfigureAwait(false);
cache.Set("SystemTest", systemOk, policy);
}
}
/// <summary>
/// OnStop
/// </summary>
/// <param name="session"></param>
/// <returns></returns>
protected override async Task OnStop(IMessageSession session)
{
TraceWriter.Trace($"==> Running SystemStartupTask.OnStop() ..");
TraceWriter.Info("Deleting heartbeat indicator .. ");
DeleteHeartbeatIndicator();
var timeStamp = DateTime.Now;
TraceWriter.Trace("Sending IShutdownHyflo @ GUI with the following parameters:\n" + "\tErrorCode: '0'\n" +
"\tFailure: 'false'\n" + $"\tTimeStamp: '{timeStamp}'");
await session.Publish<IShutdownHyflo>(evt =>
{
evt.ErrorCode = 0;
evt.Failure = false;
evt.TimeStamp = timeStamp;
}).ConfigureAwait(false);
TraceWriter.Info("HyFlo has shut down .. ");
}
/// <summary>
/// Deletes the heartbeat.txt file that indicates whether a heartbeat flow is running or not
/// </summary>
private void DeleteHeartbeatIndicator()
{
string stateFolder = ConfigurationManager.AppSettings["StateFolder"];
string fullStateFolder = stateFolder.IndexOf(":", StringComparison.Ordinal) == -1
? $"{AppDomain.CurrentDomain.BaseDirectory}{stateFolder}"
: stateFolder;
string fileName = $@"{fullStateFolder}\heartbeat.txt";
if (File.Exists(fileName))
{
File.Delete(fileName);
}
}
}
}
The GetVersionHandler is the following:
namespace HyFlo.Service.CommandHandlers.Info
{
public class GetVersionHandler : HandlerBase, IHandleMessages<GetVersionCommand>
{
public GetVersionHandler(ISystemTestTask systemTestTask, ITaskBuilderController taskBuilderController)
{
_systemTestTask = systemTestTask;
_taskBuilderController = taskBuilderController;
}
public virtual async Task Handle(GetVersionCommand message, IMessageHandlerContext context)
{
if (message == null)
return;
await RunDirect(async () =>
{
var assemblyVersion = Assembly.GetAssembly(typeof(GetVersionHandler)).GetName().Version;
var reply = new GetVersionReply
{
Failure = false,
Version = $"{assemblyVersion.Major}.{assemblyVersion.Minor}",
TimeStamp = DateTime.Now,
TaskId = CommandEventId,
ErrorCode = 0
};
await Retry.ExecuteAsync(async () =>
{
var replyOptions = new ReplyOptions();
replyOptions.SetHeader("CorrelationId", CorrelationId);
await context.Reply(reply, replyOptions);
});
}, new MessageContextWrapper { HandlerContext = context });
}
}
}
The RunDirect method is located in the HandlerBase class and one of the first things it does is log that a new message is being handled.
public async Task RunDirect(Action codeToRun, IMessageContextWrapper contextWrapper)
{
_taskBuilderController.HandlerBase = this;
CommandEventId = Guid.NewGuid();
ContextWrapper = contextWrapper;
CorrelationId = GetHeaderValue("CorrelationId");
string fullMethodName = GetType().ToString();
TraceWriter.Trace("= New Message =====================================================================================================================================");
TraceWriter.Trace($"CommandEventId:{CommandEventId}: New message handled by handler of type '{fullMethodName}' with CorrelationId '{CorrelationId}' .. ");
TraceWriter.Trace("===================================================================================================================================================");
...
The last thing is the test client I've used to send this GetVersionCommand:
namespace HPS_EndpointTest
{
internal class Program
{
static void Main(string[] args)
{
var endpointConfiguration = new EndpointConfiguration("Hyflo.Client.Test");
endpointConfiguration.EnableInstallers();
endpointConfiguration.UsePersistence<InMemoryPersistence>();
endpointConfiguration.PurgeOnStartup(true);
// Configure the transport once and reuse the returned settings object
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.ConnectionString("host=hswv0601;username=hyflo;password=hyflo");
transport.UseDirectRoutingTopology();
IEndpointInstance endpointInstance = null;
AsyncPump.Run(async delegate
{
endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);
});
Console.WriteLine(DateTime.Now.ToString("dd-MM-yyyyTHH:mm:ss.ffff") + ": Client started ..");
var correlationId = Guid.NewGuid();
string destination = "hyflo.service";
var sendOptions = new SendOptions();
sendOptions.SetHeader("CorrelationId", correlationId.ToString());
sendOptions.SetDestination(destination);
sendOptions.RequireImmediateDispatch();
var versionMessage = new HyFlo.Service.Messages.Commands.Info.GetVersionCommand();
Console.WriteLine(DateTime.Now.ToString("dd-MM-yyyyTHH:mm:ss.ffff") + ": Sending message ..");
AsyncPump.Run(async delegate
{
if (endpointInstance != null)
{
await endpointInstance.Send(versionMessage, sendOptions).ConfigureAwait(false);
}
});
Console.WriteLine(DateTime.Now.ToString("dd-MM-yyyyTHH:mm:ss.ffff") + ": Message sent!");
while (true)
{
var key = Console.ReadKey();
if (key.Key == ConsoleKey.Enter)
{
break;
}
}
AsyncPump.Run(async delegate
{
await endpointInstance.Stop().ConfigureAwait(false);
});
}
}
public class RetrieveAllWorkflowsHandler : IHandleMessages<HyFlo.Service.Messages.Events.IRetrievedAllWorkflowsEvent>
{
public Task Handle(HyFlo.Service.Messages.Events.IRetrievedAllWorkflowsEvent message, IMessageHandlerContext context)
{
Console.WriteLine(DateTime.Now.ToString("dd-MM-yyyyTHH:mm:ss.ffff") + ": IRetrievedAllWorkflowEvents reply received ..");
return Task.CompletedTask;
}
}
public class RetrieveVersionHandler : IHandleMessages<GetVersionReply>
{
public Task Handle(GetVersionReply message, IMessageHandlerContext context)
{
Console.WriteLine(DateTime.Now.ToString("dd-MM-yyyyTHH:mm:ss.ffff") + ": GetVersionReply reply received ..");
return Task.CompletedTask;
}
}
}
I just don't understand why this delay is there.
What I'm doing right now is creating a completely new endpoint with limited functionality (i.e. only supporting the GetVersionCommand) to see what happens with that. If the delay isn't there, I'll add support for other commands and see what happens then. I hope one of you sees this, slaps their forehead and replies "but you forgot so and so".
Thanks in advance!!
Upvotes: 1
Views: 633
Reputation: 21
With the help of Daniel Marbach I managed to pinpoint the problem. The delay was caused by a complex object graph in one of the classes of the solution (in fact the class that is the primary access point to the solution). When I either created specialised classes or injected only the dependency that was actually needed, there was virtually no delay.
So the final solution was to change each and every IHandleMessages handler so that it gets injected with only the targeted dependency it needs to do its work.
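To illustrate the idea, here is a minimal sketch of what such a handler could look like. It is not the actual HyFlo code: IVersionProvider and AssemblyVersionProvider are hypothetical names made up for this example, and the two message classes are simplified stand-ins for the real ones in HyFlo.Service.Messages. The point is only that the constructor asks for one small dependency, so the container has very little to build for each message.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Simplified stand-ins for the real message types (assumption, for self-containment only).
public class GetVersionCommand : ICommand { }

public class GetVersionReply : IMessage
{
    public bool Failure { get; set; }
    public string Version { get; set; }
    public DateTime TimeStamp { get; set; }
    public int ErrorCode { get; set; }
}

// Hypothetical narrow dependency: exposes only what this handler needs.
public interface IVersionProvider
{
    string GetVersion();
}

// Hypothetical implementation without further dependencies, so it is cheap to resolve.
public class AssemblyVersionProvider : IVersionProvider
{
    public string GetVersion()
    {
        var version = typeof(AssemblyVersionProvider).Assembly.GetName().Version;
        return $"{version.Major}.{version.Minor}";
    }
}

public class GetVersionHandler : IHandleMessages<GetVersionCommand>
{
    private readonly IVersionProvider _versionProvider;

    // Only the targeted dependency is injected, not the large "access point" class.
    public GetVersionHandler(IVersionProvider versionProvider)
    {
        _versionProvider = versionProvider;
    }

    public Task Handle(GetVersionCommand message, IMessageHandlerContext context)
    {
        var reply = new GetVersionReply
        {
            Failure = false,
            Version = _versionProvider.GetVersion(),
            TimeStamp = DateTime.Now,
            ErrorCode = 0
        };

        // Reply to the sender of the command.
        return context.Reply(reply);
    }
}
```

With Autofac the provider would be registered in one of the modules, for example with containerBuilder.RegisterType<AssemblyVersionProvider>().As<IVersionProvider>(). Whether a singleton lifetime is appropriate depends on the dependency, so treat that part as something to decide per case.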
Upvotes: 1