rkSinghania

Reputation: 29

Retrieve data from same partitions across multiple topics

In our current system we have 4 topics, each with 20 partitions:

Topic1(20), Topic2(20), Topic3(20), Topic4(20)

On the producer side we have a custom IDPartitioner which basically does ID % 20 (20 = number of partitions):

@Override
public Integer partition(String topic, Object key, Object value, int numPartition) {
    try {
        log.info("topic:{} key:{} numPartition:{}", topic, key, numPartition);
        int currPartition = Integer.valueOf((String) key) % numPartition;
        if (currPartition < 0) {
            // Java's % yields a negative result for negative IDs
            log.info("Partition less than 0 for topic:{} key:{} currPartition:{}", topic, key, currPartition);
            currPartition = 1;
        }
        return currPartition;
    } catch (Exception e) {
        // Fall back to partition 1 if the key is missing or not numeric
        return 1;
    }
}
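As a side note on the currPartition < 0 branch: Java's % operator returns a negative result for a negative dividend, which is presumably what that check guards against. A minimal sketch (plain Java, class and method names are just for illustration) showing the pitfall, and Math.floorMod as an alternative that always lands in a valid partition range:

```java
public class PartitionMath {
    static final int NUM_PARTITIONS = 20;

    // Same arithmetic as the custom partitioner: plain % can go negative.
    static int rawModulo(int id) {
        return id % NUM_PARTITIONS;
    }

    // Math.floorMod always returns a value in [0, NUM_PARTITIONS).
    static int floorModulo(int id) {
        return Math.floorMod(id, NUM_PARTITIONS);
    }

    public static void main(String[] args) {
        System.out.println(rawModulo(45));    // 5
        System.out.println(rawModulo(-3));    // -3, not a valid partition
        System.out.println(floorModulo(-3));  // 17, a valid partition
    }
}
```

Unlike the fixed fallback to partition 1, floorMod keeps negative IDs spread across all partitions instead of piling them onto one.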

On the consumer side we have 2 consumers in our consumer group cloud-consumer on an ECS cluster, with the configs below. Each consumer subscribes to all 4 topics.

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
    return new DefaultKafkaConsumerFactory<>(props);
}

Now the problem is that messages with the same partition number are not arriving at the same consumer across the different topics.

We also checked the partition assignment via onPartitionsAssigned() in a ConsumerRebalanceListener; the result was:

Consumer 1 > Topic1 (0-9), Topic2 (0-9), Topic3 (10-19), Topic4 (10-19)
Consumer 2 > Topic1 (10-19), Topic2 (10-19), Topic3 (0-9), Topic4 (0-9)
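For context on how those ranges come about: the RangeAssignor splits each topic's partitions into contiguous ranges, one per consumer, independently per topic. A small sketch of that per-topic arithmetic (plain Java, not Kafka's actual implementation) for 20 partitions and 2 consumers:

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSplit {
    // Mimics the RangeAssignor's per-topic math: contiguous ranges,
    // with any remainder going to the first consumers in sorted order.
    static List<int[]> split(int numPartitions, int numConsumers) {
        List<int[]> ranges = new ArrayList<>();
        int quota = numPartitions / numConsumers;
        int remainder = numPartitions % numConsumers;
        int start = 0;
        for (int i = 0; i < numConsumers; i++) {
            int size = quota + (i < remainder ? 1 : 0);
            ranges.add(new int[] {start, start + size - 1}); // inclusive bounds
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] r : split(20, 2)) {
            System.out.println(r[0] + "-" + r[1]); // prints 0-9, then 10-19
        }
    }
}
```

The ranges themselves are deterministic; which consumer instance ends up with which range depends on member ordering at assignment time, which the application does not control.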

Now as you can see, for partition 5 we received data on both consumers: consumer 1 got it from Topic1/Topic2 and consumer 2 got it from Topic3/Topic4.

Upvotes: 1

Views: 483

Answers (1)

OneCricketeer

Reputation: 191874

It's working as expected: no overlapping partitions of any single topic are being consumed within the same consumer group. You cannot have both consumer 1 and consumer 2 reading partition 5 of the same topic if they are in the same group.

Partition assignment is not guaranteed to be consistent across multiple topics unless you manually assign partitions to the consumers. Consumers have no knowledge of your producer's partitioning scheme, and ideally shouldn't.
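If you do go the manual route, the idea is to skip subscribe() and compute the partitions each instance should own yourself, then hand them to KafkaConsumer#assign. A pure-Java sketch of just the computation (the instance index, instance count, and topic names are illustrative assumptions):

```java
import java.util.ArrayList;
import java.util.List;

public class ManualAssignment {
    // Computes "Topic:partition" entries this instance would claim,
    // giving every instance the SAME partition numbers in every topic.
    // In real code each entry would become a TopicPartition passed to
    // KafkaConsumer#assign instead of subscribe().
    static List<String> partitionsFor(int instanceId, int numInstances,
                                      List<String> topics, int partitionsPerTopic) {
        List<String> owned = new ArrayList<>();
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                if (p % numInstances == instanceId) {
                    owned.add(topic + ":" + p);
                }
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        List<String> topics = List.of("Topic1", "Topic2", "Topic3", "Topic4");
        // Instance 0 of 2 claims the even-numbered partitions of all four topics.
        System.out.println(partitionsFor(0, 2, topics, 20));
    }
}
```

Keep in mind that manually assigned consumers opt out of group rebalancing entirely, so you would have to detect and handle instance failures yourself.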

You will have per-message ordering within each partition, though, and each deployed consumer should only care about that, rather than reading specific keys from specific partitions. This will help you scale better and handle consumer group rebalances.

As mentioned in the comments, perhaps you can use Kafka Streams instead and perform a groupByKey operation to shuffle the data across your consumer instances.

Upvotes: 1
