mirror of https://github.com/apache/kafka.git
KAFKA-13914: Add command line tool kafka-metadata-quorum.sh (#12469)
Add `MetadataQuorumCommand` to describe quorum status, I'm trying to use arg4j style command format, currently, we only support one sub-command which is "describe" and we can specify 2 arguments which are --status and --replication. ``` # describe quorum status kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --replication ReplicaId LogEndOffset Lag LastFetchTimeMs LastCaughtUpTimeMs Status 0 10 0 -1 -1 Leader 1 10 0 -1 -1 Follower 2 10 0 -1 -1 Follower kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status ClusterId: fMCL8kv1SWm87L_Md-I2hg LeaderId: 3002 LeaderEpoch: 2 HighWatermark: 10 MaxFollowerLag: 0 MaxFollowerLagTimeMs: -1 CurrentVoters: [3000,3001,3002] CurrentObservers: [0,1,2] # specify AdminClient properties kafka-metadata-quorum.sh --bootstrap-server localhost:9092 --command-config config.properties describe --status ``` Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
parent
c7f051914e
commit
150fd5b0b1
|
@ -0,0 +1,17 @@
|
|||
#!/bin/bash
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
exec $(dirname $0)/kafka-run-class.sh kafka.admin.MetadataQuorumCommand "$@"
|
|
@ -0,0 +1,17 @@
|
|||
@echo off
|
||||
rem Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
rem contributor license agreements. See the NOTICE file distributed with
|
||||
rem this work for additional information regarding copyright ownership.
|
||||
rem The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
rem (the "License"); you may not use this file except in compliance with
|
||||
rem the License. You may obtain a copy of the License at
|
||||
rem
|
||||
rem http://www.apache.org/licenses/LICENSE-2.0
|
||||
rem
|
||||
rem Unless required by applicable law or agreed to in writing, software
|
||||
rem distributed under the License is distributed on an "AS IS" BASIS,
|
||||
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
rem See the License for the specific language governing permissions and
|
||||
rem limitations under the License.
|
||||
|
||||
"%~dp0kafka-run-class.bat" kafka.admin.MetadataQuorumCommand %*
|
|
@ -1705,6 +1705,7 @@ project(':tools') {
|
|||
|
||||
dependencies {
|
||||
implementation project(':clients')
|
||||
implementation project(':server-common')
|
||||
implementation project(':log4j-appender')
|
||||
implementation libs.argparse4j
|
||||
implementation libs.jacksonDatabind
|
||||
|
|
|
@ -366,6 +366,7 @@
|
|||
|
||||
<subpackage name="tools">
|
||||
<allow pkg="org.apache.kafka.common"/>
|
||||
<allow pkg="org.apache.kafka.server.util" />
|
||||
<allow pkg="org.apache.kafka.clients.admin" />
|
||||
<allow pkg="org.apache.kafka.clients.producer" />
|
||||
<allow pkg="org.apache.kafka.clients.consumer" />
|
||||
|
|
|
@ -4359,6 +4359,8 @@ public class KafkaAdminClient extends AdminClient {
|
|||
private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
|
||||
return new QuorumInfo(
|
||||
partition.leaderId(),
|
||||
partition.leaderEpoch(),
|
||||
partition.highWatermark(),
|
||||
partition.currentVoters().stream().map(v -> translateReplicaState(v)).collect(Collectors.toList()),
|
||||
partition.observers().stream().map(o -> translateReplicaState(o)).collect(Collectors.toList()));
|
||||
}
|
||||
|
|
|
@ -25,11 +25,15 @@ import java.util.OptionalLong;
|
|||
*/
|
||||
public class QuorumInfo {
|
||||
private final Integer leaderId;
|
||||
private final Integer leaderEpoch;
|
||||
private final Long highWatermark;
|
||||
private final List<ReplicaState> voters;
|
||||
private final List<ReplicaState> observers;
|
||||
|
||||
QuorumInfo(Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
|
||||
QuorumInfo(Integer leaderId, Integer leaderEpoch, Long highWatermark, List<ReplicaState> voters, List<ReplicaState> observers) {
|
||||
this.leaderId = leaderId;
|
||||
this.leaderEpoch = leaderEpoch;
|
||||
this.highWatermark = highWatermark;
|
||||
this.voters = voters;
|
||||
this.observers = observers;
|
||||
}
|
||||
|
@ -38,6 +42,14 @@ public class QuorumInfo {
|
|||
return leaderId;
|
||||
}
|
||||
|
||||
public Integer leaderEpoch() {
|
||||
return leaderEpoch;
|
||||
}
|
||||
|
||||
public Long highWatermark() {
|
||||
return highWatermark;
|
||||
}
|
||||
|
||||
public List<ReplicaState> voters() {
|
||||
return voters;
|
||||
}
|
||||
|
|
|
@ -608,7 +608,7 @@ public class KafkaAdminClientTest {
|
|||
}
|
||||
|
||||
private static QuorumInfo defaultQuorumInfo(Boolean emptyOptionals) {
|
||||
return new QuorumInfo(1,
|
||||
return new QuorumInfo(1, 1, 1L,
|
||||
singletonList(new QuorumInfo.ReplicaState(1, 100,
|
||||
emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000),
|
||||
emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000))),
|
||||
|
|
|
@ -0,0 +1,172 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package kafka.admin
|
||||
|
||||
import kafka.tools.TerseFailure
|
||||
import kafka.utils.Exit
|
||||
import net.sourceforge.argparse4j.ArgumentParsers
|
||||
import net.sourceforge.argparse4j.impl.Arguments.{fileType, storeTrue}
|
||||
import net.sourceforge.argparse4j.inf.Subparsers
|
||||
import org.apache.kafka.clients._
|
||||
import org.apache.kafka.clients.admin.{Admin, QuorumInfo}
|
||||
import org.apache.kafka.common.utils.Utils
|
||||
import org.apache.kafka.server.util.ToolsUtils.prettyPrintTable
|
||||
|
||||
import java.io.File
|
||||
import java.util.Properties
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
/**
|
||||
* A tool for describing quorum status
|
||||
*/
|
||||
object MetadataQuorumCommand {
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
val res = mainNoExit(args)
|
||||
Exit.exit(res)
|
||||
}
|
||||
|
||||
def mainNoExit(args: Array[String]): Int = {
|
||||
val parser = ArgumentParsers
|
||||
.newArgumentParser("kafka-metadata-quorum")
|
||||
.defaultHelp(true)
|
||||
.description("This tool describes kraft metadata quorum status.")
|
||||
parser
|
||||
.addArgument("--bootstrap-server")
|
||||
.help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.")
|
||||
.required(true)
|
||||
|
||||
parser
|
||||
.addArgument("--command-config")
|
||||
.`type`(fileType())
|
||||
.help("Property file containing configs to be passed to Admin Client.")
|
||||
val subparsers = parser.addSubparsers().dest("command")
|
||||
addDescribeParser(subparsers)
|
||||
|
||||
var admin: Admin = null
|
||||
try {
|
||||
val namespace = parser.parseArgsOrFail(args)
|
||||
val command = namespace.getString("command")
|
||||
|
||||
val commandConfig = namespace.get[File]("command_config")
|
||||
val props = if (commandConfig != null) {
|
||||
if (!commandConfig.exists()) {
|
||||
throw new TerseFailure(s"Properties file ${commandConfig.getPath} does not exists!")
|
||||
}
|
||||
Utils.loadProps(commandConfig.getPath)
|
||||
} else {
|
||||
new Properties()
|
||||
}
|
||||
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server"))
|
||||
admin = Admin.create(props)
|
||||
|
||||
if (command == "describe") {
|
||||
if (namespace.getBoolean("status") && namespace.getBoolean("replication")) {
|
||||
throw new TerseFailure(s"Only one of --status or --replication should be specified with describe sub-command")
|
||||
} else if (namespace.getBoolean("replication")) {
|
||||
handleDescribeReplication(admin)
|
||||
} else if (namespace.getBoolean("status")) {
|
||||
handleDescribeStatus(admin)
|
||||
} else {
|
||||
throw new TerseFailure(s"One of --status or --replication must be specified with describe sub-command")
|
||||
}
|
||||
} else {
|
||||
throw new IllegalStateException(s"Unknown command: $command, only 'describe' is supported")
|
||||
}
|
||||
0
|
||||
} catch {
|
||||
case e: TerseFailure =>
|
||||
Console.err.println(e.getMessage)
|
||||
1
|
||||
} finally {
|
||||
if (admin != null) {
|
||||
admin.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def addDescribeParser(subparsers: Subparsers): Unit = {
|
||||
val describeParser = subparsers
|
||||
.addParser("describe")
|
||||
.help("Describe the metadata quorum info")
|
||||
|
||||
val statusArgs = describeParser.addArgumentGroup("Status")
|
||||
statusArgs
|
||||
.addArgument("--status")
|
||||
.help(
|
||||
"A short summary of the quorum status and the other provides detailed information about the status of replication.")
|
||||
.action(storeTrue())
|
||||
val replicationArgs = describeParser.addArgumentGroup("Replication")
|
||||
replicationArgs
|
||||
.addArgument("--replication")
|
||||
.help("Detailed information about the status of replication")
|
||||
.action(storeTrue())
|
||||
}
|
||||
|
||||
private def handleDescribeReplication(admin: Admin): Unit = {
|
||||
val quorumInfo = admin.describeMetadataQuorum.quorumInfo.get
|
||||
val leaderId = quorumInfo.leaderId
|
||||
val leader = quorumInfo.voters.asScala.filter(_.replicaId == leaderId).head
|
||||
|
||||
def convertQuorumInfo(infos: Seq[QuorumInfo.ReplicaState], status: String): Seq[Array[String]] =
|
||||
infos.map { info =>
|
||||
Array(info.replicaId,
|
||||
info.logEndOffset,
|
||||
leader.logEndOffset - info.logEndOffset,
|
||||
info.lastFetchTimeMs.orElse(-1),
|
||||
info.lastCaughtUpTimeMs.orElse(-1),
|
||||
status
|
||||
).map(_.toString)
|
||||
}
|
||||
prettyPrintTable(
|
||||
Array("NodeId", "LogEndOffset", "Lag", "LastFetchTimeMs", "LastCaughtUpTimeMs", "Status"),
|
||||
(convertQuorumInfo(Seq(leader), "Leader")
|
||||
++ convertQuorumInfo(quorumInfo.voters.asScala.filter(_.replicaId != leaderId).toSeq, "Follower")
|
||||
++ convertQuorumInfo(quorumInfo.observers.asScala.toSeq, "Observer")).asJava,
|
||||
scala.Console.out
|
||||
)
|
||||
}
|
||||
|
||||
private def handleDescribeStatus(admin: Admin): Unit = {
|
||||
val clusterId = admin.describeCluster.clusterId.get
|
||||
val quorumInfo = admin.describeMetadataQuorum.quorumInfo.get
|
||||
val leaderId = quorumInfo.leaderId
|
||||
val leader = quorumInfo.voters.asScala.filter(_.replicaId == leaderId).head
|
||||
val maxLagFollower = quorumInfo.voters.asScala
|
||||
.minBy(_.logEndOffset)
|
||||
val maxFollowerLag = leader.logEndOffset - maxLagFollower.logEndOffset
|
||||
val maxFollowerLagTimeMs =
|
||||
if (leader == maxLagFollower) {
|
||||
0
|
||||
} else if (leader.lastCaughtUpTimeMs.isPresent && maxLagFollower.lastCaughtUpTimeMs.isPresent) {
|
||||
leader.lastCaughtUpTimeMs.getAsLong - maxLagFollower.lastCaughtUpTimeMs.getAsLong
|
||||
} else {
|
||||
-1
|
||||
}
|
||||
println(
|
||||
s"""|ClusterId: $clusterId
|
||||
|LeaderId: ${quorumInfo.leaderId}
|
||||
|LeaderEpoch: ${quorumInfo.leaderEpoch}
|
||||
|HighWatermark: ${quorumInfo.highWatermark}
|
||||
|MaxFollowerLag: $maxFollowerLag
|
||||
|MaxFollowerLagTimeMs: $maxFollowerLagTimeMs
|
||||
|CurrentVoters: ${quorumInfo.voters.asScala.map(_.replicaId).mkString("[", ",", "]")}
|
||||
|CurrentObservers: ${quorumInfo.observers.asScala.map(_.replicaId).mkString("[", ",", "]")}
|
||||
|""".stripMargin
|
||||
)
|
||||
}
|
||||
}
|
|
@ -183,6 +183,10 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
|
|||
));
|
||||
}
|
||||
|
||||
public Collection<ControllerServer> controllerServers() {
|
||||
return controllers().collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterType clusterType() {
|
||||
return ClusterType.RAFT;
|
||||
|
|
|
@ -0,0 +1,192 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package kafka.admin
|
||||
|
||||
import kafka.test.ClusterInstance
|
||||
import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, ClusterTests, Type}
|
||||
import kafka.test.junit.ClusterTestExtensions
|
||||
import kafka.utils.TestUtils
|
||||
import org.apache.kafka.common.errors.UnsupportedVersionException
|
||||
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
|
||||
import org.junit.jupiter.api.{Tag, Test}
|
||||
import org.junit.jupiter.api.extension.ExtendWith
|
||||
|
||||
import java.util.concurrent.ExecutionException
|
||||
|
||||
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
|
||||
@ClusterTestDefaults(clusterType = Type.KRAFT)
|
||||
@Tag("integration")
|
||||
class MetadataQuorumCommandTest(cluster: ClusterInstance) {
|
||||
|
||||
/**
|
||||
* 1. The same number of broker controllers
|
||||
* 2. More brokers than controllers
|
||||
* 3. Fewer brokers than controllers
|
||||
*/
|
||||
@ClusterTests(
|
||||
Array(
|
||||
new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3),
|
||||
new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3),
|
||||
new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2),
|
||||
new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2),
|
||||
new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3),
|
||||
new ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3)
|
||||
))
|
||||
def testDescribeQuorumReplicationSuccessful(): Unit = {
|
||||
cluster.waitForReadyBrokers()
|
||||
val describeOutput = TestUtils.grabConsoleOutput(
|
||||
MetadataQuorumCommand.mainNoExit(
|
||||
Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication"))
|
||||
)
|
||||
|
||||
val leaderPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Leader\s+""".r
|
||||
val followerPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Follower\s+""".r
|
||||
val observerPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Observer\s+""".r
|
||||
val outputs = describeOutput.split("\n").tail
|
||||
if (cluster.config().clusterType() == Type.CO_KRAFT) {
|
||||
assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), outputs.length)
|
||||
} else {
|
||||
assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), outputs.length)
|
||||
}
|
||||
// `matches` is not supported in scala 2.12, use `findFirstIn` instead.
|
||||
assertTrue(leaderPattern.findFirstIn(outputs.head).nonEmpty)
|
||||
assertEquals(1, outputs.count(leaderPattern.findFirstIn(_).nonEmpty))
|
||||
assertEquals(cluster.config().numControllers() - 1, outputs.count(followerPattern.findFirstIn(_).nonEmpty))
|
||||
|
||||
if (cluster.config().clusterType() == Type.CO_KRAFT) {
|
||||
assertEquals(Math.max(0, cluster.config().numBrokers() - cluster.config().numControllers()), outputs.count(observerPattern.findFirstIn(_).nonEmpty))
|
||||
} else {
|
||||
assertEquals(cluster.config().numBrokers(), outputs.count(observerPattern.findFirstIn(_).nonEmpty))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 1. The same number of broker controllers
|
||||
* 2. More brokers than controllers
|
||||
* 3. Fewer brokers than controllers
|
||||
*/
|
||||
@ClusterTests(
|
||||
Array(
|
||||
new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3),
|
||||
new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3),
|
||||
new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2),
|
||||
new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2),
|
||||
new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3),
|
||||
new ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3)
|
||||
))
|
||||
def testDescribeQuorumStatusSuccessful(): Unit = {
|
||||
cluster.waitForReadyBrokers()
|
||||
val describeOutput = TestUtils.grabConsoleOutput(
|
||||
MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status"))
|
||||
)
|
||||
val outputs = describeOutput.split("\n")
|
||||
|
||||
assertTrue("""ClusterId:\s+\S{22}""".r.findFirstIn(outputs(0)).nonEmpty)
|
||||
assertTrue("""LeaderId:\s+\d+""".r.findFirstIn(outputs(1)).nonEmpty)
|
||||
assertTrue("""LeaderEpoch:\s+\d+""".r.findFirstIn(outputs(2)).nonEmpty)
|
||||
// HighWatermark may be -1
|
||||
assertTrue("""HighWatermark:\s+[-]?\d+""".r.findFirstIn(outputs(3)).nonEmpty)
|
||||
assertTrue("""MaxFollowerLag:\s+\d+""".r.findFirstIn(outputs(4)).nonEmpty)
|
||||
assertTrue("""MaxFollowerLagTimeMs:\s+[-]?\d+""".r.findFirstIn(outputs(5)).nonEmpty)
|
||||
assertTrue("""CurrentVoters:\s+\[\d+(,\d+)*\]""".r.findFirstIn(outputs(6)).nonEmpty)
|
||||
|
||||
// There are no observers if we have fewer brokers than controllers
|
||||
if (cluster.config().clusterType() == Type.CO_KRAFT
|
||||
&& cluster.config().numBrokers() <= cluster.config().numControllers()) {
|
||||
assertTrue("""CurrentObservers:\s+\[\]""".r.findFirstIn(outputs(7)).nonEmpty)
|
||||
} else {
|
||||
assertTrue("""CurrentObservers:\s+\[\d+(,\d+)*\]""".r.findFirstIn(outputs(7)).nonEmpty)
|
||||
}
|
||||
}
|
||||
|
||||
@ClusterTests(
|
||||
Array(new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1),
|
||||
new ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 1)))
|
||||
def testOnlyOneBrokerAndOneController(): Unit = {
|
||||
val statusOutput = TestUtils.grabConsoleOutput(
|
||||
MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status"))
|
||||
)
|
||||
assertEquals("MaxFollowerLag: 0", statusOutput.split("\n")(4))
|
||||
assertEquals("MaxFollowerLagTimeMs: 0", statusOutput.split("\n")(5))
|
||||
|
||||
val replicationOutput = TestUtils.grabConsoleOutput(
|
||||
MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication"))
|
||||
)
|
||||
assertEquals("0", replicationOutput.split("\n")(1).split("\\s+")(2))
|
||||
}
|
||||
|
||||
@ClusterTest(clusterType = Type.ZK, brokers = 3)
|
||||
def testDescribeQuorumInZkMode(): Unit = {
|
||||
assertTrue(
|
||||
assertThrows(
|
||||
classOf[ExecutionException],
|
||||
() =>
|
||||
MetadataQuorumCommand.mainNoExit(
|
||||
Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status"))
|
||||
).getCause.isInstanceOf[UnsupportedVersionException]
|
||||
)
|
||||
assertTrue(
|
||||
assertThrows(
|
||||
classOf[ExecutionException],
|
||||
() =>
|
||||
MetadataQuorumCommand.mainNoExit(
|
||||
Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication"))
|
||||
).getCause.isInstanceOf[UnsupportedVersionException]
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
class MetadataQuorumCommandErrorTest {
|
||||
|
||||
@Test
|
||||
def testPropertiesFileDoesNotExists(): Unit = {
|
||||
assertEquals(1,
|
||||
MetadataQuorumCommand.mainNoExit(
|
||||
Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe")))
|
||||
assertEquals(
|
||||
"Properties file admin.properties does not exists!",
|
||||
TestUtils
|
||||
.grabConsoleError(
|
||||
MetadataQuorumCommand.mainNoExit(
|
||||
Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe")))
|
||||
.trim
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testDescribeOptions(): Unit = {
|
||||
assertEquals(1, MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "describe")))
|
||||
assertEquals(
|
||||
"One of --status or --replication must be specified with describe sub-command",
|
||||
TestUtils
|
||||
.grabConsoleError(MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "describe")))
|
||||
.trim
|
||||
)
|
||||
|
||||
assertEquals(1,
|
||||
MetadataQuorumCommand.mainNoExit(
|
||||
Array("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication")))
|
||||
assertEquals(
|
||||
"Only one of --status or --replication should be specified with describe sub-command",
|
||||
TestUtils
|
||||
.grabConsoleError(
|
||||
MetadataQuorumCommand.mainNoExit(
|
||||
Array("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication")))
|
||||
.trim
|
||||
)
|
||||
}
|
||||
}
|
|
@ -74,6 +74,8 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
|
|||
|
||||
val leaderId = partitionData.leaderId
|
||||
assertTrue(leaderId > 0)
|
||||
assertTrue(partitionData.leaderEpoch() > 0)
|
||||
assertTrue(partitionData.highWatermark() > 0)
|
||||
|
||||
val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
|
||||
.getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
|
||||
|
|
|
@ -14,13 +14,17 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.tools;
|
||||
package org.apache.kafka.server.util;
|
||||
|
||||
import org.apache.kafka.common.Metric;
|
||||
import org.apache.kafka.common.MetricName;
|
||||
|
||||
import java.io.PrintStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class ToolsUtils {
|
||||
|
||||
|
@ -52,4 +56,49 @@ public class ToolsUtils {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void appendColumnValue(
|
||||
StringBuilder rowBuilder,
|
||||
String value,
|
||||
int length
|
||||
) {
|
||||
int padLength = length - value.length();
|
||||
rowBuilder.append(value);
|
||||
for (int i = 0; i < padLength; i++)
|
||||
rowBuilder.append(' ');
|
||||
}
|
||||
|
||||
private static void printRow(
|
||||
List<Integer> columnLengths,
|
||||
String[] row,
|
||||
PrintStream out
|
||||
) {
|
||||
StringBuilder rowBuilder = new StringBuilder();
|
||||
for (int i = 0; i < row.length; i++) {
|
||||
Integer columnLength = columnLengths.get(i);
|
||||
String columnValue = row[i];
|
||||
appendColumnValue(rowBuilder, columnValue, columnLength);
|
||||
rowBuilder.append('\t');
|
||||
}
|
||||
out.println(rowBuilder);
|
||||
}
|
||||
|
||||
public static void prettyPrintTable(
|
||||
String[] headers,
|
||||
List<String[]> rows,
|
||||
PrintStream out
|
||||
) {
|
||||
List<Integer> columnLengths = Arrays.stream(headers)
|
||||
.map(String::length)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
for (String[] row : rows) {
|
||||
for (int i = 0; i < headers.length; i++) {
|
||||
columnLengths.set(i, Math.max(columnLengths.get(i), row[i].length()));
|
||||
}
|
||||
}
|
||||
|
||||
printRow(columnLengths, headers, out);
|
||||
rows.forEach(row -> printRow(columnLengths, row, out));
|
||||
}
|
||||
}
|
|
@ -43,6 +43,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException;
|
|||
import net.sourceforge.argparse4j.inf.Namespace;
|
||||
import org.apache.kafka.common.utils.Exit;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.server.util.ToolsUtils;
|
||||
|
||||
public class ProducerPerformance {
|
||||
|
||||
|
|
|
@ -66,6 +66,7 @@ import java.util.stream.Collectors;
|
|||
import static java.util.Collections.singleton;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static net.sourceforge.argparse4j.impl.Arguments.store;
|
||||
import static org.apache.kafka.server.util.ToolsUtils.prettyPrintTable;
|
||||
|
||||
public abstract class TransactionsCommand {
|
||||
private static final Logger log = LoggerFactory.getLogger(TransactionsCommand.class);
|
||||
|
@ -903,51 +904,6 @@ public abstract class TransactionsCommand {
|
|||
}
|
||||
}
|
||||
|
||||
private static void appendColumnValue(
|
||||
StringBuilder rowBuilder,
|
||||
String value,
|
||||
int length
|
||||
) {
|
||||
int padLength = length - value.length();
|
||||
rowBuilder.append(value);
|
||||
for (int i = 0; i < padLength; i++)
|
||||
rowBuilder.append(' ');
|
||||
}
|
||||
|
||||
private static void printRow(
|
||||
List<Integer> columnLengths,
|
||||
String[] row,
|
||||
PrintStream out
|
||||
) {
|
||||
StringBuilder rowBuilder = new StringBuilder();
|
||||
for (int i = 0; i < row.length; i++) {
|
||||
Integer columnLength = columnLengths.get(i);
|
||||
String columnValue = row[i];
|
||||
appendColumnValue(rowBuilder, columnValue, columnLength);
|
||||
rowBuilder.append('\t');
|
||||
}
|
||||
out.println(rowBuilder);
|
||||
}
|
||||
|
||||
private static void prettyPrintTable(
|
||||
String[] headers,
|
||||
List<String[]> rows,
|
||||
PrintStream out
|
||||
) {
|
||||
List<Integer> columnLengths = Arrays.stream(headers)
|
||||
.map(String::length)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
for (String[] row : rows) {
|
||||
for (int i = 0; i < headers.length; i++) {
|
||||
columnLengths.set(i, Math.max(columnLengths.get(i), row[i].length()));
|
||||
}
|
||||
}
|
||||
|
||||
printRow(columnLengths, headers, out);
|
||||
rows.forEach(row -> printRow(columnLengths, row, out));
|
||||
}
|
||||
|
||||
private static void printErrorAndExit(String message, Throwable t) {
|
||||
log.debug(message, t);
|
||||
|
||||
|
|
Loading…
Reference in New Issue