Reputation: 1325
I want to be able to consume data once from 2 different consumers (each consumer will get different messages) when using same topic
and same group-id
.
The producer:
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:
dumps(x).encode('utf-8'))
for e in range(50):
data = {'number' : e}
print('Producer {}'.format(data))
producer.send('test', value=data)
sleep(2)
The first consumer code:
consumer = KafkaConsumer(
'test',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
#auto_commit_interval_ms=100,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
message = message.value
print('[1] Consume {}'.format(message))
sleep(3)
The second consumer code:
consumer = KafkaConsumer(
'test',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
#auto_commit_interval_ms=100,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
message = message.value
print('[2] Consume {}'.format(message))
sleep(5)
I'm expecting to see that some of the messages are consumed by consumer-1
and the other messages by consumer-2
(according to the sleep command in consumer
code)
But it seems that just one consumer is working and gets all the messages. (The first consumer is stuck, and the second consumer gets the messages).
what am I missing ?
Upvotes: 1
Views: 172
Reputation: 192023
From the description of the problem, I assume your topic only has one partition.
Your topic needs more than one partition if you want to run more than one consumer in a group
Upvotes: 3