Ragesh Puthiyedath Raju
Ragesh Puthiyedath Raju

Reputation: 3939

Middleware with Masstransit publish

I have .net core WEB API application with MassTransit (for implement RabbitMQ message broker). RabbitMQ-MassTransit configuration is simple and done in few line code in Startup.cs file.

services.AddMassTransit(x =>
        {
            x.AddConsumer<CustomLogConsume>();

            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri("rabbitmq://rabbitmq/"), h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                cfg.ExchangeType = ExchangeType.Fanout;

                cfg.ReceiveEndpoint(host, "ActionLog_Queue", e =>
                {
                    e.PrefetchCount = 16;
                });

                // or, configure the endpoints by convention
                cfg.ConfigureEndpoints(provider);
            }));
        });

I am using dependency injection in my project solution for better code standard. Publish messages are works fine with controller dependency injection. But when I implement a custom middle ware for log actions, Masstransit failed to publish the message properly, it was created a additional queue with _error in RabbitMQ web console.

public class RequestResponseLoggingMiddleware
{
    #region Private Variables

    /// <summary>
    /// RequestDelegate
    /// </summary>
    private readonly RequestDelegate _next;

    /// <summary>
    /// IActionLogPublish
    /// </summary>
    private readonly IActionLogPublish _logPublish;

    #endregion

    #region Constructor
    public RequestResponseLoggingMiddleware(RequestDelegate next, IActionLogPublish logPublish)
    {
        _next = next;
        _logPublish = logPublish;
    }
    #endregion

    #region PrivateMethods

    #region FormatRequest
    /// <summary>
    /// FormatRequest
    /// </summary>
    /// <param name="request"></param>
    /// <returns></returns>
    private async Task<ActionLog> FormatRequest(HttpRequest request)
    {
        ActionLog actionLog = new ActionLog();
        var body = request.Body;
        request.EnableRewind();

        var context = request.HttpContext;

        var buffer = new byte[Convert.ToInt32(request.ContentLength)];
        await request.Body.ReadAsync(buffer, 0, buffer.Length);
        var bodyAsText = Encoding.UTF8.GetString(buffer);
        request.Body = body;

        var injectedRequestStream = new MemoryStream();

        var requestLog = $"REQUEST HttpMethod: {context.Request.Method}, Path: {context.Request.Path}";

        using (var bodyReader = new StreamReader(context.Request.Body))
        {
            bodyAsText = bodyReader.ReadToEnd();

            if (string.IsNullOrWhiteSpace(bodyAsText) == false)
            {
                requestLog += $", Body : {bodyAsText}";
            }

            var bytesToWrite = Encoding.UTF8.GetBytes(bodyAsText);
            injectedRequestStream.Write(bytesToWrite, 0, bytesToWrite.Length);
            injectedRequestStream.Seek(0, SeekOrigin.Begin);
            context.Request.Body = injectedRequestStream;
        }

        actionLog.Request = $"{bodyAsText}";
        actionLog.RequestURL = $"{request.Scheme} {request.Host}{request.Path} {request.QueryString}";

        return actionLog;
    }
    #endregion

    #region FormatResponse
    private async Task<string> FormatResponse(HttpResponse response)
    {
        response.Body.Seek(0, SeekOrigin.Begin);
        var text = await new StreamReader(response.Body).ReadToEndAsync();
        response.Body.Seek(0, SeekOrigin.Begin);

        return $"Response {text}";
    }
    #endregion

    #endregion

    #region PublicMethods

    #region Invoke
    /// <summary>
    /// Invoke - Hits before executing any action. Actions call executes from _next(context)
    /// </summary>
    /// <param name="context"></param>
    /// <returns></returns>
    public async Task Invoke(HttpContext context)
    {
        ActionLog actionLog = new ActionLog();

        actionLog = await FormatRequest(context.Request);


        var originalBodyStream = context.Response.Body;

        using (var responseBody = new MemoryStream())
        {
            context.Response.Body = responseBody;

            await _next(context);

            actionLog.Response = await FormatResponse(context.Response);

            await _logPublish.Publish(actionLog);
            await responseBody.CopyToAsync(originalBodyStream);
        }
    }
    #endregion

    #endregion
}

configure Middleware in startup

  public async void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        ............
        app.UseMiddleware<RequestResponseLoggingMiddleware>();
        ....................
    }

Is there any additional configuration in startup for MassTransit to work with Middle Ware

Edit

IActionLogPublish

public interface IActionLogPublish
{
    Task Publish(ActionLog model);
}

ActionLogPublish

public class ActionLogPublish : IActionLogPublish
{

    private readonly IBus _bus;

    public ActionLogPublish(IBus bus)
    {
        _bus = bus;
    }

    public async Task Publish(ActionLog actionLogData)
    {
        /* Publish values to RabbitMQ Service Bus */

        await _bus.Publish(actionLogData);

        /* Publish values to RabbitMQ Service Bus */
    }

}

Edit

RabbitMQ Web Console

enter image description here

Upvotes: 5

Views: 1854

Answers (2)

Andrii Litvinov
Andrii Litvinov

Reputation: 13192

It is hard to tell from the description what error you are getting exactly. The middleware implementation looks complicated and it can be a source of the error. I would guess that you don't set stream position correctly or something. Corrections from @Nkosi may actually fix it.

If you say that IBus works correctly from controllers, which are created per request, you may want to try to implement IMiddleware interface in your middleware as described in this doc.

public class RequestResponseLoggingMiddleware : IMiddleware
{
    IActionLogPublish logPublish;

    public RequestResponseLoggingMiddleware(IActionLogPublish logPublish)
    {
        this.logPublish = logPublish;
    }

    // ...

    public async Task InvokeAsync(HttpContext context, RequestDelegate next)
    {
        //...
    }

    //...
}

In this case middleware will be registered as scoped or transient service and resolved for every request, same as controller. Which may also fix your issue if it relates to scoped services resolution.

Upvotes: 2

Nkosi
Nkosi

Reputation: 247471

The middleware needs to put the original body back in the response.

Also the injected dependency works fine with controllers and not middleware as it may be registered with scoped lifetime.

In that case it should not be constructor injected into the middlewre but directly into the Invoke

Because middleware is constructed at app startup, not per-request, scoped lifetime services used by middleware constructors aren't shared with other dependency-injected types during each request. If you must share a scoped service between your middleware and other types, add these services to the Invoke method's signature. The Invoke method can accept additional parameters that are populated by DI:

//...omitted for brevity

public RequestResponseLoggingMiddleware(RequestDelegate next) {
    _next = next;
}

//...

private async Task<string> FormatResponseStream(Stream stream) {
    stream.Seek(0, SeekOrigin.Begin);
    var text = await new StreamReader(stream).ReadToEndAsync();
    stream.Seek(0, SeekOrigin.Begin);
    return $"Response {text}";
}

public async Task Invoke(HttpContext context, IActionLogPublish logger) {
    ActionLog actionLog = await FormatRequest(context.Request);
    //keep local copy of response stream
    var originalBodyStream = context.Response.Body;

    using (var responseBody = new MemoryStream()) {
        //replace stream for down stream calls
        context.Response.Body = responseBody;

        await _next(context);

        //put original stream back in the response object
        context.Response.Body = originalBodyStream; // <-- THIS IS IMPORTANT

        //Copy local stream to original stream
        responseBody.Position = 0;
        await responseBody.CopyToAsync(originalBodyStream);

        //custom logging
        actionLog.Response = await FormatResponse(responseBody);
        await logger.Publish(actionLog);
    }
}

Reference Dependency injection in ASP.NET Core: Scoped Service lifetime

When using a scoped service in a middleware, inject the service into the Invoke or InvokeAsync method. Don't inject via constructor injection because it forces the service to behave like a singleton. For more information, see Write custom ASP.NET Core middleware.

Emphasis mine

Upvotes: 2

Related Questions