Reputation: 55
Here is part of my code:
class KafkaProducer:
    def __init__(self):
        pass

    bootstrap_server_host = system_config.get_kafka_bootstrap_server()
    producer = Producer({'bootstrap.servers': bootstrap_server_host, "log.connection.close":False})

    @classmethod
    def send(cls, topic, key, value, data_type=None, uid=None):
        try:
            data = {"data": value, "createTime": long(time.time() * 1000)}
            if data_type is not None:
                data["type"] = int(data_type)
            if uid is not None:
                data["uid"] = long(uid)
            cls.producer.produce(topic, json.dumps(data), key)
            cls.producer.poll(0)
        except BufferError as e:
            logger.error('%% Local producer queue is full ' \
                         '(%d messages awaiting delivery): try again\n' %
                         len(cls.producer))
            raise e

class new_application_scanner():
    @classmethod
    def scan_new_application(cls):
        db_source = None
        try:
            db_source = DBConnector().connect()
            db_cur = db_source.cursor()
            ...
            KafkaProducer.send("RiskEvent", str(uid),
                               {"uid": uid, "country_id": user_info[1], "event_id": constant.RISK_EVENT_NEW_APPLICATION})
            ...
        except Exception as e:
            logger.error(traceback.format_exc())
        finally:
            if db_source is not None:
                db_source.close()

def run_scan_new_application():
    while is_scan_new_application_active:
        try:
            logging.info("scan_new_application starts at %s",time.time())
            new_application_scanner.scan_new_application()
            logging.info("scan_new_application ends at %s", time.time())
        except Exception as e:
            logging.error("new_application_scanner Error:%s",format(e))
            logging.error(traceback.format_exc())
        time.sleep(10)

t1 = threading.Thread(target=run_scan_new_application, name='run_scan_new_application', args=([]))
t1.start()
I have a Kafka cluster of two servers. When I restart the two servers one by one, KafkaProducer.send() throws a KafkaException (maybe a bug in confluent_kafka), and there are some exception logs.
The strange thing is that the exception keeps propagating out of scan_new_application, and there are exception logs in run_scan_new_application too. The thread even stopped. Here are the exception logs:
2017-12-21 07:11:49 INFO pre_risk_control_flow.py:71 pid-16984 scan_new_application starts at 1513840309.6
2017-12-21 07:11:49 ERROR new_application_scan.py:165 pid-16984 Traceback (most recent call last):
File "/home/ubuntu/data/code/risk/Feature_Engine/data_retrive/pre_risk_control_flow/new_application_scan.py", line 163, in scan_new_application
{"uid": uid, "country_id": user_info[1], "event_id": constant.RISK_EVENT_NEW_APPLICATION})
File "/home/ubuntu/data/code/risk/Feature_Engine/data_retrive/kafka_client/Producer.py", line 27, in send
cls.producer.produce(topic, json.dumps(data), key)
KafkaException: KafkaError{code=_UNKNOWN_TOPIC,val=-188,str="Unable to produce message: Local: Unknown topic"}
2017-12-21 07:11:49 ERROR pre_risk_control_flow.py:75 pid-16984 new_application_scanner Error:KafkaError{code=_UNKNOWN_TOPIC,val=-188,str="Unable to produce message: Local: Unknown topic"}
Upvotes: 0
Views: 2298
Reputation: 3113
The underlying client is raising KafkaException KafkaError{code=_UNKNOWN_TOPIC..} because it (now) knows the requested topic does not exist in the cluster (and auto topic creation is disabled). This is expected.
You are seeing the exception in run_scan_new_application because you are not catching KafkaException in send().
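As a rough sketch of what catching it in send() could look like (not a drop-in replacement for your class): the function name safe_send and the True/False return convention are my own assumptions, it is written for Python 3 (int instead of long), and the small fallback class only exists so the snippet runs where confluent_kafka is not installed.

```python
import json
import logging
import time

logger = logging.getLogger(__name__)

try:
    from confluent_kafka import KafkaException
except ImportError:
    # Stand-in (assumption) so the sketch runs without the library installed.
    class KafkaException(Exception):
        pass


def safe_send(producer, topic, key, value):
    """Hypothetical helper: enqueue one message.

    Returns True if the message was handed to the producer, False if the
    client rejected it (full local queue or a KafkaException such as
    _UNKNOWN_TOPIC).
    """
    payload = json.dumps({"data": value, "createTime": int(time.time() * 1000)})
    try:
        producer.produce(topic, payload, key)
        # Serve delivery callbacks without blocking.
        producer.poll(0)
        return True
    except BufferError:
        logger.error("Local producer queue is full (%d messages awaiting delivery)",
                     len(producer))
        return False
    except KafkaException as e:
        # Raised synchronously by produce(), e.g. for a locally unknown topic.
        logger.error("Failed to produce to topic %s: %s", topic, e)
        return False
```

With something like this, the caller can check the return value and decide whether to retry on the next scan, rather than relying on the exception reaching the outer loop.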
Upvotes: 1