vily

Reputation: 93

Is there any way to check if Kafka is up and running from kafka-net?

I am using the kafka-net client to send messages to Kafka. Is there any way to check whether the Kafka server is up and able to receive messages? When I shut Kafka down, the producer is still created successfully, but SendMessageAsync hangs for a long time. I've tried passing a timeout, but it doesn't change anything. I'm using kafka-net 0.9. Everything works fine when the Kafka server is up and running.

Upvotes: 5

Views: 17461

Answers (2)

Dale

Reputation: 1

Try this.

In your constructor, put

options = new KafkaOptions(uri);
var endpoint = new DefaultKafkaConnectionFactory().Resolve(options.KafkaServerUri.First(), options.Log);
client = new KafkaTcpSocket(new DefaultTraceLog(), endpoint);

and then before you send each message,

// test if the broker is alive
var request = new MetadataRequest { Topics = new List<string>() { Topic } };
var task1 = client.WriteAsync(request.Encode()).ConfigureAwait(false);
Task<KafkaDataPayload> task2 = Task.Factory.StartNew(() =>  task1.GetAwaiter().GetResult());
if (task2.Wait(30000) == false)
{
    throw new TimeoutException("Timeout while sending message to kafka broker!");
}

This sends an extra metadata request before every message, so it will hurt performance if you have a high volume of messages. With a low volume it shouldn't matter.

Upvotes: 0

Shawn Guo

Reputation: 3228

Each broker's ID is registered in ZooKeeper at /brokers/ids/[brokerId] as an ephemeral node, which allows other brokers and consumers to detect failures. (Right now the definition of health is fairly naive: if a broker is registered under /brokers/ids/[brokerId] in ZooKeeper, it is considered healthy; otherwise it is considered dead.)

A ZooKeeper ephemeral node exists only as long as the broker's session is active.

You can check whether a broker is up via ZkUtils.getSortedBrokerList(zkClient), which returns the IDs of all active brokers under /brokers/ids:

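import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;

// Connect to ZooKeeper using Kafka's string serializer.
ZkClient zkClient = new ZkClient(properties.getProperty("zkQuorum"), zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer$.MODULE$);
// Returns the sorted IDs of the brokers currently registered under /brokers/ids.
ZkUtils.getSortedBrokerList(zkClient);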
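
If pulling in the ZooKeeper client is more than you need, a cruder check is to see whether anything accepts TCP connections on the broker's port. The sketch below uses only the JDK; BrokerProbe and isReachable are hypothetical names, and localhost:9092 is an assumed broker address. A successful connect only shows that the port is open, not that the broker is registered in ZooKeeper or able to accept messages.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of Kafka or kafka-net.
class BrokerProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, unknown host, or connect timeout.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults: a broker listening on localhost:9092.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        boolean up = isReachable(host, port, 3000);
        System.out.println(up ? "broker port reachable" : "broker port unreachable");
    }
}
```

Running such a probe before creating the producer lets the caller fail fast instead of waiting on a send that hangs. It could be combined with the ZooKeeper lookup above when a stronger signal is needed.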

Reference
Kafka data structures in Zookeeper

Upvotes: 1
