Reputation: 1282
I'm trying to send messages to a Kafka topic that is served by multiple brokers. Below is my method, where self.bootstrap_servers = "[b-1.dev-stg-kafka.wjiw3s.c1.kafka.us-east-1.amazonaws.com:9092, b-2.dev-stg-kafka.wjiw3s.c1.kafka.us-east-1.amazonaws.com:9092]"
from kafka import KafkaProducer
from time import sleep

def send(self):
    try:
        producer = KafkaProducer(bootstrap_servers=self.bootstrap_server, api_version=(0, 10, 1))
        message = self.prepare_message()
        producer.send(self.topic, self.encoded_payload)
        sleep(2)
    except:
        print("Exception sending message to kafka")
When I test this, execution ends up in the except block, and the line that fails is the one that creates the producer. How can I publish a Kafka message with this KafkaProducer when the topic is backed by multiple Kafka bootstrap servers? Is there another way to do this?
Error:
  File "C:\Users\ENV\datareplayer\venv\lib\site-packages\kafka\client_async.py", line 216, in __init__
    self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
  File "C:\Users\ENV\datareplayer\venv\lib\site-packages\kafka\conn.py", line 1236, in collect_hosts
    host, port, afi = get_ip_port_afi(host_port)
  File "C:\Users\ENV\datareplayer\venv\lib\site-packages\kafka\conn.py", line 1192, in get_ip_port_afi
    host, rest = host_and_port_str[1:].split(']')
ValueError: not enough values to unpack (expected 2, got 1)
Upvotes: 0
Views: 7479
Reputation: 66
Just separate the servers with commas, passing them as a Python list of strings rather than as one bracketed string. It works.
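import json
from kafka import KafkaConsumer, KafkaProducer

# json_serializer was not shown in the original answer; this definition is an assumption.
def json_serializer(data):
    return json.dumps(data).encode("utf-8")

kafka_servers = ["localhost:9092", "localhost:9093", "localhost:9094"]
producer = KafkaProducer(
    bootstrap_servers=kafka_servers,
    value_serializer=json_serializer,
)
consumer = KafkaConsumer(
    "topic_name",
    bootstrap_servers=kafka_servers,
    auto_offset_reset="earliest",
    group_id="consumer-group-a",
)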
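If the server list arrives as a single bracketed string, as in the question, it has to be turned into a real list first. The following is only a sketch of one way to do that; the helper name parse_bootstrap_servers and the broker host names are made up for illustration.

```python
def parse_bootstrap_servers(raw):
    """Turn '[host1:9092, host2:9092]' or 'host1:9092,host2:9092' into a list of strings."""
    # Drop surrounding whitespace and any enclosing square brackets.
    # Limitation: this also strips the brackets of a bracketed IPv6 literal at
    # either end of the string, so it is only suitable for host-name style addresses.
    cleaned = raw.strip().strip("[]")
    # Split on commas, then trim whitespace and stray quotes from each entry.
    servers = [part.strip().strip("'\"") for part in cleaned.split(",")]
    # Discard empty entries, e.g. from a trailing comma.
    return [server for server in servers if server]


# Hypothetical broker addresses, used only to show the conversion.
servers = parse_bootstrap_servers("[broker-1.example.com:9092, broker-2.example.com:9092]")
print(servers)
```

The resulting list can then be passed as bootstrap_servers, in the same way as kafka_servers above.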
Upvotes: 1
Reputation: 11
ast.literal_eval(bootstrap_servers) can help.
It takes a stringified list as input and converts it to a list. The elements inside the string must be quoted for this to work. For example:
import ast
input_str = "['broker1', 'broker2', 'broker3']"
type(input_str)   # str
If you do
input_list = ast.literal_eval(input_str)
then input_list is now a list:
type(input_list)  # list
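One caveat: ast.literal_eval only accepts valid Python literals, so a string whose entries are not quoted (like the one in the question) is rejected. A small wrapper, sketched here with the hypothetical name load_server_list and placeholder broker names, makes that failure explicit:

```python
import ast


def load_server_list(text):
    """Parse a stringified list of quoted server addresses into a list of str."""
    try:
        value = ast.literal_eval(text)
    except (ValueError, SyntaxError) as exc:
        # Unquoted entries such as [broker1:9092] are not valid Python literals.
        raise ValueError(f"not a valid Python list literal: {text!r}") from exc
    # Accept only a list or tuple whose items are all strings.
    if not isinstance(value, (list, tuple)) or not all(isinstance(v, str) for v in value):
        raise ValueError(f"expected a list of strings, got: {value!r}")
    return list(value)


print(load_server_list("['broker1:9092', 'broker2:9092']"))

try:
    load_server_list("[broker1:9092, broker2:9092]")  # entries are not quoted
except ValueError as err:
    print("rejected:", err)
```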
Upvotes: 1
Reputation: 1831
Regarding sending data to a Kafka cluster with multiple brokers: the bootstrap server is just the initial connection point. The client fetches metadata from it to learn which brokers lead which partitions, and then connects to the appropriate brokers. So you need a network connection to all brokers in the cluster, and it does not matter which broker is chosen as the bootstrap server.
That being said, the recommended way is to give a comma-separated list of bootstrap servers, so that if one is down the client can use the next one in the list.
Regarding specifying a list of bootstrap servers in the settings of the Python Kafka client:
Which Python Kafka library are you using?
For confluent-kafka-python, there seems to be an open issue on this matter that has not been fixed yet:
https://github.com/confluentinc/confluent-kafka-python/issues/711
Description: The configuration key bootstrap.servers implies that multiple servers can be specified, but it is not explicitly defined what this key's value should be. It appears that only a string using comma separation is allowed for this config key. If a Python list is given, no explicit error or feedback is given to indicate this is not valid. Instead, calls to consumer.poll() or other broker-bound functions simply block and never succeed.
Ideally, a Python list (e.g. ["broker1", "broker2"]) should be valid; or at the very least, constructing the Consumer should throw a runtime/configuration exception.
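Given the behavior described in that issue, one way to stay safe with confluent-kafka is to always hand bootstrap.servers a comma-separated string. This is only a sketch: the helper name to_bootstrap_string and the broker names are invented for illustration, and the group.id value is a placeholder.

```python
def to_bootstrap_string(servers):
    """Return a comma-separated string from either a string or an iterable of strings."""
    if isinstance(servers, str):
        # Already a string: assume it is comma-separated and pass it through.
        return servers
    # Join the individual host:port entries, trimming stray whitespace.
    return ",".join(server.strip() for server in servers)


# Hypothetical configuration dict in the style used by confluent-kafka.
config = {
    "bootstrap.servers": to_bootstrap_string(["broker1:9092", "broker2:9092"]),
    "group.id": "consumer-group-a",
}
print(config["bootstrap.servers"])
```

A dict like this could then be passed to confluent_kafka.Consumer(config), assuming that library is installed.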
Upvotes: 2