Karthik Raj

Reputation: 261

kafka-python consumer not receiving messages

I am having trouble getting KafkaConsumer to read from the beginning, or from any other explicit offset.

When I run the command-line consumer tool on the same topic, I do see messages with the --from-beginning option; without it, the tool hangs.

$ ./kafka-console-consumer.sh --zookeeper {localhost:port} --topic {topic_name} --from-beginning

If I run it through Python, it hangs, which I suspect is caused by incorrect consumer configs:

consumer = KafkaConsumer(topic_name,
                     bootstrap_servers=['localhost:9092'],
                     group_id=None,
                     auto_commit_enable=False,
                     auto_offset_reset='smallest')

print "Consuming messages from the given topic"
for message in consumer:
    print "Message", message
    if message is not None:
        print message.offset, message.value

print "Quit"

Output:

Consuming messages from the given topic (hangs after that)

I am using kafka-python 0.9.5, and the broker runs Kafka 0.8.2. I'm not sure what the exact problem is.

I set group_id=None, as suggested by dpkp, to emulate the behavior of the console consumer.

Upvotes: 17

Views: 30372

Answers (7)

Charalamm

Reputation: 1947

In my case, I had to put the router's IP in the Kafka PLAINTEXT listener configuration.

Get the router's IP with:

echo $(ifconfig | grep -E "([0-9]{1,3}\.){3}[0-9]{1,3}" | grep -v 127.0.0.1 | awk '{ print $2 }' | cut -f2 -d: | head -n1)

Then add PLAINTEXT_HOST://<router_ip>:9092 to Kafka's advertised listeners. For a Confluent Docker service, the configuration is as follows:

  kafka:
    image: confluentinc/cp-kafka:7.0.1
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
      - 29092:29092
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://172.28.0.1:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1

Finally, the Python consumer is:

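from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['172.28.0.1:9092'],  # the PLAINTEXT_HOST advertised address
    auto_offset_reset='earliest',  # start at the oldest message when no offset is committed
    group_id=None,  # no consumer group, like the console consumer
)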

print('Listening')
for msg in consumer:
    print(msg)
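The two advertised listeners serve different clients: the broker returns the advertised address of the listener a client connected through, so containers on the Compose network use kafka:29092, while a script on the host has to bootstrap against the PLAINTEXT_HOST address. A minimal sketch that splits the KAFKA_ADVERTISED_LISTENERS value from the config above into that mapping (`parse_listeners` is a hypothetical helper, not part of kafka-python or the Confluent images):

```python
def parse_listeners(value):
    """Split an advertised-listeners string into {listener_name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        # each entry looks like NAME://host:port
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


advertised = "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://172.28.0.1:9092"
print(parse_listeners(advertised)["PLAINTEXT_HOST"])  # ('172.28.0.1', 9092)
```

The PLAINTEXT_HOST pair is the one that belongs in bootstrap_servers for a consumer running on the host.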

Upvotes: 1

HGF

Reputation: 448

I faced the same issue. To test, I ran kafka-topics locally on the machine running the code and got an UnknownHostException. After I added the IP and host name to the hosts file, both kafka-topics and the code worked fine. It seems that KafkaConsumer was trying to fetch the messages but failed without raising any exception.
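The same kind of failure can be checked from Python without any Kafka tooling. A minimal sketch, assuming you know the host name and port the broker advertises (`check_broker` is a hypothetical helper, not part of kafka-python): it reports whether the name resolves and whether a plain TCP connection opens, which is the step that failed quietly here.

```python
import socket


def check_broker(host, port, timeout=3.0):
    """Return (status, ip) for host:port using plain sockets, no Kafka involved."""
    try:
        ip = socket.gethostbyname(host)  # fails if the name is in neither DNS nor the hosts file
    except socket.gaierror:
        return ("unresolvable", None)
    try:
        with socket.create_connection((ip, port), timeout=timeout):
            return ("reachable", ip)
    except OSError:  # refused, timed out, no route, ...
        return ("unreachable", ip)
```

A result of ("unresolvable", None) for the broker's advertised host name points to the same hosts-file problem described above.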

Upvotes: 0

幽幽子的筷子

Reputation: 192

I ran into the same problem: I could receive messages in the Kafka console consumer but not with a Python script using the kafka-python package.

It turned out that I hadn't called producer.flush() and producer.close() in my producer.py, which its documentation does not mention.
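In kafka-python, KafkaProducer.send() is asynchronous: it queues the record and returns a future, so records may still be buffered when a short script ends. A minimal sketch of the flush-then-close pattern described above (`produce_all` is a hypothetical helper; it assumes a kafka-python KafkaProducer, or any object with the same send/flush/close methods):

```python
def produce_all(producer, topic, values):
    """Send every value, wait for delivery, then close the producer.

    Returns the number of records handed to send().
    """
    try:
        count = 0
        for value in values:
            producer.send(topic, value)  # only queues the record
            count += 1
        producer.flush()  # block until queued records have been sent
        return count
    finally:
        producer.close()  # release the producer's network resources
```

With kafka-python this could be called as `produce_all(KafkaProducer(bootstrap_servers=['localhost:9092']), 'my-topic', [b'msg0', b'msg1'])`.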

Upvotes: 9

Sida Zhou

Reputation: 3715

My approach: print the offsets and make sure they are what you expect, using position() and seek_to_beginning(). See the comments in the code.

I can't explain:

  1. Why the partitions are not assigned right after instantiating KafkaConsumer. Is this by design? A workaround is to call poll() once before seek_to_beginning().
  2. Why, after seek_to_beginning(), the first call to poll() sometimes returns no data and doesn't change the offset.

Code:

import kafka
from time import sleep
from kafka import KafkaProducer, KafkaConsumer, TopicPartition

print(kafka.__version__)

KAFKA_URL = 'localhost:9092'  # kafka broker
KAFKA_TOPIC = 'sida3_sdtest_topic'  # topic name

# ASSUMING THAT the topic exists

# write to the topic
producer = KafkaProducer(bootstrap_servers=[KAFKA_URL])
for i in range(20):
    producer.send(KAFKA_TOPIC, ('msg' + str(i)).encode())
producer.flush()

# read from the topic
# auto_offset_reset='earliest' only applies when no committed offset is found; it's NOT what we need here
consumer = KafkaConsumer(KAFKA_TOPIC,
                         bootstrap_servers=[KAFKA_URL],
                         max_poll_records=2,
                         group_id='sida3')

# (!?) why is this needed to get partitions assigned?
# without this poll(): AssertionError: No partitions are currently assigned
consumer.poll()
consumer.seek_to_beginning()

# also AssertionError: No partitions are currently assigned if poll() is not called
print('partitions of the topic: ', consumer.partitions_for_topic(KAFKA_TOPIC))

print('before poll() x2: ')
print(consumer.position(TopicPartition(KAFKA_TOPIC, 0)))
print(consumer.position(TopicPartition(KAFKA_TOPIC, 1)))

# (!?) sometimes the first call to poll() returns nothing and doesn't change the offset
messages = consumer.poll()
sleep(1)
messages = consumer.poll()

print('after poll() x2: ')
print(consumer.position(TopicPartition(KAFKA_TOPIC, 0)))
print(consumer.position(TopicPartition(KAFKA_TOPIC, 1)))

print('messages: ', messages)

Output:

2.0.1
partitions of the topic:  {0, 1}
before poll() x2: 
0
0
after poll() x2: 
0
2
messages:  {TopicPartition(topic='sida3_sdtest_topic', partition=1): [ConsumerRecord(topic='sida3_sdtest_topic', partition=1, offset=0, timestamp=1600335075864, timestamp_type=0, key=None, value=b'msg0', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1), ConsumerRecord(topic='sida3_sdtest_topic', partition=1, offset=1, timestamp=1600335075864, timestamp_type=0, key=None, value=b'msg1', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1)]}
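A hedged explanation for both points: when topics are passed to the constructor, kafka-python assigns partitions as part of joining the consumer group, which happens inside poll(); and poll() defaults to timeout_ms=0, so a call right after a seek can return before the offset lookup and fetch have completed. A minimal sketch of an alternative, assuming you don't need group rebalancing: create the consumer without topics, assign the partitions manually, and poll with a timeout until records arrive (`read_from_beginning` is a hypothetical helper, not a kafka-python API):

```python
import time


def read_from_beginning(consumer, partitions, timeout_s=10.0, poll_timeout_ms=500):
    """Manually assign `partitions`, rewind them, and poll until records arrive.

    Assumes `consumer` is a kafka-python KafkaConsumer created WITHOUT topics
    (assign() is not allowed on a subscribed consumer) and `partitions` is a
    list of kafka.TopicPartition objects. Returns {} if nothing arrives in time.
    """
    consumer.assign(partitions)  # takes effect immediately, no group join
    consumer.seek_to_beginning(*partitions)  # no preliminary poll() needed
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        # give each poll time to finish the offset reset and fetch
        batch = consumer.poll(timeout_ms=poll_timeout_ms)
        if batch:
            return batch
    return {}
```

With `consumer = KafkaConsumer(bootstrap_servers=[KAFKA_URL])`, this could be called as `read_from_beginning(consumer, [TopicPartition(KAFKA_TOPIC, p) for p in consumer.partitions_for_topic(KAFKA_TOPIC)])`.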

Upvotes: 1

techkuz

Reputation: 3981

auto_offset_reset='earliest' and group_id=None solved it for me.
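These settings can be collected in one place, together with a way to stop the loop: iterating a kafka-python KafkaConsumer blocks until a message arrives, which looks like a hang when there is nothing to read, and the consumer_timeout_ms option ends the iteration after that many milliseconds without a message. A minimal sketch (`console_like_consumer_kwargs` is a hypothetical helper; the option names are the kafka-python 2.x ones, not the 0.9.5 names used in the question):

```python
def console_like_consumer_kwargs(bootstrap_servers, from_beginning=True, idle_timeout_ms=None):
    """Build KafkaConsumer keyword arguments that mimic kafka-console-consumer."""
    kwargs = {
        "bootstrap_servers": list(bootstrap_servers),
        "group_id": None,  # no consumer group: no committed offsets are read or written
        "auto_offset_reset": "earliest" if from_beginning else "latest",
        "enable_auto_commit": False,
    }
    if idle_timeout_ms is not None:
        # stop iterating after this many ms without a message instead of blocking forever
        kwargs["consumer_timeout_ms"] = idle_timeout_ms
    return kwargs
```

For example: `KafkaConsumer(topic_name, **console_like_consumer_kwargs(['localhost:9092'], idle_timeout_ms=10000))`.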

Upvotes: 7

Pavel Lisiza

Reputation: 131

auto_offset_reset='earliest' solved it for me.

Upvotes: 10

dpkp

Reputation: 1459

The difference between the console consumer and the Python consumer code you posted is that the Python consumer uses a consumer group to save offsets: group_id="test-consumer-group". If you set group_id=None instead, you should see the same behavior as the console consumer.
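Why the group matters: with a group_id, the consumer resumes from the offset committed for that group, and auto_offset_reset only applies when there is no committed offset (or it is out of range). With group_id=None nothing is committed, so the reset policy always decides. A simplified model of that rule for one partition (`starting_offset` is a hypothetical illustration, not kafka-python's code, and it ignores the out-of-range case):

```python
def starting_offset(committed, earliest, latest, auto_offset_reset):
    """Simplified model of where a consumer starts reading one partition."""
    if committed is not None:
        return committed  # in this model, a committed group offset always wins
    if auto_offset_reset == "earliest":
        return earliest
    if auto_offset_reset == "latest":
        return latest
    raise ValueError("no committed offset and no usable reset policy")
```

For a group that already committed offset 15 on a partition holding offsets 0 to 19, `starting_offset(15, 0, 20, "earliest")` still returns 15, which is why changing auto_offset_reset alone may appear to do nothing.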

Upvotes: 15
