Kotesh Nataru

Reputation: 11

Not receiving messages at the AWS MSK consumer end

I can send data to an AWS MSK cluster with the Python code below, but the consumer never receives the messages. I have enabled all the authentication methods on the cluster. Sending and receiving both work with the Kafka command-line tools; I need the same thing from Python. I have tried both unauthenticated access and the SASL_SSL protocol. In both cases the send succeeds but nothing is received. Please see the code below and suggest how to resolve the issue.

Producer code:

from confluent_kafka import Producer 
from datetime import datetime 
from time import strftime 
import json

bootstrap_servers = 'b-1.xxxxxxxxxxxxxxx.amazonaws.com:9096,b-2.xxxxxxxxxxxxxxxxxxxx.amazonaws.com:9096'
producer = Producer({
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.username': 'cccccc',
    'sasl.password': 'ccccccccccccccc',
    'sasl.mechanism': 'SCRAM-SHA-512'
})

data = {
    'message': 'hello world',
    'timestamp': datetime.now().strftime("%m/%d/%Y %H:%M:%S")
}
#print(producer.bootstrap_connected())
producer.produce('testTopic1', json.dumps(data).encode('utf-8'))
print('message sent')
producer.flush()

Consumer code:

from confluent_kafka import Consumer 
from datetime import datetime 
from time import strftime 
import json

bootstrap_servers = 'b-1.xxxxxxxxxxxxxxxxxx.amazonaws.com:9096,b-2.xxxxxxxxxxxxxxxxxxxx.amazonaws.com:9096' 
consumer = Consumer({
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.username': 'cccccccccc',
    'sasl.password': 'cccccccccccccc',
    'sasl.mechanism': 'SCRAM-SHA-512'
})

print('start reading') 
consumer.subscribe(['testTopic1'])

while True:
    msg = consumer.poll(timeout=1.0)
    print(msg)
    if msg is None:
        continue

Upvotes: 1

Views: 405

Answers (1)

JayTee

Reputation: 1239

A consumer always needs to set a consumer group, because the committed offset for each partition is stored under that group.

You need to add 'group.id': 'myconsumergroup' to your consumer config.

As you add more consumers to the topic, they should use the same group.id, since this is how Kafka balances the topic's partitions across the consumers within a group.

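I also recommend setting 'auto.offset.reset' to 'earliest' or 'latest'. This decides where the consumer starts when its group has no committed offset yet (for example, on first run): from the beginning of the topic or only from new messages. The default is 'latest', so a consumer started after the producer has already run will not see the earlier messages.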
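To put the points above together, here is a minimal sketch of the consumer with both settings added. The helper names (build_consumer_config, consume_messages, main), the max_polls limit, and the placeholder broker address and credentials are my own illustrations, not part of your code or of confluent_kafka; I am also assuming the payload is the JSON your producer sends.

```python
import json


def build_consumer_config(bootstrap_servers, username, password,
                          group_id="myconsumergroup"):
    """Return a consumer config dict for SASL_SSL with SCRAM-SHA-512."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.username": username,
        "sasl.password": password,
        # Required: committed offsets are stored per consumer group.
        "group.id": group_id,
        # Only applies when the group has no committed offset yet.
        "auto.offset.reset": "earliest",
    }


def consume_messages(consumer, topic, max_messages=1, max_polls=30):
    """Poll at most max_polls times; return up to max_messages decoded payloads."""
    consumer.subscribe([topic])
    received = []
    for _ in range(max_polls):
        if len(received) >= max_messages:
            break
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            # Nothing arrived within the timeout; try again.
            continue
        if msg.error():
            print("consumer error:", msg.error())
            continue
        # Assumes the producer sent UTF-8 encoded JSON, as in the question.
        received.append(json.loads(msg.value().decode("utf-8")))
    return received


def main():
    # Imported here so the helpers above can be tried without a broker.
    from confluent_kafka import Consumer

    # Placeholder (hypothetical) broker address and credentials: use your own.
    config = build_consumer_config(
        "b-1.example.amazonaws.com:9096", "user", "secret")
    consumer = Consumer(config)
    try:
        for payload in consume_messages(consumer, "testTopic1"):
            print(payload)
    finally:
        # Leave the group cleanly so partitions are reassigned promptly.
        consumer.close()
```

Call main() with your real broker list and credentials. Note that 'earliest' only takes effect for a group with no committed offsets, so if you already ran a consumer under the same group.id, try a fresh group name when testing.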

Upvotes: 0
