user3335999

Reputation: 421

Kafka: not all consumers receive the subscribed message

To generically publish messages using Kafka, I'm using the class name as the topic:

    kafkaProducer.send(new ProducerRecord<String, String>(object.getClass().getName(), new DomainObjectAdapter(object).toJsonString()));

And the consumers subscribe to the classes they are interested in:

    List<String> topics = new ArrayList<>();
    for (Object sub : _subscriptions) {
        topics.add(sub.getClass().getName()); // one topic per subscribed class
    }
    _kafkaConsumer.subscribe(topics);

The problem is that only one of the consumers ever receives the message it subscribed to. My understanding is that Kafka assigns a unique partition to each subscriber (if one is available). I currently have only 2 subscribers, and my Kafka server.properties specifies 4 partitions. It appears that all consumers are reading from the same partition. Perhaps Kafka is a poor choice for a service bus because of this apparent limitation. Any help would be much appreciated!

Kafka consumer properties:

    properties.put("bootstrap.servers", _settings.getEndpoint());
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("enable.auto.commit", "false");
    properties.put("group.id", "TestGroup");
    properties.put("auto.offset.reset","earliest");

Kafka producer properties:

    properties.put("bootstrap.servers",_settings.getEndpoint());
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Server properties (the only thing I changed from the default properties):

    num.partitions=4

Note: I've also tried the following consumer settings:

    properties.put("bootstrap.servers", _settings.getEndpoint());
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("auto.commit.interval.ms","1000");
    properties.put("enable.auto.commit", "true");
    properties.put("group.id", "testGroup");
    properties.put("auto.offset.reset","latest");
    properties.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");

Upvotes: 1

Views: 2713

Answers (2)

Katya Gorshkova

Reputation: 1561

If all your consumers share the same consumer group (the group.id property), then only one consumer in the group will receive each message, because each partition is assigned to exactly one consumer within a group. If you want every consumer to receive every message, each one needs a different group.id.
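A minimal sketch of that fix, assuming each subscriber builds its own consumer Properties. The helper `consumerProps` and the group IDs `billing-service` and `audit-service` are hypothetical names; the other settings are copied from the question. Only `group.id` differs between the two subscribers. Creating the actual `KafkaConsumer` is left as a comment so the snippet compiles without kafka-clients on the classpath.

```java
import java.util.Properties;

// Sketch: one consumer group per logical subscriber, so each subscriber
// receives its own copy of every message. Group names are hypothetical.
public class GroupPerSubscriber {

    // Builds consumer settings; groupId is the only value that differs
    // between subscribers that should each see every message.
    public static Properties consumerProps(String endpoint, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", endpoint);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        // A distinct group.id per subscriber: each record is delivered once per group.
        props.put("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        Properties billing = consumerProps("localhost:9092", "billing-service");
        Properties audit = consumerProps("localhost:9092", "audit-service");
        // With kafka-clients available you would then create
        // new KafkaConsumer<String, String>(billing) and subscribe as before.
        System.out.println(billing.getProperty("group.id") + " / " + audit.getProperty("group.id"));
    }
}
```

Consumers that should share the work (for example, several instances of the same service) keep a common group.id; consumers that should each see every message get their own.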

To check which consumers are assigned to the partitions of the topic, you can use the following command:

    ./bin/kafka-consumer-groups.sh --bootstrap-server yourhost:9092 --group testGroup --describe

Upvotes: 5

senseiwu

Reputation: 5259

By default, Kafka uses RangeAssignor as its partition assignment strategy, which has the following characteristics:

The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumers in lexicographic order. We then divide the number of partitions by the total number of consumers to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. The assignment will be:

    C0: [t0p0, t0p1, t1p0, t1p1]
    C1: [t0p2, t1p2]
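To make the quoted rule concrete, here is a simplified Java model of per-topic range assignment. It is not Kafka's `RangeAssignor` code: the class `RangeAssignmentSketch` and method `assign` are hypothetical, and the model assumes every consumer subscribes to every topic. Run on the example above, it should print `{C0=[t0p0, t0p1, t1p0, t1p1], C1=[t0p2, t1p2]}`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of per-topic range assignment; assumes every consumer
// subscribes to every topic. Not Kafka's actual RangeAssignor implementation.
public class RangeAssignmentSketch {

    // topics maps topic name -> partition count.
    public static Map<String, List<String>> assign(List<String> consumers, Map<String, Integer> topics) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted); // consumers in lexicographic order
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String c : sorted) {
            result.put(c, new ArrayList<>());
        }
        // Each topic is handled independently, in topic-name order.
        for (Map.Entry<String, Integer> topic : new TreeMap<>(topics).entrySet()) {
            int partitions = topic.getValue();
            int perConsumer = partitions / sorted.size();
            int extra = partitions % sorted.size();
            int next = 0; // partitions are laid out in numeric order
            for (int i = 0; i < sorted.size(); i++) {
                // The first `extra` consumers get one additional partition.
                int count = perConsumer + (i < extra ? 1 : 0);
                for (int p = next; p < next + count; p++) {
                    result.get(sorted.get(i)).add(topic.getKey() + "p" + p);
                }
                next += count;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Example from the quoted text: consumers C0, C1; topics t0, t1 with 3 partitions each.
        System.out.println(assign(List.of("C0", "C1"), Map.of("t0", 3, "t1", 3)));
    }
}
```

For the question's setup (one topic with 4 partitions, two consumers in one group; the topic name `SomeClass` is hypothetical), `assign(List.of("C0", "C1"), Map.of("SomeClass", 4))` gives each consumer two partitions, so any single message still reaches only one of them.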

If you want a more even distribution when there are only a few partitions, you could use RoundRobinAssignor by setting partition.assignment.strategy to org.apache.kafka.clients.consumer.RoundRobinAssignor.
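For comparison, here is a simplified model of round-robin assignment on the same example, again assuming every consumer subscribes to every topic. The class `RoundRobinAssignmentSketch` and method `assign` are hypothetical and are not Kafka's `RoundRobinAssignor` implementation. All partitions from all topics are sorted and dealt to the sorted consumers one at a time, so it should print `{C0=[t0p0, t0p2, t1p1], C1=[t0p1, t1p0, t1p2]}`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of round-robin assignment across all topics; assumes every
// consumer subscribes to every topic. Not Kafka's actual RoundRobinAssignor.
public class RoundRobinAssignmentSketch {

    // topics maps topic name -> partition count.
    public static Map<String, List<String>> assign(List<String> consumers, Map<String, Integer> topics) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String c : sorted) {
            result.put(c, new ArrayList<>());
        }
        int turn = 0; // index of the next consumer to receive a partition
        // Walk every partition of every topic in sorted order.
        for (Map.Entry<String, Integer> topic : new TreeMap<>(topics).entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                String consumer = sorted.get(turn % sorted.size());
                result.get(consumer).add(topic.getKey() + "p" + p);
                turn++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("C0", "C1"), Map.of("t0", 3, "t1", 3)));
    }
}
```

Either strategy still gives each partition to exactly one consumer in a group, so switching assignors changes how evenly partitions are spread, not whether every consumer in the group sees every message.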

Upvotes: 1
