Reputation: 261
I am having trouble getting KafkaConsumer to read from the beginning, or from any other explicit offset.
Running the command line consumer against the same topic, I do see messages with the --from-beginning option; without it, the tool hangs.
$ ./kafka-console-consumer.sh --zookeeper {localhost:port} --topic {topic_name} --from-beginning
If I run it through Python, it hangs, which I suspect is caused by incorrect consumer configs:
consumer = KafkaConsumer(topic_name,
                         bootstrap_servers=['localhost:9092'],
                         group_id=None,
                         auto_commit_enable=False,
                         auto_offset_reset='smallest')
print "Consuming messages from the given topic"
for message in consumer:
    print "Message", message
    if message is not None:
        print message.offset, message.value
print "Quit"
Consuming messages from the given topic (hangs after that)
I am using kafka-python 0.9.5 and the broker runs Kafka 0.8.2. I am not sure what the exact problem is.
Set group_id=None as suggested by dpkp to emulate the behavior of the console consumer.
Upvotes: 17
Views: 30372
Reputation: 1947
For me, I had to specify the router's IP in the kafka PLAINTEXT configuration.
Get the router's IP with:
echo $(ifconfig | grep -E "([0-9]{1,3}\.){3}[0-9]{1,3}" | grep -v 127.0.0.1 | awk '{ print $2 }' | cut -f2 -d: | head -n1)
and then add PLAINTEXT_HOST://<router_ip>:9092 to the Kafka advertised listeners. In the case of a Confluent Docker service, the configuration is as follows:
kafka:
  image: confluentinc/cp-kafka:7.0.1
  container_name: kafka
  depends_on:
    - zookeeper
  ports:
    - 9092:9092
    - 29092:29092
  environment:
    - KAFKA_BROKER_ID=1
    - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://172.28.0.1:9092
    - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
    - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
and finally the python consumer is:
from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['172.28.0.1:9092'],
    auto_offset_reset='earliest',
    group_id=None,
)
print('Listening')
for msg in consumer:
    print(msg)
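Before starting the consumer, it can help to confirm that the advertised address is reachable from the client machine at all. The following is only a rough sketch: the helper name can_connect is made up for illustration, the address is the one from the configuration above, and a successful TCP connect only shows that the port is open, not that the broker is healthy.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolvable hosts.
        return False


def check_advertised_listener():
    # Not called automatically. The address comes from the docker-compose
    # example above; replace it with your own advertised listener.
    print(can_connect("172.28.0.1", 9092))
```

If this prints False for the address in PLAINTEXT_HOST, the consumer would likely hang for the same reason, since it reconnects to whatever address the broker advertises.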
Upvotes: 1
Reputation: 448
I faced the same issue before, so I ran kafka-topics locally on the machine running the code to test, and I got UnknownHostException. I added the IP and the host name to the hosts file, and it worked fine with both kafka-topics and the code.
It seems that KafkaConsumer was trying to fetch the messages but failed without raising any exceptions.
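A quick way to check the same thing from Python is to ask the operating system to resolve the broker's host name. This is a minimal sketch using only the standard library; the function name resolves and the host name in the usage function are placeholders, not anything from kafka-python.

```python
import socket


def resolves(hostname):
    """Return the sorted IP addresses hostname resolves to, or [] on failure."""
    try:
        infos = socket.getaddrinfo(hostname, None)
    except (socket.gaierror, UnicodeError):
        # gaierror: the resolver could not find the name.
        # UnicodeError: the name is not a valid host name at all.
        return []
    # Each entry is (family, type, proto, canonname, sockaddr);
    # sockaddr[0] is the IP address for both IPv4 and IPv6.
    return sorted({info[4][0] for info in infos})


def check_broker_host():
    # Not called automatically. "kafka" is a placeholder for whatever
    # host name your broker advertises.
    print(resolves("kafka"))
```

An empty list here suggests the name needs an entry in the hosts file (or in DNS) on the machine running the consumer.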
Upvotes: 0
Reputation: 192
I ran into the same problem: I could receive messages in the Kafka console consumer but could not get them with a Python script using the kafka-python package.
Finally I figured out the reason: I didn't call producer.flush() and producer.close() in my producer.py, which is not mentioned in its documentation.
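As a rough illustration of that fix, here is a sketch of a producer script that always flushes and closes. The helper name send_and_flush, the topic name and the broker address are assumptions for the example; send(), flush() and close() are the KafkaProducer methods mentioned above.

```python
def send_and_flush(producer, topic, payloads):
    """Queue every payload, wait until all are sent, then close the producer."""
    try:
        for payload in payloads:
            # send() is asynchronous: it only queues the record.
            producer.send(topic, payload)
        # flush() blocks until the queued records have actually been sent.
        producer.flush()
    finally:
        producer.close()


def main():
    # Not called automatically: needs kafka-python and a reachable broker.
    # The topic name and address below are placeholders.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
    send_and_flush(producer, "my-topic", [b"msg0", b"msg1"])
```

Without the flush, a short script can exit while records are still sitting in the producer's buffer, so the consumer never sees them.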
Upvotes: 9
Reputation: 3715
My take: print the offset and make sure it is what you expect, using position() and seek_to_beginning(). Please see the comments in the code.
There are two things I can't explain.
First, after creating the KafkaConsumer, the partitions are not assigned. Is this by design? The workaround is to call poll() once before seek_to_beginning().
Second, sometimes after seek_to_beginning(), the first call to poll() returns no data and doesn't change the offset.
Code:
import kafka
print(kafka.__version__)
from kafka import KafkaProducer, KafkaConsumer
from time import sleep
KAFKA_URL = 'localhost:9092' # kafka broker
KAFKA_TOPIC = 'sida3_sdtest_topic' # topic name
# ASSUMING THAT the topic exist
# write to the topic
producer = KafkaProducer(bootstrap_servers=[KAFKA_URL])
for i in range(20):
    producer.send(KAFKA_TOPIC, ('msg' + str(i)).encode())
producer.flush()
# read from the topic
# auto_offset_reset='earliest', # auto_offset_reset is needed when offset is not found, it's NOT what we need here
consumer = KafkaConsumer(KAFKA_TOPIC,
                         bootstrap_servers=[KAFKA_URL],
                         max_poll_records=2,
                         group_id='sida3'
                         )
# (!?) wtf, why we need this to get partitions assigned
# AssertionError: No partitions are currently assigned if poll() is not called
consumer.poll()
consumer.seek_to_beginning()
# also AssertionError: No partitions are currently assigned if poll() is not called
print('partitions of the topic: ',consumer.partitions_for_topic(KAFKA_TOPIC))
from kafka import TopicPartition
print('before poll() x2: ')
print(consumer.position(TopicPartition(KAFKA_TOPIC, 0)))
print(consumer.position(TopicPartition(KAFKA_TOPIC, 1)))
# (!?) sometimes the first call to poll() returns nothing and doesnt change the offset
messages = consumer.poll()
sleep(1)
messages = consumer.poll()
print('after poll() x2: ')
print(consumer.position(TopicPartition(KAFKA_TOPIC, 0)))
print(consumer.position(TopicPartition(KAFKA_TOPIC, 1)))
print('messages: ', messages)
Output:
2.0.1
partitions of the topic: {0, 1}
before poll() x2:
0
0
after poll() x2:
0
2
messages: {TopicPartition(topic='sida3_sdtest_topic', partition=1): [ConsumerRecord(topic='sida3_sdtest_topic', partition=1, offset=0, timestamp=1600335075864, timestamp_type=0, key=None, value=b'msg0', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1), ConsumerRecord(topic='sida3_sdtest_topic', partition=1, offset=1, timestamp=1600335075864, timestamp_type=0, key=None, value=b'msg1', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1)]}
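One possible way around the initial poll() is to assign the partitions manually instead of subscribing, since manual assignment does not wait for a group rebalance. The sketch below is an assumption about how that could look, not a confirmed explanation of the behavior above. The helper name rewind_all_partitions and its make_partition parameter are made up; partitions_for_topic(), assign() and seek_to_beginning() are existing KafkaConsumer methods.

```python
def rewind_all_partitions(consumer, topic, make_partition):
    """Manually assign every partition of topic and seek each to its start.

    make_partition is a callable such as kafka.TopicPartition that builds a
    partition handle from (topic, partition_id).
    """
    partition_ids = consumer.partitions_for_topic(topic) or set()
    partitions = [make_partition(topic, p) for p in sorted(partition_ids)]
    consumer.assign(partitions)
    consumer.seek_to_beginning(*partitions)
    return partitions


def main():
    # Not called automatically: needs kafka-python and a reachable broker.
    from kafka import KafkaConsumer, TopicPartition

    # No topic is passed to the constructor, because subscribing to a topic
    # and calling assign() cannot be combined on the same consumer.
    consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"],
                             group_id=None,
                             consumer_timeout_ms=5000)
    rewind_all_partitions(consumer, "sida3_sdtest_topic", TopicPartition)
    for message in consumer:
        print(message.partition, message.offset, message.value)
```

The trade-off is that manual assignment gives up group management, so partitions are not rebalanced across several consumers.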
Upvotes: 1
Reputation: 1459
The difference between the console consumer and the Python consumer code you posted is that the Python consumer uses a consumer group to save offsets: group_id="test-consumer-group". If you instead set group_id=None, you should see the same behavior as the console consumer.
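To make the comparison concrete, here is a small sketch of consumer settings that roughly mirror the console consumer. The helper name console_like_consumer_kwargs is made up, and the option names assume a recent kafka-python release ('earliest' and 'latest'); 0.9.x used 'smallest' and 'largest' instead, as in the question.

```python
def console_like_consumer_kwargs(bootstrap_servers, from_beginning=True):
    """Build KafkaConsumer keyword arguments that avoid consumer groups."""
    return {
        "bootstrap_servers": bootstrap_servers,
        # No consumer group: no committed offsets are looked up or saved.
        "group_id": None,
        # With no committed offset, this decides where reading starts.
        "auto_offset_reset": "earliest" if from_beginning else "latest",
        # Stop iterating after 5 s without messages instead of blocking forever.
        "consumer_timeout_ms": 5000,
    }


def main():
    # Not called automatically: needs kafka-python and a reachable broker.
    # The topic name and address are placeholders.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer("my-topic",
                             **console_like_consumer_kwargs(["localhost:9092"]))
    for message in consumer:
        print(message.offset, message.value)
    print("Quit")
```

The consumer_timeout_ms setting is optional; it is included here only so that the loop ends and the final print is reached when no more messages arrive.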
Upvotes: 15