cksanjose

Reputation: 461

MassTransit consume non MassTransit message

I have a console app that is publishing messages to a RabbitMQ exchange. Is it possible for a subscriber that is built with MassTransit to consume this message?

This is the publisher code:

    public virtual void Send(LogEntryMessage message)
    {
        using (var connection = _factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            var props = channel.CreateBasicProperties();
            props.CorrelationId = Guid.NewGuid().ToString();

            // The body is the bare JSON of the message, with no MassTransit envelope.
            var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

            // Pass the properties created above; the original passed null, so the CorrelationId was never sent.
            channel.BasicPublish(
                exchange: _endpointConfiguration.Exchange,
                routingKey: _endpointConfiguration.RoutingKey,
                basicProperties: props,
                body: body);
        }
    }

This is the subscriber code:

    IBusControl ConfigureBus()
    {
        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(new Uri("rabbitmq://localhost"), h =>
            {
                h.Username(username);
                h.Password(password);
            });

            cfg.ReceiveEndpoint(host, "LogEntryQueue", e =>
            {
                e.Handler<LogEntryMessage>(context =>
                    Console.Out.WriteLineAsync($"Value was entered: {context.Message.MessageBody}"));
            });
        });
    }

This is the consumer code:

    public class LogEntryMessageProcessor : IConsumer<LogEntryMessage>
    {
        public Task Consume(ConsumeContext<LogEntryMessage> context)
        {
            // Keep the interpolated string on one line and return the write task
            // so the consumer completes only after the line has been written.
            return Console.Out.WriteLineAsync($"Value was entered: {context.Message.Message.MessageBody}");
        }
    }

Upvotes: 0

Views: 2841

Answers (2)

cksanjose

Reputation: 461

For MassTransit to process a message published by a non-MassTransit client, the message has to contain the metadata MassTransit requires, as described on the Interoperability page of its documentation. The consumer then has to be declared for the payload carried inside the message rather than for the wrapper around it. In the code below, the payload type is LogEntryPayload:

    public class LogEntryMessageProcessor : IConsumer<LogEntryPayload>
    {
        public Task Consume(ConsumeContext<LogEntryPayload> context)
        {
            //var payload = context.GetPayload<LogEntryPayload>();
            // Return the write task instead of discarding it.
            return Console.Out.WriteLineAsync($"Value was entered: {context.Message.Id} - {context.Message.MessageBody}");
        }
    }

Upvotes: 1

Alexey Zimarev

Reputation: 19610

The answer is in the Interoperability section of the MassTransit documentation; look at the example message in particular.

Basically, you need to construct a JSON object according to a few simple rules: wrap your payload in a message field and list its type under messageType.

An example message looks like this:

    {
        "destinationAddress": "rabbitmq://localhost/input_queue",
        "headers": {},
        "message": {
            "value": "Some Value",
            "customerId": 27
        },
        "messageType": [
            "urn:message:MassTransit.Tests:ValueMessage"
        ]
    }
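
Applied to the publisher in the question, the envelope could be built along these lines. This is only a sketch under assumptions: the Logging.Contracts namespace, the shape of LogEntryPayload and the MassTransitEnvelope.Wrap helper are hypothetical names introduced for illustration, and System.Text.Json is used here in place of JsonConvert so the snippet needs no extra package. The namespace and type name in the urn have to match the message type the consumer is declared for.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

namespace Logging.Contracts
{
    // Hypothetical payload type; the property names are assumptions for illustration.
    public class LogEntryPayload
    {
        public int Id { get; set; }
        public string MessageBody { get; set; } = "";
    }
}

public static class MassTransitEnvelope
{
    // Wraps a payload in the envelope fields shown in the example message above
    // and returns the UTF-8 bytes to use as the RabbitMQ message body.
    public static byte[] Wrap<T>(T payload, Uri destinationAddress)
    {
        var envelope = new Dictionary<string, object>
        {
            ["destinationAddress"] = destinationAddress.AbsoluteUri,
            ["headers"] = new Dictionary<string, object>(),
            ["message"] = payload!,
            // urn:message:<namespace>:<type name>; assumes T is declared inside a namespace.
            ["messageType"] = new[] { $"urn:message:{typeof(T).Namespace}:{typeof(T).Name}" }
        };

        // Camel-case the payload's property names to match the example message.
        var options = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
        return Encoding.UTF8.GetBytes(JsonSerializer.Serialize(envelope, options));
    }
}
```

In the Send method from the question, the result of MassTransitEnvelope.Wrap(payload, new Uri("rabbitmq://localhost/LogEntryQueue")) would then be passed as the body argument of BasicPublish instead of the bare serialized message.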

You can easily check what more complex messages look like: create both a publisher and a consumer with MassTransit, run the program once so that the bindings are created, then stop the consumer and publish some messages. The messages will stay in the subscriber queue, where you can read them with the RabbitMQ management plugin.

Upvotes: 1
