KAFKA-16521; kafka-metadata-quorum describe command changes for KIP-853 (#16759)

describe --status now includes directory id and endpoint information for voter and observers.
describe --replication now includes directory id.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, José Armando García Sancio <jsancio@apache.org>
This commit is contained in:
Alyssa Huang 2024-08-01 12:28:57 -07:00 committed by GitHub
parent 902fc33b27
commit bc4df734b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 87 additions and 20 deletions

View File

@ -22,7 +22,7 @@
// Version 2 adds additional fields in the response. The request is unchanged (KIP-853).
"validVersions": "0-2",
"flexibleVersions": "0+",
"latestVersionUnstable": true, // Version 2 is still under development.
"latestVersionUnstable": false,
"fields": [
{ "name": "Topics", "type": "[]TopicData",
"versions": "0+", "fields": [

View File

@ -97,6 +97,13 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
assertNotEquals(-1, state.lastCaughtUpTimestamp)
}
}
if (version >= 2) {
val nodes = response.data.nodes().asScala
assertEquals(cluster.controllerIds().asScala, nodes.map(_.nodeId()).toSet)
val node = nodes.find(_.nodeId() == cluster.controllers().keySet().asScala.head)
assertEquals(cluster.controllerListenerName().get().value(), node.get.listeners().asScala.head.name())
}
}
}

View File

@ -18,7 +18,9 @@ package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
@ -167,7 +169,7 @@ public class MetadataQuorumCommand {
rows.addAll(quorumInfoToRows(leader, quorumInfo.observers().stream(), "Observer", humanReadable));
ToolsUtils.prettyPrintTable(
asList("NodeId", "LogEndOffset", "Lag", "LastFetchTimestamp", "LastCaughtUpTimestamp", "Status"),
asList("NodeId", "DirectoryId", "LogEndOffset", "Lag", "LastFetchTimestamp", "LastCaughtUpTimestamp", "Status"),
rows,
System.out
);
@ -186,6 +188,7 @@ public class MetadataQuorumCommand {
valueOf(info.lastCaughtUpTimestamp().getAsLong());
return Stream.of(
info.replicaId(),
info.replicaDirectoryId(),
info.logEndOffset(),
leader.logEndOffset() - info.logEndOffset(),
lastFetchTimestamp,
@ -232,9 +235,60 @@ public class MetadataQuorumCommand {
"\nHighWatermark: " + quorumInfo.highWatermark() +
"\nMaxFollowerLag: " + maxFollowerLag +
"\nMaxFollowerLagTimeMs: " + maxFollowerLagTimeMs +
"\nCurrentVoters: " + quorumInfo.voters().stream().map(QuorumInfo.ReplicaState::replicaId).map(Object::toString).collect(Collectors.joining(",", "[", "]")) +
"\nCurrentObservers: " + quorumInfo.observers().stream().map(QuorumInfo.ReplicaState::replicaId).map(Objects::toString).collect(Collectors.joining(",", "[", "]"))
"\nCurrentVoters: " + printVoterState(quorumInfo) +
"\nCurrentObservers: " + printObserverState(quorumInfo)
);
}
// Constructs the CurrentVoters string
// CurrentVoters: [{"id": 0, "directoryId": "UUID1", "endpoints": [{"name": "C", "securityProtocol": "SSL", "host": "controller-0", "port": 1234}]}, {"id": 1, ... }]}]
private static String printVoterState(QuorumInfo quorumInfo) {
return printReplicaState(quorumInfo, quorumInfo.voters());
}
// Constructs the CurrentObservers string
private static String printObserverState(QuorumInfo quorumInfo) {
return printReplicaState(quorumInfo, quorumInfo.observers());
}
private static String printReplicaState(QuorumInfo quorumInfo, List<QuorumInfo.ReplicaState> replicas) {
List<Node> currentVoterList = replicas.stream().map(voter -> new Node(
voter.replicaId(),
voter.replicaDirectoryId(),
getEndpoints(quorumInfo.nodes().get(voter.replicaId())))).collect(Collectors.toList());
return currentVoterList.stream().map(Objects::toString).collect(Collectors.joining(", ", "[", "]"));
}
private static List<RaftVoterEndpoint> getEndpoints(QuorumInfo.Node node) {
return node == null ? new ArrayList<>() : node.endpoints();
}
private static class Node {
private final int id;
private final Uuid directoryId;
private final List<RaftVoterEndpoint> endpoints;
private Node(int id, Uuid directoryId, List<RaftVoterEndpoint> endpoints) {
this.id = id;
this.directoryId = Objects.requireNonNull(directoryId);
this.endpoints = Objects.requireNonNull(endpoints);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{");
sb.append("\"id\": ").append(id).append(", ");
sb.append("\"directoryId\": ").append("\"").append(directoryId.equals(Uuid.ZERO_UUID) ? "null" : directoryId).append("\"");
if (!endpoints.isEmpty()) {
sb.append(", \"endpoints\": ");
for (RaftVoterEndpoint endpoint : endpoints) {
sb.append(endpoint.toString()).append(", ");
}
sb.setLength(sb.length() - 2); // remove the last comma and space
}
sb.append("}");
return sb.toString();
}
}
}

View File

@ -62,25 +62,30 @@ class MetadataQuorumCommandTest {
MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")
);
List<String> outputs = Arrays.stream(describeOutput.split("\n")).skip(1).collect(Collectors.toList());
List<String> outputs = Arrays.stream(describeOutput.split("\n")).collect(Collectors.toList());
String header = outputs.get(0);
List<String> data = outputs.subList(1, outputs.size());
assertTrue(header.matches("NodeId\\s+DirectoryId\\s+LogEndOffset\\s+Lag\\s+LastFetchTimestamp\\s+LastCaughtUpTimestamp\\s+Status\\s+"));
if (cluster.type() == Type.CO_KRAFT)
assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), outputs.size());
assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), data.size());
else
assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), outputs.size());
assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), data.size());
Pattern leaderPattern = Pattern.compile("\\d+\\s+\\d+\\s+\\d+\\s+[\\dmsago\\s]+-?[\\dmsago\\s]+Leader\\s*");
assertTrue(leaderPattern.matcher(outputs.get(0)).find());
assertTrue(outputs.stream().skip(1).noneMatch(o -> leaderPattern.matcher(o).find()));
Pattern leaderPattern = Pattern.compile("\\d+\\s+\\S+\\s+\\d+\\s+\\d+\\s+-?\\d+\\s+-?\\d+\\s+Leader\\s*");
assertTrue(leaderPattern.matcher(data.get(0)).find());
assertTrue(data.stream().skip(1).noneMatch(o -> leaderPattern.matcher(o).find()));
Pattern followerPattern = Pattern.compile("\\d+\\s+\\d+\\s+\\d+\\s+[\\dmsago\\s]+-?[\\dmsago\\s]+Follower\\s*");
assertEquals(cluster.config().numControllers() - 1, outputs.stream().filter(o -> followerPattern.matcher(o).find()).count());
Pattern followerPattern = Pattern.compile("\\d+\\s+\\S+\\s+\\d+\\s+\\d+\\s+-?\\d+\\s+-?\\d+\\s+Follower\\s*");
assertEquals(cluster.config().numControllers() - 1, data.stream().filter(o -> followerPattern.matcher(o).find()).count());
Pattern observerPattern = Pattern.compile("\\d+\\s+\\d+\\s+\\d+\\s+[\\dmsago\\s]+-?[\\dmsago\\s]+Observer\\s*");
Pattern observerPattern = Pattern.compile("\\d+\\s+\\S+\\s+\\d+\\s+\\d+\\s+-?\\d+\\s+-?\\d+\\s+Observer\\s*");
if (cluster.type() == Type.CO_KRAFT)
assertEquals(Math.max(0, cluster.config().numBrokers() - cluster.config().numControllers()),
outputs.stream().filter(o -> observerPattern.matcher(o).find()).count());
data.stream().filter(o -> observerPattern.matcher(o).find()).count());
else
assertEquals(cluster.config().numBrokers(), outputs.stream().filter(o -> observerPattern.matcher(o).find()).count());
assertEquals(cluster.config().numBrokers(), data.stream().filter(o -> observerPattern.matcher(o).find()).count());
}
/**
@ -113,7 +118,7 @@ class MetadataQuorumCommandTest {
assertTrue(outputs[4].matches("MaxFollowerLag:\\s+\\d+"), describeOutput);
assertTrue(outputs[5].matches("MaxFollowerLagTimeMs:\\s+-?\\d+"), describeOutput);
assertTrue(
outputs[6].matches("CurrentVoters:\\s+\\[\\d+(,\\d+)*]"),
outputs[6].matches("CurrentVoters:\\s+\\[\\{\"id\":\\s+\\d+,\\s+\"directoryId\":\\s+\\S+,\\s+\"endpoints\":\\s+.*}]"),
describeOutput
);
@ -122,7 +127,8 @@ class MetadataQuorumCommandTest {
assertTrue(outputs[7].matches("CurrentObservers:\\s+\\[]"), describeOutput);
} else {
assertTrue(
outputs[7].matches("CurrentObservers:\\s+\\[\\d+(,\\d+)*]"),
outputs[7].matches("CurrentObservers:\\s+\\[\\{\"id\":\\s+\\d+,\\s+\"directoryId\":\\s+\\S+}" +
"(,\\s+\\{\"id\":\\s+\\d+,\\s+\"directoryId\":\\s+\\S+})*]"),
describeOutput
);
}
@ -139,7 +145,7 @@ class MetadataQuorumCommandTest {
String replicationOutput = ToolsTestUtils.captureStandardOut(() ->
MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")
);
assertEquals("0", replicationOutput.split("\n")[1].split("\\s+")[2]);
assertEquals("0", replicationOutput.split("\n")[1].split("\\s+")[3]);
}
@Test
@ -183,9 +189,9 @@ class MetadataQuorumCommandTest {
private static void assertHumanReadable(String output) {
String dataRow = output.split("\n")[1];
String lastFetchTimestamp = dataRow.split("\t")[3];
String lastFetchTimestamp = dataRow.split("\t")[4];
String lastFetchTimestampValue = lastFetchTimestamp.split(" ")[0];
String lastCaughtUpTimestamp = dataRow.split("\t")[4];
String lastCaughtUpTimestamp = dataRow.split("\t")[5];
String lastCaughtUpTimestampValue = lastCaughtUpTimestamp.split(" ")[0];
assertTrue(lastFetchTimestamp.contains("ms ago"));
assertTrue(lastFetchTimestampValue.matches("\\d*"));