Reputation: 2421
I am trying to set up a Kafka cluster (actually, the first node in the cluster).
I have a single-node ZooKeeper cluster set up, and I am setting up Kafka on a separate node.
Both machines run CentOS 6.4 with IPv6, which is a bit of a PITA. I verified with netcat that the machines can talk to each other.
When I start Kafka, I get the following exception (which causes Kafka to shut down).
EDIT: I got Kafka starting; I had to set the host.name property in the server.properties file.
I was able to create a test topic and send messages just fine from the Kafka server.
However, I get the same error when trying to consume the messages.
Any help or suggestions?
bin/kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning
Exception in thread "main" java.net.UnknownHostException: kafka: kafka: Name or service not known
    at java.net.InetAddress.getLocalHost(InetAddress.java:1473)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:128)
    at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
    at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:178)
    at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: java.net.UnknownHostException: kafka: Name or service not known
    at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
    at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:901)
    at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1293)
    at java.net.InetAddress.getLocalHost(InetAddress.java:1469)
    ... 5 more
Upvotes: 25
Views: 44326
Reputation: 31
When working with Kafka in environments where the default host resolution cannot be changed (for example, when the /etc/hosts file cannot be modified), making broker hostnames resolvable can be challenging. To address this, I implemented a custom DNS resolver that is injected directly into the Kafka producer via reflection. It is not the most elegant solution, but it works for both producers and consumers.
The producer calls a utility method to inject the custom DNS resolver. If enabled, the resolver makes the broker hostname resolve to the configured IP:
public abstract class CustomAvroProducer<T> extends AbstractKafkaClient implements CustomProducer<T> {
    private static final Logger LOG = LogManager.getLogger(CustomAvroProducer.class);
    private final List<T> records = Collections.synchronizedList(new ArrayList<>());
    private final KafkaProducer<String, T> producer = new KafkaProducer<>(getProperties());

    public CustomAvroProducer(KafkaTopic topic) {
        super(topic);
        configureCustomDnsResolver();
    }

    private void configureCustomDnsResolver() {
        if (BROKER_DNS_ENABLE) { // Remove this check if it is not needed
            try {
                // DEFAULT_BROKER_HOST = kafka
                // BROKER_IP = 10.1.1.9
                KafkaUtil.injectDnsResolverForBroker(producer, DEFAULT_BROKER_HOST, BROKER_IP);
                LOG.info("Custom DNS resolver activated for broker: {}", BROKER_IP);
            } catch (Exception e) {
                LOG.error("Failed to add DNS for hostname '{}', broker IP '{}'", DEFAULT_BROKER_HOST, BROKER_IP, e);
            }
        }
    }
    ...
}
import java.lang.reflect.Field;

import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.internals.Sender;

/**
 * Utility class for Kafka-related operations.
 * This class provides methods to inject a custom DNS resolver
 * into the Kafka producer, ensuring the broker name is recognized
 * and messages are sent correctly.
 */
public class KafkaUtil {
    /**
     * Injects a custom DNS resolver into the Kafka producer,
     * configuring the broker's IP address.
     *
     * @param kafka The Kafka producer where the DNS resolver will be injected.
     * @param brokerDefault The default broker name to be used by the DNS resolver.
     * @param brokerIp The IP address of the Kafka broker.
     * @throws NoSuchFieldException If one of the expected fields is not found.
     * @throws IllegalAccessException If access to the field is not allowed.
     */
    public static <T> void injectDnsResolverForBroker(KafkaProducer<String, T> kafka, String brokerDefault, String brokerIp) throws NoSuchFieldException, IllegalAccessException {
        Object connectionStates = getProducerConnectionStates(kafka);
        // Replace the private hostResolver field with the custom resolver.
        Field hostResolver = connectionStates.getClass().getDeclaredField("hostResolver");
        hostResolver.setAccessible(true);
        hostResolver.set(connectionStates, new CustomDnsResolver(brokerDefault, brokerIp));
    }

    // Walks the private fields producer -> sender -> client -> connectionStates.
    private static <T> Object getProducerConnectionStates(KafkaProducer<String, T> kafka) throws NoSuchFieldException, IllegalAccessException {
        Field senderField = KafkaProducer.class.getDeclaredField("sender");
        senderField.setAccessible(true);
        Sender sender = (Sender) senderField.get(kafka);

        Field clientField = Sender.class.getDeclaredField("client");
        clientField.setAccessible(true);
        NetworkClient networkClient = (NetworkClient) clientField.get(sender);

        Field networkField = NetworkClient.class.getDeclaredField("connectionStates");
        networkField.setAccessible(true);
        return networkField.get(networkClient);
    }
}
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.apache.kafka.clients.HostResolver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;

public class CustomDnsResolver implements HostResolver {
    private static final Logger LOG = LogManager.getLogger(CustomDnsResolver.class);
    protected final String brokerIp;
    protected final boolean isBrokerIpConfigured;
    protected final String defaultBrokerHost;

    public CustomDnsResolver(String defaultBrokerHost, String brokerIp) {
        this.brokerIp = brokerIp;
        this.defaultBrokerHost = defaultBrokerHost;
        this.isBrokerIpConfigured = Strings.isNotEmpty(brokerIp);
    }

    @Override
    public InetAddress[] resolve(String host) throws UnknownHostException {
        if (isBrokerHost(host)) {
            LOG.info("Resolving host '{}' to IP: {}", defaultBrokerHost, brokerIp);
            // The configured IP replaces the normal lookup for the broker name.
            return InetAddress.getAllByName(brokerIp);
        }
        // Every other host goes through the standard JVM resolution.
        return InetAddress.getAllByName(host);
    }

    private boolean isBrokerHost(String host) {
        return host.equals(defaultBrokerHost) && this.isBrokerIpConfigured;
    }
}
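To try the override rule on its own, without Kafka on the classpath, something like the following sketch may help. The class name StaticHostOverride is hypothetical and not part of Kafka or of the code above; it only mirrors the "pinned IP for one host name, normal lookup for everything else" logic, using the example values kafka and 10.1.1.9 from the comments above.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Stand-alone sketch of the override rule; the class name is hypothetical. */
class StaticHostOverride {
    private final String overriddenHost;
    private final String targetIp;

    StaticHostOverride(String overriddenHost, String targetIp) {
        this.overriddenHost = overriddenHost;
        this.targetIp = targetIp;
    }

    /** Returns the pinned address for the overridden host, otherwise defers to the JVM resolver. */
    InetAddress[] resolve(String host) throws UnknownHostException {
        boolean overrideConfigured = targetIp != null && !targetIp.isEmpty();
        if (overrideConfigured && host.equals(overriddenHost)) {
            // A literal IP address is parsed rather than looked up, so no DNS query is made here.
            return InetAddress.getAllByName(targetIp);
        }
        return InetAddress.getAllByName(host);
    }

    public static void main(String[] args) throws UnknownHostException {
        StaticHostOverride resolver = new StaticHostOverride("kafka", "10.1.1.9");
        for (InetAddress address : resolver.resolve("kafka")) {
            System.out.println(address.getHostAddress());
        }
    }
}
```

Because the broker name never reaches the system resolver, the lookup succeeds even when kafka is missing from /etc/hosts and DNS.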
Upvotes: 0
Reputation: 457
Edit /etc/hosts (sudo vim /etc/hosts) so that the file looks like this:
127.0.0.1 kafka localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain <localhost_OR_DNS_OR_ip_address>
<localhost_OR_DNS_OR_ip_address> kafka
Note that kafka is my desired hostname.
Then, in kafka_folder/config/server.properties, there is a "listeners=" field. It looks like this:
# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
Simply uncomment that field by removing the "#" before it. It should then look like this:
listeners=PLAINTEXT://<localhost_OR_DNS_OR_ip_address>:9092
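Before restarting the broker, it can be worth checking that the host you put into the listeners entry actually resolves on that machine. The sketch below is only an illustration: ListenerCheck and hostOf are hypothetical names, and the parsing assumes a single entry in the name://host:port form shown in the comment block above.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Hypothetical helper: extracts the host from one listener entry and tries to resolve it. */
class ListenerCheck {
    /** Returns the host part of a single "name://host:port" entry (IPv6 brackets removed). */
    static String hostOf(String listener) {
        int schemeEnd = listener.indexOf("://");
        int portStart = listener.lastIndexOf(':');
        // The last ':' must come after "://", otherwise there is no port separator.
        if (schemeEnd < 0 || portStart <= schemeEnd) {
            throw new IllegalArgumentException("Expected name://host:port but got: " + listener);
        }
        String host = listener.substring(schemeEnd + 3, portStart);
        if (host.length() >= 2 && host.startsWith("[") && host.endsWith("]")) {
            host = host.substring(1, host.length() - 1);
        }
        return host;
    }

    public static void main(String[] args) {
        String listener = args.length > 0 ? args[0] : "PLAINTEXT://localhost:9092";
        String host = hostOf(listener);
        if (host.isEmpty()) {
            System.out.println("The listener entry has no host to check");
            return;
        }
        try {
            System.out.println(host + " resolves to " + InetAddress.getByName(host).getHostAddress());
        } catch (UnknownHostException e) {
            System.out.println(host + " does not resolve: " + e.getMessage());
        }
    }
}
```

Run it with the value from your own server.properties as the first argument, for example the part after "listeners=".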
Upvotes: 3
Reputation: 198
If you have a DNS server configured, Kafka will use your DNS domain name. Run this command on your server:
hostname
The result is your local domain name. Copy it, then open the hosts file:
nano /etc/hosts
and add your local domain name:
127.0.0.1 localhost localhost.localdomain "your localdomain"
::1 localhost localhost.localdomain "your localdomain"
Upvotes: 0
Reputation: 391
As noplay pointed out, the issue was that Kafka wasn't able to resolve the correct IP. This can happen, for example, on EC2 instances running in private subnets without a public IP assigned. The solution, summarized: run
hostname
which will show you the host name, something like ip-10-180-128-217. Then update your /etc/hosts:
sudo nano /etc/hosts
and append the host name to the loopback line, e.g.
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 ip-10-180-128-217
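To confirm that the edit took effect, a small check along these lines could be used. It is a sketch only: HostsEntryCheck and hasEntry are hypothetical names, the default host name is just the example value from above, and the parsing assumes the usual "address name name ..." layout with # comments.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

/** Hypothetical helper: checks whether a hosts file maps some address to a given name. */
class HostsEntryCheck {
    /** True if any non-comment line lists hostName after the address field. */
    static boolean hasEntry(List<String> hostsLines, String hostName) {
        for (String line : hostsLines) {
            int comment = line.indexOf('#');
            String content = (comment >= 0 ? line.substring(0, comment) : line).trim();
            if (content.isEmpty()) {
                continue;
            }
            String[] fields = content.split("\\s+");
            // fields[0] is the address; the remaining fields are names for it.
            for (int i = 1; i < fields.length; i++) {
                if (fields[i].equalsIgnoreCase(hostName)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        // Pass the output of `hostname` as the first argument; the default is the example name.
        String hostName = args.length > 0 ? args[0] : "ip-10-180-128-217";
        List<String> lines = Files.readAllLines(Paths.get("/etc/hosts"));
        System.out.println(hostName + (hasEntry(lines, hostName) ? " is listed" : " is NOT listed") + " in /etc/hosts");
    }
}
```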
Upvotes: 7
Reputation: 2421
When you run the bin/kafka-console-consumer.sh command, Kafka loads a ConsoleConsumer, which attempts to create a consumer with an auto-generated consumer ID. Kafka generates the consumer ID by concatenating the name of the local host into it. So the problem was that Java could not resolve the IP address for the local host name on the OpenStack VM I am working with.
The OpenStack VM was resolving the local host name to kafka, which is the name of the VM, whereas I had everything set up in the Kafka and ZooKeeper instances as kafka1.
So when Java called getLocalHost, it tried to find the IP address for kafka, which I did not have in my /etc/hosts file.
I simply added an entry for kafka to my /etc/hosts file, and everything started working wonderfully!
I would have thought the local host name would resolve to localhost, but it did not; it resolved to the name of the VM, kafka.
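A quick way to reproduce this outside Kafka is to make the same JDK call that appears at the top of the stack trace. The sketch below is a minimal diagnostic, not Kafka code; the class name LocalHostCheck is made up for illustration.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Hypothetical diagnostic: reports what InetAddress.getLocalHost() does on this machine. */
class LocalHostCheck {
    /** Describes the local host lookup result, or the reason it failed. */
    static String describeLocalHost() {
        try {
            InetAddress local = InetAddress.getLocalHost();
            return "resolved: " + local.getHostName() + " -> " + local.getHostAddress();
        } catch (UnknownHostException e) {
            // This is the failure mode from the question: the machine's own name has no address.
            return "unresolved: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeLocalHost());
    }
}
```

If this prints an "unresolved" line on the machine where the consumer runs, the machine's own host name is missing from /etc/hosts (or DNS), and the console consumer can be expected to fail in the same way.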
Upvotes: 32