Praba karan
Praba karan

Reputation: 46

Using Python, how to produce kafka message more than 1MB?

I was trying to transfer some huge json data through kafka, but its not happening. Neither no error from python producer nor any messages on the consumer side.

producer = KafkaProducer(bootstrap_servers="kafka_server")
product_message = json.dumps(data)
producer.send(kafka_topic, key=bytes('kk', 'utf-8'), value=bytes(product_message, 'utf-8'))

Upvotes: 1

Views: 918

Answers (2)

Tms91
Tms91

Reputation: 4154

I had the same error and by reading this thread I have fixed the issue like this:

Supposing that you have files server.properties and consumer.properties at path /usr/local/kafka/config/ (this path could be defined in your environment variables as $KAFKA_HOME, but it does not have to)

and that you want to increase the max message length to 15728640 (15 MB),

server side (=broker)

sudo vim /usr/local/kafka/config/server.properties

add line or change variable value

# my additions
# increase max message size to 15 MB
message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

consumer side

sudo vim /usr/local/kafka/config/consumer.properties

add line or change variable value

# my additions
# increase max message size to 15 MB
message.max.bytes=15728640
max.partition.fetch.bytes=15728640
fetch.max.bytes=15728640

restart services

make changes effective (run these commands manually one by one)

sudo systemctl daemon-reload  # wait a little

sudo systemctl restart zookeeper  # wait a little

sudo systemctl restart kafka  # wait a little

Make sure they restarted correctly by running

sudo systemctl status zookeeper
sudo systemctl status kafka

If not, restart them again

Producer side, in the context of your python module kafka-python

Then you have to add the k-argument max_request_size (read the docs) to your KafkaProducer instance, so substitute line

producer.send(kafka_topic, key=bytes('kk', 'utf-8'), value=bytes(product_message, 'utf-8'))

with line

producer.send(kafka_topic, key=bytes('kk', 'utf-8'), value=bytes(product_message, 'utf-8'), max_request_size=15728640)

Upvotes: 0

Gerard Garcia
Gerard Garcia

Reputation: 1856

You may need to increase the topic configuration max.message.bytes to be able to produce the message.

Also, you can probably get a hint of what is going on by getting the future that send returns: https://stackoverflow.com/a/55538034/19059974

Upvotes: 1

Related Questions