Reputation: 11
I can send data with the Python code below, but the consumer never receives the messages. I have enabled all the authentication options on the AWS MSK cluster. Sending and receiving both work with the Kafka command-line tools; I need the same thing from Python. I have tried both unauthenticated mode and the SASL_SSL protocol: in both cases the send works but nothing is received. Please see the code below and suggest how to resolve the issue.
Producer code:
from confluent_kafka import Producer
from datetime import datetime
from time import strftime
import json
bootstrap_servers = 'b-1.xxxxxxxxxxxxxxx.amazonaws.com:9096,b-2.xxxxxxxxxxxxxxxxxxxx.amazonaws.com:9096'
producer = Producer({
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.username': 'cccccc',
    'sasl.password': 'ccccccccccccccc',
    'sasl.mechanism': 'SCRAM-SHA-512'
})
data = {
    'message': 'hello world',
    'timestamp': datetime.now().strftime("%m/%d/%Y %H:%M:%S")
}
#print(producer.bootstrap_connected())
producer.produce('testTopic1', json.dumps(data).encode('utf-8'))
print('message sent')
producer.flush()
Consumer code:
from confluent_kafka import Consumer
from datetime import datetime
from time import strftime
import json
bootstrap_servers = 'b-1.xxxxxxxxxxxxxxxxxx.amazonaws.com:9096,b-2.xxxxxxxxxxxxxxxxxxxx.amazonaws.com:9096'
consumer = Consumer({ 'bootstrap.servers': bootstrap_servers, 'security.protocol': 'SASL_SSL', 'sasl.username': 'cccccccccc', 'sasl.password': 'cccccccccccccc', 'sasl.mechanism': 'SCRAM-SHA-512' })
print('start reading')
consumer.subscribe(['testTopic1'])
while True:
    msg = consumer.poll(timeout=1.0)
    print(msg)
    if msg is None:
        continue
Upvotes: 1
Views: 405
Reputation: 1239
Consumers always need to set a consumer group, because the committed offset for each partition is stored under that group.
You need to add 'group.id': 'myconsumergroup'
to your consumer config.
As you add more consumers to the topic, they should use the same group.id, as this is how Kafka balances the topic's partitions across the consumers within the same group.
I also recommend setting 'auto.offset.reset': 'earliest'
(or 'latest'). This controls where the consumer starts when its group has no committed offset yet, for example on its first run: from the beginning of the topic or only from new messages. The default is 'latest', so messages produced before the consumer started are not delivered.
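To make the two settings concrete, here is a minimal sketch of the consumer with group.id and auto.offset.reset added. The helper names build_consumer_config and consume are made up for illustration, and the group name, topic, and credentials are placeholders taken from the question; treat it as a starting point under those assumptions rather than a verified fix for this cluster.

```python
def build_consumer_config(bootstrap_servers, username, password,
                          group_id="myconsumergroup", offset_reset="earliest"):
    """Build a confluent_kafka consumer config for SASL_SSL with SCRAM-SHA-512.

    Hypothetical helper: the name and defaults are illustrative only.
    """
    if offset_reset not in ("earliest", "latest"):
        raise ValueError("offset_reset must be 'earliest' or 'latest'")
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.username": username,
        "sasl.password": password,
        # Committed offsets are stored under this group.
        "group.id": group_id,
        # Used only when the group has no committed offset yet.
        "auto.offset.reset": offset_reset,
    }


def consume(config, topic="testTopic1"):
    """Poll the topic forever and print each message value."""
    # Imported here so the config helper can be used without the client installed.
    from confluent_kafka import Consumer

    consumer = Consumer(config)
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue  # nothing arrived within the timeout
            if msg.error():
                print("consumer error:", msg.error())
                continue
            print(msg.value().decode("utf-8"))
    finally:
        # Leave the group cleanly so partitions are reassigned promptly.
        consumer.close()
```

Calling consume(build_consumer_config(bootstrap_servers, 'user', 'password')) would then start reading from the beginning of the topic on the group's first run. Checking msg.error() before msg.value() matters because poll() can return an error event instead of a real message.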
Upvotes: 0