Reputation: 111
I have to create a Kafka producer that generates a sequence of numbers from 1 to 300. Each message I write has to contain the topic, a key, and the value, which is the binary representation of the number being written.
This is the code I've created:
from kafka import KafkaProducer
import numpy as np
import time
producer = KafkaProducer(bootstrap_servers='Cloudera02:9092')
for i in range(1,300):
    value = bytes(str(i), 'utf-8')
    key = (str(i), 'utf-8')
    producer.send('PEC5', key = key, value = value)
    time.sleep(3)
producer.flush()
The Kafka consumer should read the messages sent by the producer and print only the value to the console:
from kafka import KafkaConsumer
from codecs import utf_8_decode
consumer = KafkaConsumer('PEC5', bootstrap_servers='Cloudera02:9092', auto_offset_reset='smallest', consumer_timeout_ms=10000)
for message in consumer:
    for value in message.values:
        print(value)
I am running 2 terminals, one with the producer and a second with the consumer, but I don't get anything printed in the console. Any idea what's wrong?
Upvotes: 2
Views: 864
Reputation: 40048
The auto_offset_reset value in the KafkaConsumer is wrong. According to the docs it has only two valid values, latest and earliest, and any other value will raise an exception:
auto_offset_reset (str) – A policy for resetting offsets on OffsetOutOfRange errors: ‘earliest’ will move to the oldest available message, ‘latest’ will move to the most recent. Any other value will raise the exception. Default: ‘latest’.
So construct the KafkaConsumer with earliest:
consumer = KafkaConsumer('PEC5', bootstrap_servers='Cloudera02:9092', auto_offset_reset='earliest', consumer_timeout_ms=10000,group_id='test-1')
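For reference, here is a minimal sketch of how the matching producer and consumer might look with that setting, assuming the kafka-python package and the topic and broker from the question. Two details differ from the question's code and are my own reading rather than something confirmed above: the key is encoded to bytes (the question builds a tuple, which the producer is unlikely to accept without a key_serializer), and the consumer reads each record's `value` attribute (as far as I can tell, records have no `values` attribute). The names `encode_number`, `decode_value`, `run_producer` and `run_consumer` are hypothetical helpers.

```python
import time


def encode_number(i):
    """Encode an integer as UTF-8 bytes, e.g. 7 -> b'7'."""
    return str(i).encode("utf-8")


def decode_value(raw):
    """Decode the bytes payload of a record back to text."""
    return raw.decode("utf-8")


def run_producer(topic="PEC5", servers="Cloudera02:9092", delay=3):
    # Imported lazily so the helpers above work without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=servers)
    for i in range(1, 301):  # 1 to 300 inclusive
        # Key and value are both plain bytes, since no serializer is configured.
        producer.send(topic, key=encode_number(i), value=encode_number(i))
        time.sleep(delay)
    producer.flush()


def run_consumer(topic="PEC5", servers="Cloudera02:9092"):
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        auto_offset_reset="earliest",
        consumer_timeout_ms=10000,
        group_id="test-1",
    )
    for message in consumer:
        # Each record carries its payload in the `value` attribute as raw bytes.
        print(decode_value(message.value))
```

Calling `run_producer()` in one terminal and `run_consumer()` in the other should mirror the two-terminal setup from the question.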
Also set a new group_id when you change auto_offset_reset to earliest, because Kafka only resets the offset to earliest if no previous offset is found for the consumer's group. From the docs:
group_id (str or None) – The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: None
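While testing, one way to make sure no previous offset exists for the group is to generate a new group name on every run. The sketch below assumes kafka-python; `fresh_group_id` and `make_consumer` are made-up helper names, not part of the library.

```python
import uuid


def fresh_group_id(prefix="test"):
    """Return a group name that is very unlikely to have been used before."""
    return f"{prefix}-{uuid.uuid4().hex}"


def make_consumer(topic="PEC5", servers="Cloudera02:9092"):
    # Imported lazily so fresh_group_id can be used without kafka-python installed.
    from kafka import KafkaConsumer

    return KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        auto_offset_reset="earliest",  # applies because the new group has no stored offset
        consumer_timeout_ms=10000,
        group_id=fresh_group_id(),
    )
```

This is probably only suitable for experiments: every run creates another consumer group, and a long-lived consumer would normally keep one stable group_id so it can resume from its committed offset.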
Upvotes: 2