Reputation: 5372
The API of AdminClient contains the method describeLogDirs, which requires "a list of brokers" in both of its signatures. The quoted phrase is all the Javadoc says about this parameter.
What do I have to provide as the argument to this method? My first guess was a sequence running from 0 or 1 up to the replication factor, but then I saw the following (unrelated) line in my application's logs, which suggests that I should expect something like 1001.
13:47:11.931 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=myInstance-1725351556, groupId=] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(myexample-0)) to broker example.com:9092 (id: 1001 rack: null)
I verified this by hard-coding 1001 for one run:
List<Integer> brokers = Collections.singletonList(1001);
DescribeLogDirsResult result = adminClient.describeLogDirs(brokers);
Now: how do I fill the variable brokers with live values from my Kafka client? (Producer and AdminClient instances are available in my code.) I wasn't able to find any example code for this call.
Update (Solution): This is the final line of code I'm using now:
List<Integer> brokers = adminClient.describeCluster().nodes().get().stream().map(Node::id).collect(Collectors.toList());
Upvotes: 2
Views: 1982
Reputation: 39790
You can use describeCluster() to fetch node-specific details such as host, port and id.
public abstract DescribeClusterResult describeCluster(DescribeClusterOptions options)
Get information about the nodes in the cluster.
Parameters: options - The options to use when getting information about the cluster.
Returns: The DescribeClusterResult.
Example:
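import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
AdminClient adminClient = AdminClient.create(props);
DescribeClusterResult describeClusterResult = adminClient.describeCluster();
Collection<Node> clusterDetails = describeClusterResult.nodes().get(); // blocks; may throw InterruptedException or ExecutionException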
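To get from the Collection<Node> above to the List<Integer> that describeLogDirs expects, only the node ids need to be extracted. The following is a minimal sketch of that step which runs without a cluster. BrokerNode is a hypothetical stand-in for org.apache.kafka.common.Node, the class and method names are assumptions made for illustration, and the sample ids and ports are made up (only 1001 and example.com:9092 appear in the question's log line).

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

final class BrokerIds {

    // Hypothetical stand-in for org.apache.kafka.common.Node;
    // only the accessors relevant here are modelled.
    static final class BrokerNode {
        private final int id;
        private final String host;
        private final int port;

        BrokerNode(int id, String host, int port) {
            this.id = id;
            this.host = host;
            this.port = port;
        }

        int id() { return id; }
        String host() { return host; }
        int port() { return port; }
    }

    // Maps each node to its broker id; sorting only makes the output stable.
    static List<Integer> brokerIds(Collection<BrokerNode> nodes) {
        return nodes.stream()
                .map(BrokerNode::id)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up sample data standing in for describeCluster().nodes().get().
        Collection<BrokerNode> nodes = Arrays.asList(
                new BrokerNode(1002, "example.com", 9093),
                new BrokerNode(1001, "example.com", 9092));
        System.out.println(brokerIds(nodes)); // prints [1001, 1002]
    }
}
```

With a real cluster, the stand-in would be replaced by the Node objects returned from describeClusterResult.nodes().get(), and the resulting list could then be passed to adminClient.describeLogDirs(...).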
Upvotes: 1