AKZ

Reputation: 876

How can I create multiple consumers for a single queue on Apache Kafka?

Hi, my scenario is a queue that many sources put messages into, and many consumers that read those messages and each perform a specific job.

For this scenario I created a topic in Kafka with this command:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

Then I wrote a Java class to consume from it:

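import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConsumerGroupExample {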

    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        // run() may never have been called, in which case there is no pool to wait for
        if (executor == null) return;
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
    }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // request one stream per consumer thread
        topicCountMap.put(topic, a_numThreads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // create a thread pool with one thread per stream
        executor = Executors.newFixedThreadPool(a_numThreads);

        // hand each stream to its own worker;
        // ConsumerTest (not shown here) is the Runnable that reads from one stream
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");

        props.put("group.id", a_groupId);
        props.put("num.consumer.fetchers", "2");
        props.put("partition.assignment.strategy", "roundrobin");
        props.put("auto.commit.interval.ms", "1000");

        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {

        String zooKeeper = "tls.navaco.local:2181";
        String groupId = "group1";
        String topic = "test1";
        int threads = 4;

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

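        // block the main thread without busy-spinning; the pool threads do the consuming
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            example.shutdown();
        }

    }
}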

and another Java class to produce messages:

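import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class TestProducer {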

    public static void main(String[] args) {
        Properties props = new Properties();

        props.put("metadata.broker.list", "tls.navaco.local:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> p = new Producer<String, String>(config);

        //sending...
        String topic = "test1";
        String message = "Hello Kafka";
        for (int i = 0; i < 1000; i++) {
            KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic, message + i);
            p.send(keyedMessage);
        }
        // release the producer's connections once all messages are sent
        p.close();
    }

}

As the Apache documentation says, for a topic to act as a queue, all consumers should share the same group.id, and I did that. But when I run 2, 3, or even more consumers, only one of them gets the messages and the others do nothing.

In fact I want a queue. Ordering is not important to me; what matters is that each message is consumed by exactly one consumer.

Is it possible to implement this in Kafka, or should I use another product such as ActiveMQ or HornetQ?

Upvotes: 0

Views: 1265

Answers (1)

Heejin

Reputation: 4581

The partition is the unit of parallelism in Kafka: within a consumer group, each partition is consumed by at most one consumer. Since your topic has only one partition, only one consumer in the group can consume its messages. If you want three consumers in the group to consume messages at the same time, you must increase the number of partitions to 3 or more. For more information, refer to the Consumers section of the Kafka documentation.
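
To make the rule above concrete, here is a minimal sketch in plain Java that models how a group's partitions get divided among its consumers. It is a simplified illustration, not Kafka's actual assignment code: the class name `PartitionAssignmentSketch` and the method `assign` are hypothetical, and the round-robin dealing is an assumption chosen to match the `roundrobin` strategy set in the question.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of how a consumer group splits a topic's partitions.
// This is NOT Kafka's assignor; it only illustrates the rule that each
// partition is owned by at most one consumer in the group.
class PartitionAssignmentSketch {

    // Returns, for each consumer index, the list of partition ids it owns
    // when partitions are dealt out round-robin.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            owned.get(p % numConsumers).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // The situation in the question: one partition, several consumers.
        System.out.println("1 partition, 3 consumers: " + assign(1, 3));
        // After raising the partition count, every consumer gets work.
        System.out.println("3 partitions, 3 consumers: " + assign(3, 3));
    }
}
```

With one partition the model prints `[[0], [], []]`, i.e. two of the three consumers own nothing, which matches the behaviour described in the question. The same applies to the four threads inside a single `ConsumerGroupExample` process: only one stream can own the single partition. Assuming a Kafka release whose `kafka-topics.sh` supports `--alter`, the existing topic could probably be expanded with something like `bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic test1 --partitions 4` rather than being recreated.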

Upvotes: 4
