KnowHoper

Reputation: 4602

Adding observers to an already running MassTransit system

I am trying to add a new microservice, containing a MassTransit observer, to an existing system. The observer should see the request/response and publish messages that the existing services already exchange. I cannot easily redeploy the existing services, so I would prefer to avoid that if possible.

The observer code below only executes when the new service starts; it does not execute when a message is sent by the other services.

    BusControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(new Uri($"{settings.Protocol}://{settings.RabbitMqHost}/"), h =>
        {
            h.Username(settings.RabbitMqConsumerUser);
            h.Password(settings.RabbitMqConsumerPassword);
        });

        cfg.ReceiveEndpoint(host, "pub_sub_flo", ec => { });

        host.ConnectSendObserver(new RequestObserver());
        host.ConnectPublishObserver(new RequestObserver());
    });

Observers:

    public class RequestObserver : ISendObserver, IPublishObserver
    {
        public Task PreSend<T>(SendContext<T> context) where T : class
        {
            return Task.CompletedTask;
        }

        public async Task PostSend<T>(SendContext<T> context) where T : class
        {
            var proxy = new StoreProxyFactory().CreateProxy("fabric:/MessagePatterns");

            // Await the call instead of blocking the thread with .Wait().
            await proxy.AddEvent(new ConsumerEvent()
            {
                Id = Guid.NewGuid(),
                ConsumerId = Guid.NewGuid(),
                Message = "AMQPRequestResponse",
                Date = DateTimeOffset.Now,
                Type = "Observer"
            });
        }

        public Task SendFault<T>(SendContext<T> context, Exception exception) where T : class
        {
            return Task.CompletedTask;
        }

        public Task PrePublish<T>(PublishContext<T> context) where T : class
        {
            return Task.CompletedTask;
        }

        public async Task PostPublish<T>(PublishContext<T> context) where T : class
        {
            var proxy = new StoreProxyFactory().CreateProxy("fabric:/MessagePatterns");

            // Await the call instead of blocking the thread with .Wait().
            await proxy.AddEvent(new ConsumerEvent()
            {
                Id = Guid.NewGuid(),
                ConsumerId = Guid.NewGuid(),
                Message = "AMQPRequestResponse",
                Date = DateTimeOffset.Now,
                Type = "Observer"
            });
        }

        public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
        {
            return Task.CompletedTask;
        }
    }

Can anyone help?

Many thanks in advance.

Upvotes: 1

Views: 1942

Answers (1)

Chris Patterson

Reputation: 33278

The observers are only called for messages sent, published, etc. on the bus instance to which they are attached. They will not observe messages sent or received by other bus instances.

If you want to observe those messages, you could create an observer queue and bind that queue to your service exchanges so that copies of the request messages are sent to your service. The replies, however, would not be easy to get since they're sent directly to the client queues via temporary exchanges.

    cfg.ReceiveEndpoint(host, "service-observer", e =>
    {
        e.Consumer<SomeConsumer>(...);
        e.Bind("service-endpoint");
    });

This will bind the service endpoint exchange to your receive endpoint queue, so that copies of the messages are sent to your consumer.

This is commonly referred to as a wire tap.
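
For illustration, the consumer on the observer queue could look roughly like the sketch below. This is only a sketch under assumptions: `SubmitOrder` and `SubmitOrderTapConsumer` are hypothetical names that do not come from the question, and the message contract would in practice be whatever type the existing service endpoint already consumes. Only `IConsumer<T>` and `ConsumeContext<T>` are MassTransit types.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical request contract (an assumption for this sketch). In a real
// system this is the message type the existing service already handles.
public interface SubmitOrder
{
    Guid OrderId { get; }
}

// Wire-tap consumer: it only records that a copy of the request arrived.
// It deliberately never calls RespondAsync, so the original requester
// receives a reply from the real service only.
public class SubmitOrderTapConsumer : IConsumer<SubmitOrder>
{
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await Console.Out.WriteLineAsync(
            $"Observed SubmitOrder {context.Message.OrderId}, message id {context.MessageId}");
    }
}
```

Such a class would take the place of `SomeConsumer` in the receive endpoint above. Because the tap sits on its own queue, it receives copies of the requests without competing with the real service for them.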

Upvotes: 1
