ENV

Reputation: 1282

How to create a Python Kafka producer with multiple bootstrap servers?

I'm trying to send messages to a Kafka topic that is served by multiple brokers. Below is my method, where self.bootstrap_servers = "[b-1.dev-stg-kafka.wjiw3s.c1.kafka.us-east-1.amazonaws.com:9092, b-2.dev-stg-kafka.wjiw3s.c1.kafka.us-east-1.amazonaws.com:9092]"

from kafka import KafkaProducer
from time import sleep

    def send(self):
        try:
            producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers, api_version=(0, 10, 1))
            message = self.prepare_message()
            producer.send(self.topic, self.encoded_payload)
            sleep(2)
        except Exception as e:
            # Print the actual error instead of hiding it
            print("Exception sending message to kafka:", e)

When I test this, execution ends up in the exception block, and the line that fails is the one where I create the producer. How can I publish a Kafka message with KafkaProducer when the cluster has multiple bootstrap servers? Is there another way to do this?

Error:

  File "C:\Users\ENV\datareplayer\venv\lib\site-packages\kafka\client_async.py", line 216, in __init__
    self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
  File "C:\Users\ENV\datareplayer\venv\lib\site-packages\kafka\conn.py", line 1236, in collect_hosts
    host, port, afi = get_ip_port_afi(host_port)
  File "C:\Users\ENV\datareplayer\venv\lib\site-packages\kafka\conn.py", line 1192, in get_ip_port_afi
    host, rest = host_and_port_str[1:].split(']')
ValueError: not enough values to unpack (expected 2, got 1)
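The traceback shows why the constructor fails. The sketch below is a simplified reproduction, not kafka-python's own code: it assumes that a string value for bootstrap_servers is split on commas (so the list brackets end up inside the first and last entries), and then reuses the exact split from the failing line in get_ip_port_afi, which expects the bracketed "[ipv6-address]:port" form.

```python
# Simplified reproduction of the failing step from the traceback (not kafka-python's code).
raw = ("[b-1.dev-stg-kafka.wjiw3s.c1.kafka.us-east-1.amazonaws.com:9092, "
       "b-2.dev-stg-kafka.wjiw3s.c1.kafka.us-east-1.amazonaws.com:9092]")

# Assumption: a string value is split on commas, so the brackets stay inside the entries.
entries = [entry.strip() for entry in raw.split(",")]
print(entries)

first = entries[0]  # starts with "[" but contains no "]"
error_message = None
try:
    # Same split as the failing line: it expects "[ipv6-address]:port"
    host, rest = first[1:].split("]")
except ValueError as exc:
    error_message = str(exc)

print("ValueError:", error_message)
```

The first entry looks like the start of a bracketed IPv6 address, so unpacking the split result into two names fails with the same message as in the traceback.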

Upvotes: 0

Views: 7479

Answers (3)

MR.Max

Reputation: 66

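Just separate the servers with commas, i.e. pass them as a Python list of "host:port" strings. It works.

import json
from kafka import KafkaConsumer, KafkaProducer

# json_serializer was not defined in the original answer; this is one plausible definition
def json_serializer(data):
    # Encode each message value as UTF-8 JSON bytes
    return json.dumps(data).encode("utf-8")

kafka_servers = ["localhost:9092", "localhost:9093", "localhost:9094"]

producer = KafkaProducer(
    bootstrap_servers=kafka_servers,
    value_serializer=json_serializer
)
consumer = KafkaConsumer(
    "topic_name",
    bootstrap_servers=kafka_servers,
    auto_offset_reset="earliest",
    group_id="consumer-group-a"
)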
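If the server list arrives as one string, as in the question, it has to be turned into a real list before it is passed to KafkaProducer. A minimal sketch; parse_bootstrap_servers is a hypothetical helper name, and it assumes plain "host:port" entries (bracketed IPv6 addresses are not handled):

```python
def parse_bootstrap_servers(raw):
    """Hypothetical helper: turn '[h1:p, h2:p]' or 'h1:p,h2:p' into ['h1:p', 'h2:p']."""
    cleaned = raw.strip()
    # Drop one pair of surrounding list brackets, if present
    if cleaned.startswith("[") and cleaned.endswith("]"):
        cleaned = cleaned[1:-1]
    servers = []
    for item in cleaned.split(","):
        # Tolerate quoted items such as "['a:9092', 'b:9092']"
        item = item.strip().strip("'\"")
        if item:
            servers.append(item)
    return servers


raw = ("[b-1.dev-stg-kafka.wjiw3s.c1.kafka.us-east-1.amazonaws.com:9092, "
       "b-2.dev-stg-kafka.wjiw3s.c1.kafka.us-east-1.amazonaws.com:9092]")
servers = parse_bootstrap_servers(raw)
print(servers)
```

The resulting list can then be passed as bootstrap_servers=servers to KafkaProducer, as in the answer above. Splitting by hand avoids relying on the string being valid Python literal syntax, which the question's value is not (its entries are unquoted).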

Upvotes: 1

user10989392

Reputation: 11

ast.literal_eval(bootstrap_servers) can help.

It takes a stringified list as input and converts it to a list. For example:

import ast

input_str = "['broker1', 'broker2', 'broker3']"

type(input_str)
str

If you do

input_list = ast.literal_eval(input_str)

then input_list is a list:

type(input_list)
list
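One caveat: literal_eval only works when the list items are quoted Python strings. The question's value has unquoted entries, so it is not a valid Python literal. A small check, using placeholder hostnames with the same shape as the question's value:

```python
import ast

# Placeholder hostnames with the same shape as the question's value
quoted = "['b-1.example.com:9092', 'b-2.example.com:9092']"
unquoted = "[b-1.example.com:9092, b-2.example.com:9092]"

servers = ast.literal_eval(quoted)  # a real list of two strings
print(type(servers), servers)

failed = False
try:
    ast.literal_eval(unquoted)  # not a valid Python literal
except (ValueError, SyntaxError) as exc:
    failed = True
    print("literal_eval failed:", type(exc).__name__)
```

So for a value like the one in the question, the string would need quotes around each entry before literal_eval can parse it.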

Upvotes: 1

Ran Lupovich

Reputation: 1831

Regarding sending data to a Kafka cluster with multiple brokers: the bootstrap server is only the initial connection point. The client fetches cluster metadata from it (including which brokers are the leaders for each partition) and then connects directly to the appropriate brokers. So the client needs network access to every broker in the cluster, and it does not matter which broker is chosen as the bootstrap server.

That said, the recommended approach is to list several bootstrap servers (comma-separated), so that if one is down the client can use another one from the list.
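To make the bootstrap idea concrete, here is a deliberately simplified model in plain Python, not how either client is implemented: first_reachable and the broker names are hypothetical, and real clients may try the entries in a different order and retry. The point is only that any reachable entry is enough to fetch metadata, after which the client talks to the partition leaders directly.

```python
from typing import Callable, Iterable, Optional


def first_reachable(bootstrap_servers: Iterable[str],
                    is_reachable: Callable[[str], bool]) -> Optional[str]:
    """Hypothetical model: return the first server that answers, or None if none do."""
    for server in bootstrap_servers:
        if is_reachable(server):
            return server
    return None


servers = ["broker1:9092", "broker2:9092", "broker3:9092"]
down = {"broker1:9092"}  # pretend broker1 is unavailable

chosen = first_reachable(servers, lambda s: s not in down)
print(chosen)  # broker2:9092
```

With a single bootstrap entry, losing that one broker would leave the client with nowhere to fetch metadata from, which is why listing several is recommended.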

Regarding specifying a list of bootstrap servers in the settings of the Python Kafka client:

Which Python Kafka library are you using?

It seems there is an open issue about this that has not been fixed yet.

https://github.com/confluentinc/confluent-kafka-python/issues/711

Description: The configuration key bootstrap.servers implies that multiple servers can be specified, but it is not explicitly defined what this key's value should be. It appears that only a string using comma separation is allowed for this config key. If a Python list is given, no explicit error or feedback is given to indicate this is not valid. Instead, calls to consumer.poll() or other broker-bound functions simply block and never succeed.

Ideally, a Python list (e.g. ["broker1", "broker2"]) should be valid; or at the very least, constructing the Consumer should throw a runtime/configuration exception.
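Given that issue, the safe form for confluent-kafka-python is a single comma-separated string. A minimal sketch that builds such a config dict; the helper name and broker names are hypothetical:

```python
def confluent_bootstrap_config(servers):
    """Hypothetical helper: build a config dict with bootstrap.servers as ONE string."""
    if isinstance(servers, str):
        # Already a comma-separated string; pass it through unchanged
        return {"bootstrap.servers": servers}
    # Join a list of "host:port" entries with commas (no brackets, no spaces)
    return {"bootstrap.servers": ",".join(s.strip() for s in servers)}


conf = confluent_bootstrap_config(["broker1:9092", "broker2:9092"])
print(conf)
```

The dict can then be passed to confluent_kafka.Producer(conf) (or to Consumer, together with settings such as group.id). kafka-python, by contrast, accepts either a list or a comma-separated string for bootstrap_servers.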

Upvotes: 2
