Reputation: 1716
I have a saga with three states: Initial, ReceivingRows, and Completed.
    public static State Initial { get; set; }
    public static State ReceivingRows { get; set; }
    public static State Completed { get; set; }
It transitions from Initial to ReceivingRows when it gets a BofMessage (where Bof = beginning of file). After the BofMessage, it receives a large number of RowMessages, each of which describes a row in a flat file. Once all RowMessages have been sent, an EofMessage is sent and the state changes to Completed. Here is the saga definition:
    static void DefineSagaBehavior()
    {
        Initially(When(ReceivedBof)
            .Then((saga, message) => saga.BeginFile(message))
            .TransitionTo(ReceivingRows));
        During(ReceivingRows, When(ReceivedRow)
            .Then((saga, message) => saga.AddRow(message)));
        During(ReceivingRows, When(ReceivedRowError)
            .Then((saga, message) => saga.RowError(message)));
        During(ReceivingRows, When(ReceivedEof)
            .Then((saga, message) => saga.EndFile(message))
            .TransitionTo(Completed));
    }
This works, except that sometimes several RowMessages are received before the BofMessage, regardless of the order in which I send them. Those early rows are ultimately counted as errors, so they end up missing from the database or file that I finally write them out to.
As a temporary fix, I added a sleep-timer hack to the method that does all the publishing:
    public static void Publish(
        [NotNull] IServiceBus serviceBus,
        [NotNull] string publisherName,
        Guid correlationId,
        [NotNull] Tuple<string, string> inputFileDescriptor,
        [NotNull] string outputFileName)
    {
        // attempt to load offsets
        var offsetsResult = OffsetParser.Parse(inputFileDescriptor.Item1);
        if (offsetsResult.Result != ParseOffsetsResult.Success)
        {
            // publish an offsets invalid message
            serviceBus.Publish<TErrorMessage>(CombGuid.Generate(), publisherName, inputFileDescriptor.Item2);
            return;
        }
        // publish beginning of file
        var fullInputFilePath = Path.GetFullPath(inputFileDescriptor.Item2);
        serviceBus.Publish<TBofMessage>(correlationId, publisherName, fullInputFilePath);
        // HACK: make sure bof message happens before row messages, or else some row messages won't be received
        Thread.Sleep(5000);
        // publish rows from feed
        var feedResult = FeedParser.Parse(inputFileDescriptor.Item2, offsetsResult.Offsets);
        foreach (var row in feedResult)
        {
            // publish row message, unaligned if applicable
            if (row.Result != ParseRowResult.Success)
                serviceBus.Publish<TRowErrorMessage>(correlationId, publisherName, row.Fields);
            else
                serviceBus.Publish<TRowMessage>(correlationId, publisherName, row.Fields);
        }
        // publish end of file
        serviceBus.Publish<TEofMessage>(correlationId, publisherName, outputFileName);
    }
It's a 5-second sleep timer, and quite an ugly hack. Can anyone tell me why I'm not getting the messages in the order I send them? If messages are unordered by default, can I ensure that they are sent in the right order?
Thank you!
Please note this is cross-posted from http://groups.google.com/group/masstransit-discuss/browse_thread/thread/7bd9518a690db4bb for expedience.
Upvotes: 2
Views: 2924
Reputation: 10547
You cannot guarantee that messages are delivered in any particular order. You can get close in MT by ensuring there is only one concurrent consumer on the consumer side, but I still wouldn't depend on this behaviour (http://docs.masstransit-project.com/en/latest/overview/keyideas.html#handlers). This would effectively make your consumer single-threaded.
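If ordering cannot be relied on, one option is to make the receiving side indifferent to it. The following is only a rough sketch of that idea in plain C#, not MassTransit code: FileAssembler and its methods are hypothetical names, and it assumes (unlike the messages in the question) that each row message carries its row index and that the EOF message carries the total row count. Rows are accepted whether or not the BOF has arrived, and the file counts as complete only once the BOF, the EOF, and every row have been seen.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, framework-free stand-in for the saga's state.
// None of these names come from MassTransit or from the question.
public sealed class FileAssembler
{
    // Keyed by row index, so redelivered rows overwrite themselves
    // and rows can be written out in file order.
    private readonly SortedDictionary<int, string> rows =
        new SortedDictionary<int, string>();

    private string inputPath;       // set when the BOF message arrives
    private int? expectedRowCount;  // set when the EOF message arrives (assumed field)

    // Each handler only records what it saw; none depends on arrival order.
    public void OnBof(string path) { inputPath = path; }
    public void OnRow(int index, string fields) { rows[index] = fields; }
    public void OnEof(int rowCount) { expectedRowCount = rowCount; }

    // Complete only when BOF, EOF, and all rows have arrived, in any order.
    public bool IsComplete
    {
        get
        {
            return inputPath != null
                && expectedRowCount.HasValue
                && rows.Count == expectedRowCount.Value;
        }
    }

    // Rows in ascending index order.
    public IEnumerable<string> OrderedRows
    {
        get { return rows.Values; }
    }
}

public static class Demo
{
    public static void Main()
    {
        var file = new FileAssembler();

        file.OnRow(1, "second row");  // a row overtakes the BOF
        file.OnEof(2);                // the EOF overtakes a row
        file.OnBof("input.txt");
        Console.WriteLine(file.IsComplete);  // False: row 0 is still missing

        file.OnRow(0, "first row");
        Console.WriteLine(file.IsComplete);  // True
        Console.WriteLine(string.Join(" | ", file.OrderedRows));  // first row | second row
    }
}
```

In saga terms this would mean handling row messages in the Initial state as well, and moving to Completed based on a completeness check rather than on the arrival of the EOF alone. Whether that fits depends on whether the publisher can supply the row index and count.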
Upvotes: 6