MINOR: Add ClusterTool as specified in KIP-631 (#10047)

Add ClusterTool as specified in KIP-631. It can report the current cluster ID, and also send the new RPC for removing broker registrations.
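
For context, a hypothetical invocation of each subcommand (a sketch: the broker address is a placeholder, and the subcommand and flag names are the ones defined by the argparse4j parser added in this patch):

    # Report the current cluster ID.
    bin/kafka-cluster.sh cluster-id --bootstrap-server localhost:9092

    # Ask the controller to remove the registration of broker 1.
    bin/kafka-cluster.sh decommission --bootstrap-server localhost:9092 --id 1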

Reviewers: David Arthur <mumrah@gmail.com>
Colin Patrick McCabe 2021-02-08 12:07:39 -08:00 committed by GitHub
parent 8c7284275e
commit 1d3e293c08
6 changed files with 268 additions and 4 deletions

bin/kafka-cluster.sh Executable file

@@ -0,0 +1,17 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ClusterTool "$@"

build.gradle

@@ -751,6 +751,7 @@ project(':core') {
     compile project(':clients')
     compile project(':metadata')
     compile project(':raft')
+    compile libs.argparse4j
     compile libs.jacksonDatabind
     compile libs.jacksonModuleScala
     compile libs.jacksonDataformatCsv

clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java

@@ -38,6 +38,7 @@ import org.apache.kafka.common.errors.ReplicaNotAvailableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import org.apache.kafka.common.quota.ClientQuotaAlteration;
@@ -69,6 +70,7 @@ public class MockAdminClient extends AdminClient {
         new HashMap<>();
     private final Map<TopicPartition, Long> beginningOffsets;
     private final Map<TopicPartition, Long> endOffsets;
+    private final boolean usingRaftController;
     private final String clusterId;
     private final List<List<String>> brokerLogDirs;
     private final List<Map<String, String>> brokerConfigs;
@@ -90,6 +92,7 @@ public class MockAdminClient extends AdminClient {
         private Node controller = null;
         private List<List<String>> brokerLogDirs = new ArrayList<>();
         private Short defaultPartitions;
+        private boolean usingRaftController = false;
         private Integer defaultReplicationFactor;

         public Builder() {
@@ -135,6 +138,11 @@ public class MockAdminClient extends AdminClient {
             return this;
         }

+        public Builder usingRaftController(boolean usingRaftController) {
+            this.usingRaftController = usingRaftController;
+            return this;
+        }
+
         public Builder defaultPartitions(short numPartitions) {
             this.defaultPartitions = numPartitions;
             return this;
@@ -146,7 +154,8 @@ public class MockAdminClient extends AdminClient {
                 clusterId,
                 defaultPartitions != null ? defaultPartitions.shortValue() : 1,
                 defaultReplicationFactor != null ? defaultReplicationFactor.shortValue() : Math.min(brokers.size(), 3),
-                brokerLogDirs);
+                brokerLogDirs,
+                usingRaftController);
         }
     }
@@ -156,7 +165,7 @@ public class MockAdminClient extends AdminClient {

     public MockAdminClient(List<Node> brokers, Node controller) {
         this(brokers, controller, DEFAULT_CLUSTER_ID, 1, brokers.size(),
-            Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS));
+            Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS), false);
     }

     private MockAdminClient(List<Node> brokers,
@@ -164,7 +173,8 @@ public class MockAdminClient extends AdminClient {
                             String clusterId,
                             int defaultPartitions,
                             int defaultReplicationFactor,
-                            List<List<String>> brokerLogDirs) {
+                            List<List<String>> brokerLogDirs,
+                            boolean usingRaftController) {
         this.brokers = brokers;
         controller(controller);
         this.clusterId = clusterId;
@@ -177,6 +187,7 @@ public class MockAdminClient extends AdminClient {
         }
         this.beginningOffsets = new HashMap<>();
         this.endOffsets = new HashMap<>();
+        this.usingRaftController = usingRaftController;
     }

     synchronized public void controller(Node controller) {
@@ -889,7 +900,13 @@ public class MockAdminClient extends AdminClient {

     @Override
     public DecommissionBrokerResult decommissionBroker(int brokerId, DecommissionBrokerOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
+        if (usingRaftController) {
+            return new DecommissionBrokerResult(KafkaFuture.completedFuture(null));
+        } else {
+            KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+            future.completeExceptionally(new UnsupportedVersionException(""));
+            return new DecommissionBrokerResult(future);
+        }
     }

     @Override

core/src/main/scala/kafka/tools/ClusterTool.scala

@@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools

import java.io.PrintStream
import java.util.Properties
import java.util.concurrent.ExecutionException

import kafka.utils.{Exit, Logging}
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.store
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.utils.Utils

object ClusterTool extends Logging {
def main(args: Array[String]): Unit = {
try {
val parser = ArgumentParsers.
newArgumentParser("kafka-cluster").
defaultHelp(true).
description("The Kafka cluster tool.")
val subparsers = parser.addSubparsers().dest("command")
val clusterIdParser = subparsers.addParser("cluster-id").
help("Get information about the ID of a cluster.")
val decommissionParser = subparsers.addParser("decommission").
help("Decommission a broker..")
List(clusterIdParser, decommissionParser).foreach(parser => {
parser.addArgument("--bootstrap-server", "-b").
action(store()).
help("A list of host/port pairs to use for establishing the connection to the kafka cluster.")
parser.addArgument("--config", "-c").
action(store()).
help("A property file containing configs to passed to AdminClient.")
})
decommissionParser.addArgument("--id", "-i").
`type`(classOf[Integer]).
action(store()).
help("The ID of the broker to decommission.")
val namespace = parser.parseArgsOrFail(args)
val command = namespace.getString("command")
val configPath = namespace.getString("config")
val properties = if (configPath == null) {
new Properties()
} else {
Utils.loadProps(configPath)
}
Option(namespace.getString("bootstrap_server")).
foreach(b => properties.setProperty("bootstrap.servers", b))
if (properties.getProperty("bootstrap.servers") == null) {
throw new TerseFailure("Please specify --bootstrap-server.")
}
command match {
case "cluster-id" =>
val adminClient = Admin.create(properties)
try {
clusterIdCommand(System.out, adminClient)
} finally {
adminClient.close()
}
Exit.exit(0)
case "decommission" =>
val adminClient = Admin.create(properties)
try {
decommissionCommand(System.out, adminClient, namespace.getInt("id"))
} finally {
adminClient.close()
}
Exit.exit(0)
case _ =>
throw new RuntimeException(s"Unknown command $command")
}
} catch {
case e: TerseFailure =>
System.err.println(e.getMessage)
System.exit(1)
}
}
def clusterIdCommand(stream: PrintStream,
adminClient: Admin): Unit = {
val clusterId = Option(adminClient.describeCluster().clusterId().get())
clusterId match {
case None => stream.println(s"No cluster ID found. The Kafka version is probably too old.")
case Some(id) => stream.println(s"Cluster ID: ${id}")
}
}
def decommissionCommand(stream: PrintStream,
adminClient: Admin,
id: Int): Unit = {
try {
Option(adminClient.decommissionBroker(id).all().get())
stream.println(s"Broker ${id} is no longer registered. Note that if the broker " +
"is still running, or is restarted, it will re-register.")
} catch {
case e: ExecutionException => {
val cause = e.getCause()
if (cause.isInstanceOf[UnsupportedVersionException]) {
stream.println(s"The target cluster does not support broker decommissioning.")
} else {
throw e
}
}
}
}
}
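
A sketch of a session with the tool, matching the println calls above (the address is a placeholder; -b and -i are the short aliases registered with argparse4j). Against a KIP-631 controller quorum the decommission succeeds; against an older cluster, the UnsupportedVersionException surfaced by the admin client is turned into a one-line message:

    $ bin/kafka-cluster.sh decommission -b localhost:9092 -i 1
    Broker 1 is no longer registered. Note that if the broker is still running, or is restarted, it will re-register.

    # Against a cluster that does not support the new RPC:
    $ bin/kafka-cluster.sh decommission -b localhost:9092 -i 1
    The target cluster does not support broker decommissioning.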

core/src/main/scala/kafka/tools/TerseFailure.scala

@@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools

import org.apache.kafka.common.KafkaException

/**
* An exception thrown to indicate that the command has failed, but we don't want to
* print a stack trace.
*
* @param message The message to print out before exiting. A stack trace will not
* be printed.
*/
class TerseFailure(message: String) extends KafkaException(message) {
}
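
Because ClusterTool.main catches TerseFailure, prints only its message to stderr, and exits with status 1, the user sees a single line rather than a stack trace. For example, running without --bootstrap-server would hit the TerseFailure thrown in main above (hypothetical session):

    $ bin/kafka-cluster.sh cluster-id
    Please specify --bootstrap-server.
    $ echo $?
    1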

core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala

@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools

import java.io.{ByteArrayOutputStream, PrintStream}

import org.apache.kafka.clients.admin.MockAdminClient
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Test, Timeout}

@Timeout(value = 60)
class ClusterToolTest {
@Test
def testPrintClusterId(): Unit = {
val adminClient = new MockAdminClient.Builder().
clusterId("QtNwvtfVQ3GEFpzOmDEE-w").
build()
val stream = new ByteArrayOutputStream()
ClusterTool.clusterIdCommand(new PrintStream(stream), adminClient)
assertEquals(
s"""Cluster ID: QtNwvtfVQ3GEFpzOmDEE-w
""", stream.toString())
}
@Test
def testClusterTooOldToHaveId(): Unit = {
val adminClient = new MockAdminClient.Builder().
clusterId(null).
build()
val stream = new ByteArrayOutputStream()
ClusterTool.clusterIdCommand(new PrintStream(stream), adminClient)
assertEquals(
s"""No cluster ID found. The Kafka version is probably too old.
""", stream.toString())
}
@Test
def testDecommissionBroker(): Unit = {
val adminClient = new MockAdminClient.Builder().numBrokers(3).
usingRaftController(true).
build()
val stream = new ByteArrayOutputStream()
ClusterTool.decommissionCommand(new PrintStream(stream), adminClient, 0)
assertEquals(
s"""Broker 0 is no longer registered. Note that if the broker is still running, or is restarted, it will re-register.
""", stream.toString())
}
@Test
def testLegacyModeClusterCannotDecommissionBroker(): Unit = {
val adminClient = new MockAdminClient.Builder().numBrokers(3).
usingRaftController(false).
build()
val stream = new ByteArrayOutputStream()
ClusterTool.decommissionCommand(new PrintStream(stream), adminClient, 0)
assertEquals(
s"""The target cluster does not support broker decommissioning.
""", stream.toString())
}
}