Reputation: 101
I am trying to auto-scale a Java Spring Kafka consumer using docker-compose's --scale flag, forwarding ports in docker-compose.yaml like "8070-8072:8070". When I trigger the endpoint for publishing messages, it works fine. But on the consumer side, just one consumer consumes all the messages. I have 3 consumers with the same group-id and different client-ids. What I want is distributed messaging. I read some papers about partitioning and looked at my logs. It seems there is just 1 partition. Is that the reason? How can I achieve this? I'll add the log, consumer config, producer config and docker-compose file. First, the log. It seems just 1 of the 3 consumers has a partition.
command:
docker-compose up --build --scale nonwebapp=3 --scale webapp=3
docker-compose.yaml
kafka:
  image: confluentinc/cp-kafka:latest
  depends_on:
    - zookeeper
  ports:
    - "9092:9092"
  environment:
    KAFKA_LOG_DIRS: "/kafka/kafka-logs"
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock
  restart: always
webapp:
  build: benchmark.web
  command: bash -c "while ! </dev/tcp/kafka/29092; do >&2 echo 'Waiting for kafka to be up...'; sleep 1; done;"
  ports:
    - "8070-8072:8070"
  volumes:
    - ./logs:/logs
    - ${GCP_KEY_PATH}:/root/keys/keyfile.json:ro
  depends_on:
    - kafka
  environment:
    SPRING_KAFKA_BOOTSTRAPSERVERS: kafka:29092
    TZ: "Asia/Istanbul"
    GOOGLE_APPLICATION_PROJECT_ID: apt-cubist-282712
    GOOGLE_APPLICATION_CREDENTIALS: /root/keys/keyfile.json
  restart: always
nonwebapp:
  build: benchmark.nonweb
  command: bash -c "while ! </dev/tcp/kafka/29092; do >&2 echo 'Waiting for kafka to be up...'; sleep 1; done;"
  depends_on:
    - kafka
  volumes:
    - ${GCP_KEY_PATH}:/root/keys/keyfile.json:ro
  ports:
    - "8060-8062:8060"
  environment:
    SPRING_KAFKA_BOOTSTRAPSERVERS: kafka:29092
    GOOGLE_APPLICATION_PROJECT_ID: apt-cubist-282712
    GOOGLE_APPLICATION_CREDENTIALS: /root/keys/keyfile.json
    TZ: "Asia/Istanbul"
  restart: always
producer config
@Bean
ProducerFactory<String, byte[]> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaInContainerAdress);
    /*
    configProps.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaInLocal);
    */
    configProps.put(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
    configProps.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, byte[]> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
consumer config
@Bean
public ConsumerFactory<String, byte[]> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaInContainerAdress);
    /*
    props.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaInLocal);
    */
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            "org.apache.kafka.clients.consumer.RoundRobinAssignor");
    // Random client-id per instance; client.id must be a String, not an int
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(r.nextInt()));
    props.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic_trial_consumers");
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
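For reference, the listener side under this factory looks roughly like this (a minimal sketch; the class and method names are illustrative). All replicas share the same group.id, so Kafka assigns each consumer a subset of the topic's partitions:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class TrialListener {

    // Every scaled instance joins group "topic_trial_consumers"; with only
    // one partition on the topic, only one group member ever gets messages.
    @KafkaListener(topics = "topic_trial", containerFactory = "kafkaListenerContainerFactory")
    public void listen(byte[] message) {
        // handle message
    }
}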
Upvotes: 0
Views: 259
Reputation: 9357
It seems there is just 1 partition. Is that the reason?
Yes. If there is only one partition, only one consumer from the consumer group can consume it, and the other consumers in the same group will be idle even if they are started.
It seems just 1 of the 3 consumers has a partition

From your image, I can see the topic_trial-0. So it is topic_trial's 1st (and only) partition.
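You can confirm the partition count programmatically with the Kafka AdminClient; a minimal sketch, assuming the in-container bootstrap address from your compose file:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class PartitionCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, TopicDescription> topics =
                    admin.describeTopics(List.of("topic_trial")).all().get();
            // Prints the number of partitions; 1 here explains the behavior.
            System.out.println(topics.get("topic_trial").partitions().size());
        }
    }
}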
Increase the number of partitions, to 3 for example, and start three consumers with the same group.id, and the load should get distributed (one partition for each consumer).
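Since your broker has auto-topic-creation enabled, the topic was created with the broker default of one partition. One way to fix this from Spring is to declare the topic explicitly; a sketch, assuming Spring Kafka 2.3+ with a KafkaAdmin bean present (Spring Boot auto-configures one). The admin creates the topic on startup and raises the partition count if the topic already exists with fewer partitions:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    // Declares topic_trial with 3 partitions so each of the 3 group members
    // can be assigned one partition.
    @Bean
    public NewTopic topicTrial() {
        return TopicBuilder.name("topic_trial")
                .partitions(3)
                .replicas(1)   // single broker, so replication factor 1
                .build();
    }
}

Alternatively, on newer Kafka versions you can alter the existing topic from a shell inside the broker container with kafka-topics --alter --topic topic_trial --partitions 3 --bootstrap-server localhost:9092.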
Upvotes: 2