Reputation: 31
so I am fairly new to Kafka. I am attempted to run a simple Kafka consumer and producer. when I run my consumer it prints hello right before the for loop. But nothing ever prints in the for loop, leading me to believe it never enters the for loop in the first place and the consumer doesn't consume the messages from the producer. I am running this on a linux system.
Can anyone give advice on what could be wrong with the producer or consumer? I have displayed my producer and consumer code which are both only a few lines of code.
This is my producer:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:2181',api_version=(1,0,1))
producer.send('MyFirstTopic1', 'Hello, World!')
This is my consumer:
from kafka import KafkaConsumer,KafkaProducer,TopicPartition,OffsetAndMetadata
consumer = KafkaConsumer(
bootstrap_servers=['localhost:2181'],api_version=(1,0,1),
group_id=None,
enable_auto_commit=False,
auto_offset_reset='smallest'
)
consumer.subscribe('MyFirstTopic1',0)
print("hello")
for message in consumer:
print(message)
So when running my producer it eventually gives an error.Anyone know what this means and if this is possibly what is wrong.
File "producer.py", line 3, in <module>
producer.send('MyFirstTopic1', 'Hello, World!')
File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 543, in send
self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 664, in _wait_on_metadata
"Failed to update metadata after %.1f secs." % max_wait)
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
Upvotes: 1
Views: 1149
Reputation: 26865
It looks like you are using the wrong host in your client configurations. localhost:2181
is usually the Zookeeper server.
For your clients to work, you need to set bootstrap_servers
to the Kafka broker hostname and port instead. This is localhost:9092
by default.
See https://kafka-python.readthedocs.io/en/latest/apidoc/KafkaProducer.html
Upvotes: 1