S.Sig
S.Sig

Reputation: 31

Python kafka consumer wont consume messages from producer

so I am fairly new to Kafka. I am attempted to run a simple Kafka consumer and producer. when I run my consumer it prints hello right before the for loop. But nothing ever prints in the for loop, leading me to believe it never enters the for loop in the first place and the consumer doesn't consume the messages from the producer. I am running this on a linux system.

Can anyone give advice on what could be wrong with the producer or consumer? I have displayed my producer and consumer code which are both only a few lines of code.

This is my producer:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:2181',api_version=(1,0,1))
producer.send('MyFirstTopic1', 'Hello, World!')

This is my consumer:

from kafka import KafkaConsumer,KafkaProducer,TopicPartition,OffsetAndMetadata
consumer = KafkaConsumer(
 bootstrap_servers=['localhost:2181'],api_version=(1,0,1),
 group_id=None,
 enable_auto_commit=False,
 auto_offset_reset='smallest'
)
consumer.subscribe('MyFirstTopic1',0)
print("hello")
for message in consumer:
 print(message)

So when running my producer it eventually gives an error.Anyone know what this means and if this is possibly what is wrong.

File "producer.py", line 3, in <module>
    producer.send('MyFirstTopic1', 'Hello, World!')
  File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 543, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 664, in _wait_on_metadata
    "Failed to update metadata after %.1f secs." % max_wait)
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

Upvotes: 1

Views: 1149

Answers (1)

Mickael Maison
Mickael Maison

Reputation: 26865

It looks like you are using the wrong host in your client configurations. localhost:2181 is usually the Zookeeper server.

For your clients to work, you need to set bootstrap_servers to the Kafka broker hostname and port instead. This is localhost:9092 by default.

See https://kafka-python.readthedocs.io/en/latest/apidoc/KafkaProducer.html

Upvotes: 1

Related Questions