Jim Macaulay
Jim Macaulay

Reputation: 5141

Unable to send messages to topic in Kafka Python

I have a producer code where am sending messages to Kafka. I was able to send messages till yesterday. From today I am unable to send messages. Not sure if it's version compatible issue. There are no failures or error messages, code gets executed, but it's not sending messages.

Below are the Python module versions:

kafka-python==2.0.1
Python 3.8.2

Below is my code:

from kafka import KafkaProducer
import logging
logging.basicConfig(level=logging.INFO)

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
producer.send('Jim_Topic', b'Message from PyCharm')
producer.send('Jim_Topic', key=b'message-two', value=b'This is Kafka-Python')

I tries logging the behavior as well, but no idea why producer gets closed:

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to 127.0.0.1:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
INFO:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.
INFO:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed within timeout 0.
INFO:kafka.producer.kafka:Kafka producer closed

Process finished with exit code 0

Upvotes: 1

Views: 13363

Answers (2)

Jim Macaulay
Jim Macaulay

Reputation: 5141

Adding producer.flush() at the end helped me to resolve the issues. Any outstanding messages will be flushed (delivered) before actually committing the transaction

from kafka import KafkaProducer
import logging
logging.basicConfig(level=logging.INFO)

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
producer.send('Jim_Topic', b'Message from PyCharm')
producer.send('Jim_Topic', key=b'message-two', value=b'This is Kafka-Python')
producer.flush()

Upvotes: 7

Yayati Sule
Yayati Sule

Reputation: 1631

Can you try the following code. I have picked it up from the kafka-python documentation and tried it on my local Kafka instance

from kafka import KafkaProducer
import json
def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)


def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)


if __name__ == "__main__":
    target_topic = "Jim_Topic"
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'],retries=5, key_serializer=lambda x: json.dumps(x).encode("ascii"), value_serializer=lambda x: json.dumps(x).encode("ascii"))
    messages = [{key:"message_one",value:"Message from PyCharm"},{key:"message_two",value:"This is Kafka-Python"}]
    for msg in messages:
      producer.send(target_topic,msg).add_callback(on_send_success).add_errback(on_send_error)

    producer.flush(timeout=10) # this forcibly sends any messages that are stuck.
    producer.close(timeout=5)

Upvotes: 0

Related Questions