Reputation: 1286
I am totally new to Kafka and Docker, and have been handed a problem to fix. Our Continuous Integration tests for (Apache) Kafka queues run just fine on local machines, but on the Jenkins CI server they occasionally fail with this sort of error:
%3|1508247800.270|FAIL|art#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused
%3|1508247800.270|ERROR|art#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused
%3|1508247800.270|ERROR|art#producer-1| [thrd:localhost:9092/bootstrap]: 1/1 brokers are down
The working theory is that the Docker image takes time to get started, by which time the Kafka producer has given up. The offending code is
producer_properties = {
    'bootstrap.servers': self._job_queue.bootstrap_server,
    'client.id': self._job_queue.client_id
}
try:
    self._producer = kafka.Producer(**producer_properties)
except:
    print("Bang!")
with the error lines above appearing during the creation of the producer. However, no exception is raised, and the call returns an otherwise valid-looking producer, so I can't programmatically test for the presence of the broker endpoint. Is there an API to check the status of a broker?
Upvotes: 1
Views: 925
Reputation: 1286
Here is the code that seems to work for me. If it looks a bit Frankenstein, then you are right, it is! If there is a clean solution, I look forward to seeing it:
import time
import uuid
from threading import Event
from typing import Dict
import confluent_kafka as kafka
# pylint: disable=no-name-in-module
from confluent_kafka.cimpl import KafkaError
# more imports...
LOG = # ...
# Default number of times to retry connection to Kafka Broker
_DEFAULT_RETRIES = 3
# Default time in seconds to wait between connection attempts
_DEFAULT_RETRY_DELAY_S = 5.0
# Number of times to scan for an error after initiating the connection. It appears that calling
# flush() once on a producer after construction isn't sufficient to catch the 'broker not available'
# error. At least twice seems to work.
_NUM_ERROR_SCANS = 2
class JobProducer(object):
    def __init__(self, connection_retries: int = _DEFAULT_RETRIES,
                 retry_delay_s: float = _DEFAULT_RETRY_DELAY_S) -> None:
        """
        Constructs a producer.
        :param connection_retries: how many times to retry the connection before raising a
            RuntimeError. If 0, retry forever.
        :param retry_delay_s: how long to wait between retries in seconds.
        """
        self.__error_event = Event()
        self._job_queue = JobQueue()
        self._producer = self.__wait_for_broker(connection_retries, retry_delay_s)
        self._topic = self._job_queue.topic

    def produce_job(self, job_definition: Dict) -> None:
        """
        Produce a job definition on the queue
        :param job_definition: definition of the job to be executed
        """
        value = ... # Conversion to JSON
        key = str(uuid.uuid4())
        LOG.info('Produced message: %s', value)
        self.__error_event.clear()
        self._producer.produce(self._topic,
                               value=value,
                               key=key,
                               on_delivery=self._on_delivery)
        self._producer.flush(self._job_queue.flush_timeout)

    @staticmethod
    def _on_delivery(error, message):
        if error:
            LOG.error('Failed to produce job %s, with error: %s', message.key(), error)

    def __create_producer(self) -> kafka.Producer:
        producer_properties = {
            'bootstrap.servers': self._job_queue.bootstrap_server,
            'error_cb': self.__on_error,
            'client.id': self._job_queue.client_id,
        }
        return kafka.Producer(**producer_properties)

    def __wait_for_broker(self, retries: int, delay: float) -> kafka.Producer:
        retry_count = 0
        while True:
            self.__error_event.clear()
            producer = self.__create_producer()
            # Need to call flush() several times with a delay between to ensure errors are caught.
            if not self.__error_event.is_set():
                for _ in range(_NUM_ERROR_SCANS):
                    producer.flush(0.1)
                    if self.__error_event.is_set():
                        break
                    time.sleep(0.1)
                else:
                    # Success: no errors.
                    return producer
            # If we get to here, the error callback was invoked.
            retry_count += 1
            if retries == 0:
                msg = '({})'.format(retry_count)
            else:
                if retry_count <= retries:
                    msg = '({}/{})'.format(retry_count, retries)
                else:
                    raise RuntimeError('JobProducer timed out')
            LOG.warning('JobProducer: could not connect to broker, will retry %s', msg)
            time.sleep(delay)

    def __on_error(self, error: KafkaError) -> None:
        LOG.error('KafkaError: %s', error.str())
        self.__error_event.set()
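A possibly simpler probe, if your confluent_kafka version provides list_topics() on the producer: request cluster metadata with a timeout and treat an exception as "broker not reachable yet". The following is only a sketch under that assumption. The function name wait_for_broker and its parameters are made up for illustration, and it catches a broad Exception so that it does not depend on the client library being importable; in real code you would probably catch confluent_kafka.KafkaException instead.

```python
import time


def wait_for_broker(client, retries=3, delay_s=5.0, timeout_s=1.0):
    """Return True once client.list_topics() succeeds, False after `retries` failures.

    `client` is assumed to be a confluent_kafka Producer or Consumer, or any
    object that exposes a compatible list_topics(timeout=...) method.
    """
    for attempt in range(1, retries + 1):
        try:
            # A metadata request needs a live broker connection to succeed.
            client.list_topics(timeout=timeout_s)
            return True
        except Exception as exc:  # assumption: KafkaException in real use
            print('broker not reachable (%d/%d): %s' % (attempt, retries, exc))
            if attempt < retries:
                time.sleep(delay_s)
    return False
```

With a real producer this might be called as wait_for_broker(kafka.Producer(producer_properties)) right after construction, raising a RuntimeError if it returns False.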
Upvotes: 0
Reputation: 5387
It seems the client doesn't throw an exception if the connection to the broker fails. It actually tries to connect to the bootstrap servers the first time the producer tries to send a message. If the connection fails, it repeatedly tries to connect to any of the brokers passed in the bootstrap list. Eventually, if the brokers come up, the send will happen (and we may check the status in the callback function). The Confluent Kafka Python library is built on the librdkafka library, and this client doesn't seem to have proper documentation. Some of the producer options specified for Kafka do not seem to be supported by librdkafka.
Here is the sample code with callback I used:
from confluent_kafka import Producer

def notifyme(err, msg):
    # Delivery callback: err is None on success, otherwise a KafkaError.
    print(err, msg.key(), msg.value())

p = Producer({'bootstrap.servers': '127.0.0.1:9092',
              'retry.backoff.ms': 100,
              'message.send.max.retries': 20,
              'reconnect.backoff.jitter.ms': 2000})
try:
    p.produce(topic='sometopic', value='this is data', on_delivery=notifyme)
except Exception as e:
    print(e)
# Block until outstanding messages are delivered and their callbacks have run.
p.flush()
Also, to check for the presence of the broker, you can simply telnet to the broker's IP address on its port (9092 in this example). And on the ZooKeeper instance used by the Kafka cluster, you can check the contents of the znodes under /brokers/ids.
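The telnet check can also be done from the test code itself with a plain TCP connect. This is a minimal sketch using only the standard library; port_is_open is a hypothetical helper name, and an open port only shows that something is listening, not that Kafka has finished starting up.

```python
import socket


def port_is_open(host, port, timeout_s=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout_s."""
    try:
        # create_connection resolves the host and connects; closing the
        # socket immediately is enough for a reachability check.
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers connection refused, timeouts and name resolution failures.
        return False
```

For the setup in the question this would be called as port_is_open('localhost', 9092), typically in a loop with a short sleep before the producer is created.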
Upvotes: 2