Natalie Perret

Reputation: 8997

RabbitMQ + .NET C#: Low and variable performance when publishing and consuming messages

I'm trying to improve the performance of publishing and consuming messages with RabbitMQ and .NET (C#).

I've defined the relatively "big" XML file below as an embedded resource:

<?xml version="1.0" encoding="UTF-8"?>
<Document>
  <CstmrCdtTrfInitn>
    <GrpHdr>
      <MsgId>ABC/090928/CCT001</MsgId>
      <CreDtTm>2009-09-28T14:07:00</CreDtTm>
      <NbOfTxs>3</NbOfTxs>
      <CtrlSum>11500000</CtrlSum>
      <InitgPty>
        <Nm>ABC Corporation</Nm>
        <PstlAdr>
          <StrtNm>Times Square</StrtNm>
          <BldgNb>7</BldgNb>
          <PstCd>NY 10036</PstCd>
          <TwnNm>New York</TwnNm>
          <Ctry>US</Ctry>
        </PstlAdr>
      </InitgPty>
    </GrpHdr>
    <PmtInf>
      <PmtInfId>ABC/086</PmtInfId>
      <PmtMtd>TRF</PmtMtd>
      <BtchBookg>false</BtchBookg>
      <ReqdExctnDt>2009-09-29</ReqdExctnDt>
      <Dbtr>
        <Nm>ABC Corporation</Nm>
        <PstlAdr>
          <StrtNm>Times Square</StrtNm>
          <BldgNb>7</BldgNb>
          <PstCd>NY 10036</PstCd>
          <TwnNm>New York</TwnNm>
          <Ctry>US</Ctry>
        </PstlAdr>
      </Dbtr>
      <DbtrAcct>
        <Id>
          <Othr>
            <Id>00125574999</Id>
          </Othr>
        </Id>
      </DbtrAcct>
      <DbtrAgt>
        <FinInstnId>
          <BIC>BBBBUS33</BIC>
        </FinInstnId>
      </DbtrAgt>
      <CdtTrfTxInf>
        <PmtId>
          <InstrId>ABC/090928/CCT001/01</InstrId>
          <EndToEndId>ABC/4562/2009-09-08</EndToEndId>
        </PmtId>
        <Amt>
          <InstdAmt Ccy="JPY">10000000</InstdAmt>
        </Amt>
        <ChrgBr>SHAR</ChrgBr>
        <CdtrAgt>
          <FinInstnId>
            <BIC>AAAAGB2L</BIC>
          </FinInstnId>
        </CdtrAgt>
        <Cdtr>
          <Nm>DEF Electronics</Nm>
          <PstlAdr>
            <AdrLine>Corn Exchange 5th Floor</AdrLine>
            <AdrLine>Mark Lane 55</AdrLine>
            <AdrLine>EC3R7NE London</AdrLine>
            <AdrLine>GB</AdrLine>
          </PstlAdr>
        </Cdtr>
        <CdtrAcct>
          <Id>
            <Othr>
              <Id>23683707994125</Id>
            </Othr>
          </Id>
        </CdtrAcct>
        <Purp>
          <Cd>CINV</Cd>
        </Purp>
        <RmtInf>
          <Strd>
            <RfrdDocInf>
              <Nb>4562</Nb>
              <RltdDt>2009-09-08</RltdDt>
            </RfrdDocInf>
          </Strd>
        </RmtInf>
      </CdtTrfTxInf>
      <CdtTrfTxInf>
        <PmtId>
          <InstrId>ABC/090628/CCT001/2</InstrId>
          <EndToEndId>ABC/ABC-13679/2009-09-15</EndToEndId>
        </PmtId>
        <Amt>
          <InstdAmt Ccy="EUR">500000</InstdAmt>
        </Amt>
        <ChrgBr>CRED</ChrgBr>
        <CdtrAgt>
          <FinInstnId>
            <BIC>DDDDBEBB</BIC>
          </FinInstnId>
        </CdtrAgt>
        <Cdtr>
          <Nm>GHI Semiconductors</Nm>
          <PstlAdr>
            <StrtNm>Avenue Brugmann</StrtNm>
            <BldgNb>415</BldgNb>
            <PstCd>1180</PstCd>
            <TwnNm>Brussels</TwnNm>
          </PstlAdr>
        </Cdtr>
        <CdtrAcct>
          <Id>
            <IBAN>BE30001216371411</IBAN>
          </Id>
        </CdtrAcct>
        <InstrForCdtrAgt>
          <Cd>PHOB</Cd>
          <InstrInf>+32/2/2222222</InstrInf>
        </InstrForCdtrAgt>
        <Purp>
          <Cd>GDDS</Cd>
        </Purp>
        <RmtInf>
          <Strd>
            <RfrdDocInf>
              <Tp>
                <CdOrPrtry>
                  <Cd>CINV</Cd>
                </CdOrPrtry>
              </Tp>
              <Nb>ABC-13679</Nb>
              <RltdDt>2009-09-15</RltdDt>
            </RfrdDocInf>
          </Strd>
        </RmtInf>
      </CdtTrfTxInf>
      <CdtTrfTxInf>
        <PmtId>
          <InstrId>ABC/090928/CCT001/3</InstrId>
          <EndToEndId>ABC/987-AC/2009-09-27</EndToEndId>
        </PmtId>
        <Amt>
          <InstdAmt Ccy="USD">1000000</InstdAmt>
        </Amt>
        <ChrgBr>SHAR</ChrgBr>
        <CdtrAgt>
          <FinInstnId>
            <BIC>BBBBUS66</BIC>
          </FinInstnId>
        </CdtrAgt>
        <Cdtr>
          <Nm>ABC Corporation</Nm>
          <PstlAdr>
            <Dept>Treasury department</Dept>
            <StrtNm>Bush Street</StrtNm>
            <BldgNb>13</BldgNb>
            <PstCd>CA 94108</PstCd>
            <TwnNm>San Francisco</TwnNm>
            <Ctry>US</Ctry>
          </PstlAdr>
        </Cdtr>
        <CdtrAcct>
          <Id>
            <Othr>
              <Id>4895623</Id>
            </Othr>
          </Id>
        </CdtrAcct>
        <Purp>
          <Cd>INTC</Cd>
        </Purp>
        <RmtInf>
          <Strd>
            <RfrdDocInf>
              <Tp>
                <CdOrPrtry>
                  <Cd>CINV</Cd>
                </CdOrPrtry>
              </Tp>
              <Nb>987-AC</Nb>
              <RltdDt>2009-09-27</RltdDt>
            </RfrdDocInf>
          </Strd>
        </RmtInf>
      </CdtTrfTxInf>
    </PmtInf>
  </CstmrCdtTrfInitn>
</Document>

I then publish the file above about 500,000 times to the same queue. Publishing and consuming both happen in the same .NET Core C# program below:

.NET C# Code:

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text;

using RabbitMQ.Client;
using RabbitMQ.Client.Events;


namespace CSharpPlayground
{
    public static class EmbeddedResource
    {
        public static string Load(string fileName)
        {
            var executingAssembly = Assembly.GetExecutingAssembly();
            var resource = $"{executingAssembly.ManifestModule.Name.Replace(".dll", string.Empty)}.{fileName}";
            using var stream = executingAssembly.GetManifestResourceStream(resource);
            using var streamReader = new StreamReader(stream!);
            return streamReader.ReadToEnd();
        }
    }

    public static class CheapExtensions
    {
        public static void ForEach<T>(this IEnumerable<T> source, Action<T> action)
        {
            foreach(var item in source)
            {
                action(item);
            }
        }
        public static void ForEach<T>(this IEnumerable<T> source) =>
            source.ForEach(item => {});
    }

    public static class Program
    {
        private static readonly string XmlStuff = EmbeddedResource.Load("XmlStuff.xml");

        public static void Main()
        {
            const string queueName = "hello";
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                DispatchConsumersAsync = true
            };
            using var connection = factory.CreateConnection();
            using var queueDeclareChannel = connection.CreateModel();

            queueDeclareChannel.QueueDeclare(
                queue: queueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            using var consumerChannel = connection.CreateModel();
            var consumer = new AsyncEventingBasicConsumer(consumerChannel);
            consumer.Received += async (sender, eventArgs) =>
            {
                var receivedBody = eventArgs.Body;
                var receivedMessage = Encoding.UTF8.GetString(receivedBody.ToArray());
                //Console.WriteLine($"[x] Received stuff");
            };
            consumerChannel.BasicConsume(
                queue: queueName,
                autoAck: true,
                consumer: consumer);

            var messageToSend = XmlStuff;
            var bodyToSend = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes(messageToSend));

            Enumerable
                .Repeat((messageToSend, bodyToSend), 500_000)
                .AsParallel()
                .Select((data, index) =>
                {
                    var (message, body) = data;
                    using var publishChannel = connection.CreateModel();
                    var basicProperties = publishChannel.CreateBasicProperties();
                    publishChannel.BasicPublish(
                        exchange: "",
                        routingKey: queueName,
                        mandatory: false,
                        basicProperties: basicProperties,
                        body: body);
                    //Console.WriteLine($"[x] Sent stuff");
                    return index;
                }).ForEach();

            Console.ReadKey();
        }
    }
}

I've removed the Console.WriteLine calls as far as possible so that writing to standard output doesn't become the bottleneck.

The performance still seems rather low, and I'm wondering whether the message rates I'm seeing are "normal":

[Screenshots: RabbitMQ Admin - Overview; RabbitMQ Admin - Hello Queue]

Upvotes: 0

Views: 500

Answers (1)

Jevon Kendon

Reputation: 686

I'm not a RabbitMQ expert, but I do know that creating a channel (CreateModel()) for every published message is a RabbitMQ anti-pattern and will hurt performance. Every channel creation requires a network round-trip to the broker, and in your code that happens once per message, 500,000 times. Channels also consume broker resources, so the approach risks exhausting the broker.

Unfortunately, many getting-started RabbitMQ blog posts do it this way, probably because it sidesteps the underlying problem: a channel is not thread-safe.

If you want to write your own messaging framework on top of RabbitMQ, I would start with a single publish channel and use a lock to synchronize access to it.
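As a rough illustration of the single-channel-plus-lock idea, here is a minimal sketch. It assumes the same RabbitMQ.Client 6.x API used in the question (IModel, CreateBasicProperties, BasicPublish with a ReadOnlyMemory<byte> body). The LockedPublisher class and its member names are hypothetical, not part of the library, and I haven't benchmarked it.

```csharp
using System;
using RabbitMQ.Client;

// Hypothetical helper (not part of RabbitMQ.Client): wraps one long-lived
// channel and serializes access to it, because IModel is not thread-safe.
public sealed class LockedPublisher : IDisposable
{
    private readonly IModel _channel;
    private readonly object _gate = new object();

    public LockedPublisher(IConnection connection)
    {
        // One channel for the lifetime of the publisher,
        // instead of one channel (and one round-trip) per message.
        _channel = connection.CreateModel();
    }

    public void Publish(string queueName, ReadOnlyMemory<byte> body)
    {
        // Only one thread at a time may touch the channel.
        lock (_gate)
        {
            var properties = _channel.CreateBasicProperties();
            _channel.BasicPublish(
                exchange: "",
                routingKey: queueName,
                mandatory: false,
                basicProperties: properties,
                body: body);
        }
    }

    public void Dispose()
    {
        // Take the lock so the channel is not closed mid-publish.
        lock (_gate)
        {
            _channel.Dispose();
        }
    }
}
```

In the question's program this would mean creating one LockedPublisher before the parallel query and calling publisher.Publish(queueName, body) inside the Select, in place of the connection.CreateModel() call there. The lock serializes the publishes, so the parallel query buys little on the publishing side. The gain comes from not opening a channel per message.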

If you want to build a real application, avoid reinventing the wheel, and spare yourself many headaches, I would look at one of the well-supported open source frameworks, such as EasyNetQ or MassTransit.

Upvotes: 1
