I am a newbie in Kafka. I am trying to take data from one Kafka topic called "input-topic", which has 5 partitions, each holding some random integers, and move it to another topic called "output-topic", which has only one partition.
Below is the code I am using:
Kafka Producer
from kafka import KafkaProducer

# bytes(x) would create x zero bytes, so encode the integer itself (4 bytes, big-endian)
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: x.to_bytes(4, 'big'))
topic_name = "input-topic"
num_partitions = 3

def send_data_to_topics():
    print("sending...")
    for i in range(num_partitions):
        for j in range(5):
            producer.send(topic_name, value=j, partition=i)
    producer.flush()

if __name__ == "__main__":
    send_data_to_topics()
Kafka Consumer
from kafka import KafkaConsumer, KafkaProducer

topic_name_input = "input-topic"
topic_name_output = "output-topic"
data = []

def bytes_to_int(raw):
    result = 0
    for b in raw:
        result = result * 256 + int(b)
    return result

consumer = KafkaConsumer(
    topic_name_input,
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    value_deserializer=lambda x: bytes_to_int(x)
)
producer1 = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: x.to_bytes(4, 'big'))

def read_topic_data():
    print("received")
    for message in consumer:
        print(message)
        data.append(message.value)

def send_data_to_topic():
    # Use a new name: assigning to `data` here would make it a local variable
    sorted_data = sorted(data)
    for d in sorted_data:
        producer1.send(topic_name_output, value=d)
    producer1.flush()

if __name__ == "__main__":
    read_topic_data()
    send_data_to_topic()
Another Kafka Consumer
from kafka import KafkaConsumer

topic_name_input = "input-topic"
topic_name_output = "output-topic"

def bytes_to_int(raw):
    result = 0
    for b in raw:
        result = result * 256 + int(b)
    return result

consumer2 = KafkaConsumer(
    topic_name_output,
    bootstrap_servers=['localhost:9092'],
    group_id='my-group_new',
    value_deserializer=lambda x: bytes_to_int(x)
)

def read_topic_data():
    print("received")
    for message in consumer2:
        print(message)

if __name__ == "__main__":
    read_topic_data()
However, I don't receive any messages in the topic "output-topic". Am I missing something here? How can I achieve this efficiently? Do I need to use Kafka Streams? Any help would be appreciated. Thanks!
I would recommend Kafka Streams (a Java library), which is built for this kind of topic-to-topic processing.
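Iterating over a KafkaConsumer is a blocking call. Once started, the consumer reads from the group's committed offsets (a brand-new group starts at the position set by auto_offset_reset, which defaults to 'latest' in kafka-python, so messages produced before the group first joined are skipped unless you pass auto_offset_reset='earliest'). It then waits for new messages, processes each one as it arrives, and goes back to waiting, so the for loop never ends on its own.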
The consumer_timeout_ms parameter of the KafkaConsumer constructor stops the iteration after that many milliseconds without a new message, so the for loop ends and control returns to the caller.
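For example, the question's sequential read-then-write approach works once the read loop is allowed to end. This is a minimal sketch, assuming kafka-python is installed, a broker runs at localhost:9092, and values use the 4-byte big-endian encoding shown; copy_sorted, encode_int and decode_int are hypothetical names, and the 5000 ms timeout is only an illustrative value.

```python
def encode_int(value: int) -> bytes:
    # Fixed 4-byte big-endian encoding of the integer itself;
    # bytes(value) would instead create `value` zero bytes.
    return value.to_bytes(4, "big")


def decode_int(raw: bytes) -> int:
    # Same result as the question's bytes_to_int for these payloads.
    return int.from_bytes(raw, "big")


def copy_sorted(bootstrap: str = "localhost:9092") -> list:
    # Imported here (assumption: kafka-python) so the helpers above
    # can be used without the library installed.
    from kafka import KafkaConsumer, KafkaProducer

    consumer = KafkaConsumer(
        "input-topic",
        bootstrap_servers=[bootstrap],
        group_id="my-group",
        auto_offset_reset="earliest",  # a new group starts at the oldest message
        consumer_timeout_ms=5000,      # end the for loop after 5 s with no message
        value_deserializer=decode_int,
    )
    # The comprehension finishes once the topic has been quiet for 5 s.
    values = [message.value for message in consumer]
    consumer.close()

    producer = KafkaProducer(bootstrap_servers=[bootstrap], value_serializer=encode_int)
    for value in sorted(values):  # sorted across all partitions
        producer.send("output-topic", value=value)
    producer.flush()
    producer.close()
    return values
```

Call copy_sorted() after the producer script has finished. Because it reads everything before writing, it can sort across all partitions, but it handles only one batch: it stops as soon as the input topic has been quiet for the timeout.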
In the code from the question:
if __name__ == "__main__":
    read_topic_data()
    send_data_to_topic()
Because iterating over the KafkaConsumer blocks, read_topic_data() never returns control to the main block, so send_data_to_topic() is never called.
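Instead of running the reader and writer in separate threads, a single loop can call consumer.poll(), which waits at most timeout_ms and then returns a dict mapping each TopicPartition to a list of records. This is a minimal sketch, assuming kafka-python, a broker at localhost:9092 and the 4-byte big-endian encoding; values_from_batch and relay_forever are hypothetical names. Sorting happens only within each polled batch, so the output topic is not globally sorted.

```python
def values_from_batch(batch: dict) -> list:
    # poll() returns {TopicPartition: [ConsumerRecord, ...]}; collect the
    # values from every partition in this batch and sort them.
    return sorted(record.value for records in batch.values() for record in records)


def relay_forever(bootstrap: str = "localhost:9092") -> None:
    from kafka import KafkaConsumer, KafkaProducer  # assumption: kafka-python

    consumer = KafkaConsumer(
        "input-topic",
        bootstrap_servers=[bootstrap],
        group_id="my-group",
        auto_offset_reset="earliest",
        value_deserializer=lambda raw: int.from_bytes(raw, "big"),
    )
    producer = KafkaProducer(
        bootstrap_servers=[bootstrap],
        value_serializer=lambda value: value.to_bytes(4, "big"),
    )
    while True:
        # Returns after at most 1 s even if nothing arrived, so the loop
        # regains control and can write what it has read.
        batch = consumer.poll(timeout_ms=1000)
        values = values_from_batch(batch)
        for value in values:
            producer.send("output-topic", value=value)
        if values:
            producer.flush()
```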
To avoid this issue, reading from one topic and sending to the other must run concurrently. Threads can be used for this:
from threading import Thread

if __name__ == "__main__":
    read_thread = Thread(target=read_topic_data)
    read_thread.start()
    write_thread = Thread(target=send_data_to_topic)
    write_thread.start()
Note also that send_data_to_topic() sends whatever is in the data list at that moment and then returns; it does not wait for or process messages that arrive later. To keep it running, you have to add the waiting yourself:
import time

def send_data_to_topic():
    while True:
        print("starting write thread")
        sorted_data = sorted(data)
        for d in sorted_data:
            producer1.send(topic_name_output, value=d)
        producer1.flush()
        print("finishing write thread")
        time.sleep(10)  # wait 10 seconds before starting the next iteration
Another issue with this code is message duplication. read_topic_data() reads messages from the input topic and appends them to the list data, and send_data_to_topic() sends the contents of that list to the output topic. However, processed messages are never removed from the list, so every iteration sends them again. A Queue fixes this, because Queue.get() removes each item as it is taken. Note that the version below forwards messages in arrival order; it no longer sorts them.
from threading import Thread
from queue import Queue
from kafka import KafkaProducer
from kafka import KafkaConsumer

topic_name_input = "input-topic"
topic_name_output = "output-topic"
data = Queue()  # thread-safe hand-off between the reader and writer threads

def bytes_to_int(raw):
    result = 0
    for b in raw:
        result = result * 256 + int(b)
    return result

consumer = KafkaConsumer(
    topic_name_input,
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    value_deserializer=lambda x: bytes_to_int(x)
)
# Encode the integer itself; bytes(x) would produce x zero bytes
producer1 = KafkaProducer(
    bootstrap_servers=['localhost:9092'], value_serializer=lambda x: x.to_bytes(4, 'big'))

def read_topic_data():
    print("received")
    for message in consumer:
        print(message)
        data.put(message.value)

def send_data_to_topic():
    while True:
        # get() blocks until an item is available and removes it from the queue
        producer1.send(topic_name_output, value=data.get())
        producer1.flush()

if __name__ == "__main__":
    read_thread = Thread(target=read_topic_data)
    read_thread.start()
    write_thread = Thread(target=send_data_to_topic)
    write_thread.start()
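To see why the Queue removes the duplicates, here is a broker-free sketch of the same reader/writer hand-off, where a plain iterable stands in for input-topic and a list for output-topic. The names reader, writer, run and STOP are hypothetical; the sentinel only exists so the demo threads can finish, whereas the Kafka version above runs forever.

```python
from queue import Queue
from threading import Thread

STOP = object()  # sentinel telling the writer thread to finish


def reader(source, channel):
    # Stand-in for read_topic_data(): each consumed value is queued exactly once.
    for value in source:
        channel.put(value)
    channel.put(STOP)


def writer(channel, sink):
    # Stand-in for send_data_to_topic(): get() removes the item, so nothing is re-sent.
    while True:
        value = channel.get()
        if value is STOP:
            break
        sink.append(value)


def run(source):
    channel = Queue()
    sink = []
    threads = [
        Thread(target=reader, args=(source, channel)),
        Thread(target=writer, args=(channel, sink)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return sink
```

For example, run([3, 1, 2]) returns [3, 1, 2]: every value reaches the sink exactly once, in the order it was read.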