Reputation: 621
Purpose: I need to keep track of headers when I redeliver a message.
Configuration:
What I've tried without success:
first attempt:
await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) =>
{
    if (consumeCtx.Headers.TryGetHeader("SenderApp", out object sender))
    {
        sendCtx.Headers.Set("SenderApp", sender);
    }
}).ConfigureAwait(false);
second attempt:
protected Task ScheduleSend(Uri rabbitUri, double delay)
{
    return GetBus().ScheduleSend<IProcessOrganisationUpdate>(
        rabbitUri,
        TimeSpan.FromSeconds(delay),
        _Data,
        new HeaderPipe(_SenderApp, 0));
}

public class HeaderPipe : IPipe<SendContext>
{
    private readonly byte _Priority;
    private readonly string _SenderApp;

    public HeaderPipe(byte priority)
    {
        _Priority = priority;
        _SenderApp = Assembly.GetEntryAssembly()?.GetName()?.Name ?? "Default";
    }

    public HeaderPipe(string senderApp, byte priority)
    {
        _Priority = priority;
        _SenderApp = senderApp;
    }

    public void Probe(ProbeContext context)
    { }

    public Task Send(SendContext context)
    {
        context.Headers.Set("SenderApp", _SenderApp);
        context.SetPriority(_Priority);
        return Task.CompletedTask;
    }
}
Expected: FinQuest.Robot.DBProcess
Result: null
I log the SenderApp header in the Consume method. On the first delivery the log line looks like this
Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: FinQuest.Robot.DBProcess, modif: LinkedIn)
and looks like this after the redelivery
Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: , modif: LinkedIn)
What am I doing wrong? I don't want to use the Retry feature because it caps the number of retries, and I don't want to be limited.
Thanks in advance.
Upvotes: 4
Views: 420
Reputation: 33288
There is a method, used by the redelivery filter, that you might want to use:
https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit/SendContextExtensions.cs#L90
public static void TransferConsumeContextHeaders(this SendContext sendContext, ConsumeContext consumeContext)
In your code, you would call it inside the Redeliver callback:
await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) => {
sendCtx.TransferConsumeContextHeaders(consumeCtx);
});
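For context, here is a minimal sketch of how that call might sit inside a full consumer. It assumes a MassTransit version where Redeliver accepts the (consumeCtx, sendCtx) callback used in the question and where both extension methods are visible through `using MassTransit;`. The message contract shown here, the consumer class name, and the CanProcessNow check are hypothetical placeholders, not taken from the question's code.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical contract standing in for the question's IProcessOrganisationUpdate.
public interface IProcessOrganisationUpdate
{
    Guid Id { get; }
}

// Hypothetical consumer name; only the Redeliver call mirrors the answer above.
public class ProcessOrganisationUpdateConsumer : IConsumer<IProcessOrganisationUpdate>
{
    public async Task Consume(ConsumeContext<IProcessOrganisationUpdate> context)
    {
        // Read the custom header the same way the question does;
        // sender stays null when the header is absent.
        context.Headers.TryGetHeader("SenderApp", out object sender);
        Console.WriteLine($"id: {context.Message.Id}, sender: {sender}");

        if (!CanProcessNow(context.Message))
        {
            // Schedule the redelivery and copy the inbound headers onto the
            // outgoing message instead of setting them one by one.
            await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) =>
            {
                sendCtx.TransferConsumeContextHeaders(consumeCtx);
            }).ConfigureAwait(false);
            return;
        }

        // Normal processing would go here.
    }

    // Placeholder condition (an assumption): always asks for a redelivery.
    private static bool CanProcessNow(IProcessOrganisationUpdate message) => false;
}
```

With this in place, the second log line should show the same sender value as the first, provided the header was present on the original message.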
Upvotes: 3