Reputation: 5141
I have producer code that sends messages to Kafka. It worked until yesterday, but as of today the messages are no longer being sent. I'm not sure whether it's a version compatibility issue. There are no failures or error messages; the code runs to completion, but no messages are sent.
Below are the Python module versions:
kafka-python==2.0.1
Python 3.8.2
Below is my code:
from kafka import KafkaProducer
import logging
logging.basicConfig(level=logging.INFO)
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
producer.send('Jim_Topic', b'Message from PyCharm')
producer.send('Jim_Topic', key=b'message-two', value=b'This is Kafka-Python')
I tried logging the behavior as well, but I have no idea why the producer gets closed:
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to 127.0.0.1:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
INFO:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.
INFO:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed within timeout 0.
INFO:kafka.producer.kafka:Kafka producer closed
Process finished with exit code 0
Upvotes: 1
Views: 13363
Reputation: 5141
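Adding producer.flush() at the end resolved the issue. send() is asynchronous: it only places the message in an in-memory buffer, and a background thread delivers it later. Because the script exited right after the send() calls, the producer was closed with a 0-second timeout (as the log shows) before anything had been delivered. flush() blocks until all buffered messages have been sent: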
from kafka import KafkaProducer
import logging
logging.basicConfig(level=logging.INFO)
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
producer.send('Jim_Topic', b'Message from PyCharm')
producer.send('Jim_Topic', key=b'message-two', value=b'This is Kafka-Python')
producer.flush()
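As an alternative to flushing once at the end, each send can be confirmed individually: send() returns a future, and calling get() on it blocks until the broker acknowledges the message or the timeout expires. The following is only a minimal sketch; the helper name send_and_confirm and the demo function are made up for illustration, and the demo assumes a broker is reachable at 127.0.0.1:9092.

```python
def send_and_confirm(producer, topic, value, key=None, timeout=10):
    """Send one message and block until it is acknowledged.

    `producer` is expected to behave like kafka.KafkaProducer: send()
    returns a future whose get(timeout=...) yields record metadata or
    raises if delivery failed or timed out.
    """
    future = producer.send(topic, key=key, value=value)
    metadata = future.get(timeout=timeout)  # blocks; raises on failure
    return metadata.topic, metadata.partition, metadata.offset


def demo():
    # Hypothetical demo; requires kafka-python and a running broker.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
    print(send_and_confirm(producer, 'Jim_Topic', b'Message from PyCharm'))
    producer.close()
```

Calling demo() with the broker running should print the topic, partition and offset of the delivered message. Blocking on every message is slower than batching and flushing once, so this is mostly useful for debugging whether delivery actually happens.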
Upvotes: 7
Reputation: 1631
Can you try the following code? I picked it up from the kafka-python documentation and tried it on my local Kafka instance.
from kafka import KafkaProducer
import json
import logging

log = logging.getLogger(__name__)

def on_send_success(record_metadata):
    # Called once the broker has acknowledged the message.
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    # Called if the send ultimately fails.
    log.error('I am an errback', exc_info=excp)

if __name__ == "__main__":
    target_topic = "Jim_Topic"
    producer = KafkaProducer(
        bootstrap_servers=['127.0.0.1:9092'],
        retries=5,
        key_serializer=lambda x: json.dumps(x).encode("ascii"),
        value_serializer=lambda x: json.dumps(x).encode("ascii"),
    )
    messages = [
        {"key": "message_one", "value": "Message from PyCharm"},
        {"key": "message_two", "value": "This is Kafka-Python"},
    ]
    for msg in messages:
        producer.send(target_topic, key=msg["key"], value=msg["value"]).add_callback(on_send_success).add_errback(on_send_error)
    producer.flush(timeout=10)  # block until buffered messages are sent, or the timeout expires
    producer.close(timeout=5)
Upvotes: 0