tank

Reputation: 505

Moving data from one Kafka topic to another is not working in Kafka Python

I am new to Kafka. I want to take data from a Kafka topic called "input-topic", which has 5 partitions, each holding some random integers, and move it to another topic called "output-topic", which has only one partition.

Below is the code I am using:

Kafka Producer

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: bytes(x))

topic_name = "input-topic"

num_partitions = 3

def send_data_to_topics():
    print("sending...")
    for i in range(num_partitions):
        for j in range(5):
            producer.send(topic_name, value=j, partition=i)
    producer.flush()
    

if __name__ == "__main__":
    send_data_to_topics()

Kafka Consumer

from kafka import KafkaConsumer, KafkaProducer

topic_name_input  = "input-topic"
topic_name_output = "output-topic"

data = []

def bytes_to_int(bytes):
    result = 0
    for b in bytes:
        result = result * 256 + int(b)
    return result

consumer = KafkaConsumer(
     topic_name_input,
     bootstrap_servers=['localhost:9092'],
     group_id='my-group',
     value_deserializer=lambda x: bytes_to_int(x)
    )

producer1 = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: bytes(x))

def read_topic_data():
    print("received")
    for message in consumer:
        print(message)
        data.append(message.value)

def send_data_to_topic():
    data = sorted(data) 
    for d in data:
        producer1.send(topic_name_output, value=d)
    producer1.flush()
        
        


if __name__ == "__main__":
    read_topic_data()
    send_data_to_topic()


Another Kafka Consumer

from kafka import KafkaConsumer

topic_name_input  = "input-topic"
topic_name_output = "output-topic"

def bytes_to_int(bytes):
    result = 0
    for b in bytes:
        result = result * 256 + int(b)
    return result

consumer2 = KafkaConsumer(
     topic_name_output,
     bootstrap_servers=['localhost:9092'],
     group_id='my-group_new',
     value_deserializer=lambda x: bytes_to_int(x)
    )

def read_topic_data():
    print("received")
    for message in consumer2:
        print(message)
        

if __name__ == "__main__":
    read_topic_data()

However, I don't receive any messages in the topic "output-topic". Am I missing something here? How can I achieve this efficiently? Do I need to use Kafka Streams? Any help would be appreciated. Thanks!

Upvotes: 0

Views: 967

Answers (1)

arunkvelu

Reputation: 1743

I would recommend Kafka Streams, which is efficient for this use case. Keep in mind that Kafka Streams is a Java library, so it means moving away from Python.

Iterating over a KafkaConsumer is a blocking operation. Once started, the consumer processes the messages already available in the topic and then waits for new ones. It processes new messages as they arrive and then goes back to waiting.

The consumer_timeout_ms parameter can be passed to the KafkaConsumer constructor to stop iterating if no new message arrives within the given number of milliseconds.
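
As a rough sketch of how that timeout could be used to keep the original read-then-send structure, the snippet below drains a consumer until its iterator stops and returns the sorted values. The names drain_sorted, build_consumer and FakeMessage are hypothetical, and the 5000 ms timeout and auto_offset_reset="earliest" are assumptions rather than values from the question. The kafka import sits inside build_consumer so the helper can be tried without a broker.

```python
from collections import namedtuple

# Hypothetical stand-in for a consumer record; only .value is used here.
FakeMessage = namedtuple("FakeMessage", ["value"])


def drain_sorted(consumer):
    """Collect message values until the iterator stops, then sort them."""
    return sorted(message.value for message in consumer)


def build_consumer():
    # Imported lazily so drain_sorted can be exercised without kafka-python.
    from kafka import KafkaConsumer

    return KafkaConsumer(
        "input-topic",
        bootstrap_servers=["localhost:9092"],
        group_id="my-group",
        auto_offset_reset="earliest",  # assumption: read from the start
        consumer_timeout_ms=5000,      # assumption: stop after 5 s of silence
        value_deserializer=lambda raw: int.from_bytes(raw, "big"),
    )


# Broker-free check of the helper with fake messages.
print(drain_sorted([FakeMessage(3), FakeMessage(1), FakeMessage(2)]))
```

With a real broker, drain_sorted(build_consumer()) would return once no message has arrived for the timeout period, after which the sorted values can be sent to the output topic.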

From the example code shared,

if __name__ == "__main__":
    read_topic_data()
    send_data_to_topic()

Because the KafkaConsumer loop blocks, read_topic_data() is called but control never returns to main, so send_data_to_topic() is never called.

To avoid this, reading from one topic and sending to the other should run concurrently. Threads can be used for this.

from threading import Thread

if __name__ == "__main__":
    read_thread = Thread(target=read_topic_data)
    read_thread.start()
    write_thread = Thread(target=send_data_to_topic)
    write_thread.start()

Note also that send_data_to_topic() sends all the messages currently in the data list and then returns. It does not wait for or process messages that arrive later, so the wait has to be implemented explicitly to keep it running.

import time

def send_data_to_topic():
    while True:
        print("starting write thread")
        sorted_data = sorted(data)
        for d in sorted_data:
            producer1.send(topic_name_output, value=d)
        producer1.flush()
        print("finishing write thread")
        time.sleep(10) # wait for 10 seconds before starting next iteration

Another issue with the code is message duplication. read_topic_data() reads messages from the input topic and appends them to the list data. send_data_to_topic() reads messages from this list and sends them to the output topic. However, messages that have already been sent are never removed from the list, so every pass sends them again. A Queue can be used to fix this issue. Note that the Queue version forwards messages in arrival order rather than sorted order.

from threading import Thread
from queue import Queue

from kafka import KafkaProducer
from kafka import KafkaConsumer

topic_name_input = "input-topic"
topic_name_output = "output-topic"

data = Queue()


def bytes_to_int(bytes):
    result = 0
    for b in bytes:
        result = result * 256 + int(b)
    return result


consumer = KafkaConsumer(
     topic_name_input,
     bootstrap_servers=['avelusamy-mbp15:9092'],
     group_id='my-group',
     value_deserializer=lambda x: bytes_to_int(x)
    )

producer1 = KafkaProducer(
    bootstrap_servers=['avelusamy-mbp15:9092'], value_serializer=lambda x: bytes(x))


def read_topic_data():
    print("received")
    for message in consumer:
        print(message)
        data.put(message.value)


def send_data_to_topic():
    while True:
        producer1.send(topic_name_output, value=data.get())
        producer1.flush()


if __name__ == "__main__":
    read_thread = Thread(target=read_topic_data)
    read_thread.start()
    write_thread = Thread(target=send_data_to_topic)
    write_thread.start()
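
One caveat about the serializer used in the snippets above: in Python 3, bytes(x) with an integer x returns x zero bytes rather than an encoding of x, so bytes_to_int would decode every value as 0. Below is a minimal sketch of a round-trip encoding that could be used instead, assuming the values are non-negative integers. The helper int_to_bytes is a hypothetical name, not part of kafka-python, and the int.from_bytes version of bytes_to_int is equivalent to the big-endian loop above.

```python
def int_to_bytes(value: int) -> bytes:
    """Encode a non-negative integer as big-endian bytes (at least one byte)."""
    length = max(1, (value.bit_length() + 7) // 8)
    return value.to_bytes(length, "big")


def bytes_to_int(raw: bytes) -> int:
    """Decode big-endian bytes back into an integer."""
    return int.from_bytes(raw, "big")


# bytes(3) is three zero bytes, which decodes to 0, not 3.
print(bytes(3), bytes_to_int(bytes(3)))

# The explicit encoding survives the round trip.
print(int_to_bytes(300), bytes_to_int(int_to_bytes(300)))
```

These could then be passed as value_serializer=int_to_bytes on both producers and value_deserializer=bytes_to_int on both consumers.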

Upvotes: 1
