diff --git a/bin/kafka-cluster.sh b/bin/kafka-cluster.sh
new file mode 100755
index 00000000000..574007e9cd4
--- /dev/null
+++ b/bin/kafka-cluster.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ClusterTool "$@"
diff --git a/build.gradle b/build.gradle
index d9b185e1bd4..602645dbded 100644
--- a/build.gradle
+++ b/build.gradle
@@ -751,6 +751,7 @@ project(':core') {
     compile project(':clients')
     compile project(':metadata')
     compile project(':raft')
+    compile libs.argparse4j
     compile libs.jacksonDatabind
     compile libs.jacksonModuleScala
     compile libs.jacksonDataformatCsv
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index a7dd619a074..c647e9f80c0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.errors.ReplicaNotAvailableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import org.apache.kafka.common.quota.ClientQuotaAlteration;
@@ -69,6 +70,7 @@ public class MockAdminClient extends AdminClient {
         new HashMap<>();
     private final Map<TopicPartition, Long> beginningOffsets;
     private final Map<TopicPartition, Long> endOffsets;
+    private final boolean usingRaftController;
     private final String clusterId;
     private final List<List<String>> brokerLogDirs;
     private final List<Map<String, String>> brokerConfigs;
@@ -90,6 +92,7 @@ public class MockAdminClient extends AdminClient {
         private Node controller = null;
         private List<List<String>> brokerLogDirs = new ArrayList<>();
         private Short defaultPartitions;
+        private boolean usingRaftController = false;
         private Integer defaultReplicationFactor;
 
         public Builder() {
@@ -135,6 +138,11 @@ public class MockAdminClient extends AdminClient {
             return this;
         }
 
+        public Builder usingRaftController(boolean usingRaftController) {
+            this.usingRaftController = usingRaftController;
+            return this;
+        }
+
         public Builder defaultPartitions(short numPartitions) {
             this.defaultPartitions = numPartitions;
             return this;
@@ -146,7 +154,8 @@ public class MockAdminClient extends AdminClient {
                 clusterId,
                 defaultPartitions != null ? defaultPartitions.shortValue() : 1,
                 defaultReplicationFactor != null ? defaultReplicationFactor.shortValue() : Math.min(brokers.size(), 3),
-                brokerLogDirs);
+                brokerLogDirs,
+                usingRaftController);
         }
     }
 
@@ -156,7 +165,7 @@ public class MockAdminClient extends AdminClient {
 
     public MockAdminClient(List<Node> brokers, Node controller) {
         this(brokers, controller, DEFAULT_CLUSTER_ID, 1, brokers.size(),
-            Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS));
+            Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS), false);
     }
 
     private MockAdminClient(List<Node> brokers,
@@ -164,7 +173,8 @@ public class MockAdminClient extends AdminClient {
                             String clusterId,
                             int defaultPartitions,
                             int defaultReplicationFactor,
-                            List<List<String>> brokerLogDirs) {
+                            List<List<String>> brokerLogDirs,
+                            boolean usingRaftController) {
         this.brokers = brokers;
         controller(controller);
         this.clusterId = clusterId;
@@ -177,6 +187,7 @@ public class MockAdminClient extends AdminClient {
         }
         this.beginningOffsets = new HashMap<>();
         this.endOffsets = new HashMap<>();
+        this.usingRaftController = usingRaftController;
     }
 
     synchronized public void controller(Node controller) {
@@ -889,7 +900,13 @@ public class MockAdminClient extends AdminClient {
 
     @Override
     public DecommissionBrokerResult decommissionBroker(int brokerId, DecommissionBrokerOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
+        if (usingRaftController) {
+            return new DecommissionBrokerResult(KafkaFuture.completedFuture(null));
+        } else {
+            KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+            future.completeExceptionally(new UnsupportedVersionException(""));
+            return new DecommissionBrokerResult(future);
+        }
     }
 
     @Override
diff --git a/core/src/main/scala/kafka/tools/ClusterTool.scala b/core/src/main/scala/kafka/tools/ClusterTool.scala
new file mode 100644
index 00000000000..f0d3d90a8cb
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/ClusterTool.scala
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.PrintStream
+import java.util.Properties
+import java.util.concurrent.ExecutionException
+
+import kafka.utils.{Exit, Logging}
+import net.sourceforge.argparse4j.ArgumentParsers
+import net.sourceforge.argparse4j.impl.Arguments.store
+import org.apache.kafka.clients.admin.Admin
+import org.apache.kafka.common.errors.UnsupportedVersionException
+import org.apache.kafka.common.utils.Utils
+
+object ClusterTool extends Logging {
+  def main(args: Array[String]): Unit = {
+    try {
+      val parser = ArgumentParsers.
+        newArgumentParser("kafka-cluster").
+        defaultHelp(true).
+        description("The Kafka cluster tool.")
+      val subparsers = parser.addSubparsers().dest("command")
+
+      val clusterIdParser = subparsers.addParser("cluster-id").
+        help("Get information about the ID of a cluster.")
+      val decommissionParser = subparsers.addParser("decommission").
+ help("Decommission a broker..") + List(clusterIdParser, decommissionParser).foreach(parser => { + parser.addArgument("--bootstrap-server", "-b"). + action(store()). + help("A list of host/port pairs to use for establishing the connection to the kafka cluster.") + parser.addArgument("--config", "-c"). + action(store()). + help("A property file containing configs to passed to AdminClient.") + }) + decommissionParser.addArgument("--id", "-i"). + `type`(classOf[Integer]). + action(store()). + help("The ID of the broker to decommission.") + + val namespace = parser.parseArgsOrFail(args) + val command = namespace.getString("command") + val configPath = namespace.getString("config") + val properties = if (configPath == null) { + new Properties() + } else { + Utils.loadProps(configPath) + } + Option(namespace.getString("bootstrap_server")). + foreach(b => properties.setProperty("bootstrap.servers", b)) + if (properties.getProperty("bootstrap.servers") == null) { + throw new TerseFailure("Please specify --bootstrap-server.") + } + + command match { + case "cluster-id" => + val adminClient = Admin.create(properties) + try { + clusterIdCommand(System.out, adminClient) + } finally { + adminClient.close() + } + Exit.exit(0) + case "decommission" => + val adminClient = Admin.create(properties) + try { + decommissionCommand(System.out, adminClient, namespace.getInt("id")) + } finally { + adminClient.close() + } + Exit.exit(0) + case _ => + throw new RuntimeException(s"Unknown command $command") + } + } catch { + case e: TerseFailure => + System.err.println(e.getMessage) + System.exit(1) + } + } + + def clusterIdCommand(stream: PrintStream, + adminClient: Admin): Unit = { + val clusterId = Option(adminClient.describeCluster().clusterId().get()) + clusterId match { + case None => stream.println(s"No cluster ID found. The Kafka version is probably too old.") + case Some(id) => stream.println(s"Cluster ID: ${id}") + } + } + + def decommissionCommand(stream: PrintStream, + adminClient: Admin, + id: Int): Unit = { + try { + Option(adminClient.decommissionBroker(id).all().get()) + stream.println(s"Broker ${id} is no longer registered. Note that if the broker " + + "is still running, or is restarted, it will re-register.") + } catch { + case e: ExecutionException => { + val cause = e.getCause() + if (cause.isInstanceOf[UnsupportedVersionException]) { + stream.println(s"The target cluster does not support broker decommissioning.") + } else { + throw e + } + } + } + } +} diff --git a/core/src/main/scala/kafka/tools/TerseFailure.scala b/core/src/main/scala/kafka/tools/TerseFailure.scala new file mode 100644 index 00000000000..c37b613d71f --- /dev/null +++ b/core/src/main/scala/kafka/tools/TerseFailure.scala @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import org.apache.kafka.common.KafkaException
+
+/**
+ * An exception thrown to indicate that the command has failed, but we don't want to
+ * print a stack trace.
+ *
+ * @param message The message to print out before exiting. A stack trace will not
+ *                be printed.
+ */
+class TerseFailure(message: String) extends KafkaException(message) {
+}
diff --git a/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala b/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala
new file mode 100644
index 00000000000..0ce100cf940
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import org.apache.kafka.clients.admin.MockAdminClient
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{Test, Timeout}
+
+@Timeout(value = 60)
+class ClusterToolTest {
+  @Test
+  def testPrintClusterId(): Unit = {
+    val adminClient = new MockAdminClient.Builder().
+      clusterId("QtNwvtfVQ3GEFpzOmDEE-w").
+      build()
+    val stream = new ByteArrayOutputStream()
+    ClusterTool.clusterIdCommand(new PrintStream(stream), adminClient)
+    assertEquals(
+      s"""Cluster ID: QtNwvtfVQ3GEFpzOmDEE-w
+""", stream.toString())
+  }
+
+  @Test
+  def testClusterTooOldToHaveId(): Unit = {
+    val adminClient = new MockAdminClient.Builder().
+      clusterId(null).
+      build()
+    val stream = new ByteArrayOutputStream()
+    ClusterTool.clusterIdCommand(new PrintStream(stream), adminClient)
+    assertEquals(
+      s"""No cluster ID found. The Kafka version is probably too old.
+""", stream.toString())
+  }
+
+  @Test
+  def testDecommissionBroker(): Unit = {
+    val adminClient = new MockAdminClient.Builder().numBrokers(3).
+      usingRaftController(true).
+      build()
+    val stream = new ByteArrayOutputStream()
+    ClusterTool.decommissionCommand(new PrintStream(stream), adminClient, 0)
+    assertEquals(
+      s"""Broker 0 is no longer registered. Note that if the broker is still running, or is restarted, it will re-register.
+""", stream.toString())
+  }
+
+  @Test
+  def testLegacyModeClusterCannotDecommissionBroker(): Unit = {
+    val adminClient = new MockAdminClient.Builder().numBrokers(3).
+      usingRaftController(false).
+      build()
+    val stream = new ByteArrayOutputStream()
+    ClusterTool.decommissionCommand(new PrintStream(stream), adminClient, 0)
+    assertEquals(
+      s"""The target cluster does not support broker decommissioning.
+""", stream.toString())
+  }
+}
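A quick usage sketch for the new command, based on the subcommands and flags defined in ClusterTool.scala above (localhost:9092 and client.properties are placeholders, not values from the patch):

    # Print the cluster ID reported by the brokers behind the bootstrap address.
    bin/kafka-cluster.sh cluster-id --bootstrap-server localhost:9092

    # Ask the cluster to decommission broker 1. Against a cluster that does not
    # support the API, the tool prints "The target cluster does not support
    # broker decommissioning." instead of a stack trace.
    bin/kafka-cluster.sh decommission --bootstrap-server localhost:9092 --id 1

    # Extra AdminClient settings (for example, security configs) can be supplied
    # through a properties file; bootstrap.servers may also come from that file.
    bin/kafka-cluster.sh decommission --config client.properties --id 1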