Kadir Akin

Reputation: 101

Producer can't distribute messages to multiple consumers?

I am trying to auto-scale a Java Spring Kafka consumer with docker-compose, using the --scale flag and forwarding port ranges in docker-compose.yaml like "8070-8072:8070". When I trigger the endpoint that publishes messages, publishing works fine, but on the consumer side a single consumer consumes all the messages. I have 3 consumers with the same group-id and different client-ids, and what I want is distributed messaging. I read some material about partitioning and looked at my logs: it seems there is just 1 partition. Is that the reason? How can I fix this? I'll add the log, consumer config, producer config, and docker-compose file below. First the log; it seems only 1 of the 3 consumers has a partition assigned.

command:

docker-compose up --build --scale nonwebapp=3 --scale webapp=3

docker-compose.yaml

    kafka:
        image: confluentinc/cp-kafka:latest
        depends_on:
            - zookeeper
        ports:
            - 9092:9092
        environment:
            KAFKA_LOG_DIRS: "/kafka/kafka-logs"
            KAFKA_BROKER_ID: 1
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
            KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
            KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
        volumes:
            - /var/run/docker.sock:/var/run/docker.sock
        restart: always
            
    webapp:
        build: benchmark.web
        command: bash -c "while !</dev/tcp/kafka/29092; do >&2 echo 'Waiting for kafka to be up...'; sleep 1; done;"
        ports: 
            - "8070-8072:8070"
        volumes:
            - ./logs:/logs
            - ${GCP_KEY_PATH}:/root/keys/keyfile.json:ro
        depends_on:
          - kafka
        environment:
          SPRING_KAFKA_BOOTSTRAPSERVERS: kafka:29092
          TZ: "Asia/Istanbul"
          GOOGLE_APPLICATION_PROJECT_ID: apt-cubist-282712
          GOOGLE_APPLICATION_CREDENTIALS: /root/keys/keyfile.json
        restart: always
            
            
    nonwebapp:
        build: benchmark.nonweb
        command: bash -c "while !</dev/tcp/kafka/29092; do >&2 echo 'Waiting for kafka to be up...'; sleep 1; done;"
        depends_on:
          - kafka
        volumes:
          - ${GCP_KEY_PATH}:/root/keys/keyfile.json:ro
        ports:
          - "8060-8062:8060"
        environment:
          SPRING_KAFKA_BOOTSTRAPSERVERS: kafka:29092
          GOOGLE_APPLICATION_PROJECT_ID: apt-cubist-282712
          GOOGLE_APPLICATION_CREDENTIALS: /root/keys/keyfile.json
          TZ: "Asia/Istanbul"
        restart: always

producer config

@Bean
ProducerFactory<String, byte[]> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaInContainerAdress);
    /*
    configProps.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaInLocal);
    */
    configProps.put(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
    configProps.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, byte[]> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

consumer config

@Bean
public ConsumerFactory<String, byte[]> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaInContainerAdress);
    /*
    props.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaInLocal);
    */
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            "org.apache.kafka.clients.consumer.RoundRobinAssignor");
    // client.id must be a String, not an int
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(r.nextInt()));
    props.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic_trial_consumers");
    return new DefaultKafkaConsumerFactory<>(props);
}

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

Upvotes: 0

Views: 259

Answers (1)

JavaTechnical

Reputation: 9357

It seems there is just 1 partition. Is that the reason?

Yes. If there is only one partition, only one consumer from the consumer group can consume it; the other consumers in the same group will stay idle even though they are started.

It seems only 1 of the 3 consumers has a partition assigned

From your image, I can see topic_trial-0, i.e. the first (and only) partition of topic_trial.

Increase the number of partitions, to 3 for example, and start three consumers with the same group.id; the load should then be distributed, one partition per consumer.
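Since you rely on auto topic creation, the broker creates the topic with its default partition count (num.partitions, which is 1 unless configured otherwise), which is why you ended up with a single partition. One way to get 3 partitions is to declare the topic explicitly in your Spring configuration (a sketch, assuming Spring Kafka 2.3+ and that your topic is named topic_trial; the class name TopicConfig is just for illustration):

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    // Declaring the topic as a bean lets Spring's KafkaAdmin create it
    // with 3 partitions on startup, instead of relying on broker
    // auto-creation, which uses the broker default of 1 partition.
    @Bean
    public NewTopic topicTrial() {
        return TopicBuilder.name("topic_trial")
                .partitions(3)   // one partition per scaled consumer
                .replicas(1)     // single-broker setup
                .build();
    }
}
```

If the topic already exists with 1 partition, you can also grow it from inside the kafka container with kafka-topics --alter --topic topic_trial --partitions 3 --bootstrap-server localhost:9092. Keep in mind that assignors (including RoundRobinAssignor) distribute partitions, not individual messages, so you need at least as many partitions as consumers in the group.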

Upvotes: 2
