AnonymousJoe
AnonymousJoe

Reputation: 31

How to add multiple consumers to consume from kafka stream?

I have recently started working on a Kafka Streaming Application using .NET core. I have followed the tutorial: https://medium.com/@srigumm/building-realtime-streaming-applications-using-net-core-and-kafka-ad45ed081b31.

I have built a basic producer-consumer application in which the producer takes input data and pushes it into a kafka-topic. A consumer can subscribe to the topic and consume data from it. I am also able to push this data into another topic by using a new producer. But what I am unable to do is initialise multiple consumers to consume from the same topic.

In appsettings.json:

  "consumer": {
    "bootstrapservers": "localhost:9092", //specify your kafka broker address
    "groupid": "csharp-consumer",
    "enableautocommit": true,
    "statisticsintervalms": 5000,
    "sessiontimeoutms": 6000,
    "autooffsetreset": 0,
    "enablepartitioneof": true,
    "SaslMechanism": 0, //0 for GSSAPI
    //"SaslKerberosKeytab":"filename.keytab", //specify your keytab file here
    "SaslKerberosPrincipal": "[email protected]", //specify your alias here
    "SaslKerberosServiceName": "kafka"
    //"SaslKerberosKinitCmd":"kinit -k -t %{sasl.kerberos.keytab} %{sasl.kerberos.principal}"
  },

processOrderServices.cs:

namespace Api.Services
{

    public class ProcessOrdersService : BackgroundService
    {
        private readonly ConsumerConfig consumerConfig;
        private readonly ProducerConfig producerConfig;
        //----------------------
        //private readonly ConsumerConfig consumerConfig2;
        public ProcessOrdersService(ConsumerConfig consumerConfig, ProducerConfig producerConfig)
        {
            this.producerConfig = producerConfig;
            this.consumerConfig = consumerConfig;
            //----------------------
            //this.consumerConfig2 = consumerConfig;
        }
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            Console.WriteLine("OrderProcessing Service Started\n\n");

            while (!stoppingToken.IsCancellationRequested)
            {
                var consumerHelper = new ConsumerWrapper(consumerConfig, "orderrequests");
                //var consumerHelper2 = new ConsumerWrapper(consumerConfig, "orderrequests");
                string orderRequest = consumerHelper.readMessage();
                //consumerHelper2.DisplayMessage();

                //Deserilaize 
                OrderRequest order = JsonConvert.DeserializeObject<OrderRequest>(orderRequest);
                //TODO:: Process Order
                Console.WriteLine($"Info: OrderHandler => Processing the order for {order.productname}\n\n");
                order.status = OrderStatus.COMPLETED;

                //Write to ReadyToShip Queue

                var producerWrapper = new ProducerWrapper(producerConfig,"readytoship");
                await producerWrapper.writeMessage(JsonConvert.SerializeObject(order));
                //--------------------------
               // var consumerHelper2 = new ConsumerWrapper(consumerConfig2, "orderrequests");
                //string processedOrder = consumerHelper2.readMessage();
                //OrderRequest order2 = JsonConvert.DeserializeObject<OrderRequest>(processedOrder);
                //Console.WriteLine($"Info: OrderHandler => Delivered the order for {order2.productname}\n\n");
                //order2.status = OrderStatus.DELIVERED;
                //----------------------------
            }
        }
    }
} 

ConsumerWrapper.cs:

namespace Api
{

    public class ConsumerWrapper
    {
        private string _topicName;
        private ConsumerConfig _consumerConfig;
        private Consumer<string,string> _consumer;
        private static readonly Random rand = new Random();
        public ConsumerWrapper(ConsumerConfig config,string topicName)
        {
            this._topicName = topicName;
            this._consumerConfig = config;
            this._consumer = new Consumer<string,string>(this._consumerConfig);
            this._consumer.Subscribe(topicName);
        }
        public string readMessage(){
            var consumeResult = this._consumer.Consume();
            return consumeResult.Value;
        }
        public void DisplayMessage()
        {
            var consumeResult = this._consumer.Consume();
            Console.WriteLine(consumeResult.Value);
            Console.WriteLine($"Info: OrderHandler => Delivered the order for {consumeResult.Value}\n\n");
            return;
        }
    }
} 

I want to be able to call the Consumer class multiple times and be able to read from the same topic. I have understood that there is a requirement to create multiple partitions/group-ids in order to be able to do that. But I am unable to figure out where and how to do that.

Upvotes: 1

Views: 4621

Answers (1)

Sandhya
Sandhya

Reputation: 108

You can do this by using group id concept in Kafka, just use same group id for multiple consumers to avoid duplicate consumption of the data from the same topic.

Upvotes: 1

Related Questions