From 1f508ea8c33726f6a4690b4eabe9713b277a7dd8 Mon Sep 17 00:00:00 2001
From: Colin Patrick McCabe
Date: Mon, 8 Feb 2021 12:42:40 -0800
Subject: [PATCH] MINOR: Add StorageTool as specified in KIP-631 (#10043)

Add StorageTool as specified in KIP-631. It can format and describe storage directories.

Fix a bug in `ZkMetaProperties#toString`.

Reviewers: David Arthur
---
 bin/kafka-storage.sh                           |  17 ++
 .../server/BrokerMetadataCheckpoint.scala      |   6 +-
 .../main/scala/kafka/tools/StorageTool.scala   | 238 ++++++++++++++++++
 .../unit/kafka/tools/StorageToolTest.scala     | 187 ++++++++++++++
 4 files changed, 445 insertions(+), 3 deletions(-)
 create mode 100755 bin/kafka-storage.sh
 create mode 100644 core/src/main/scala/kafka/tools/StorageTool.scala
 create mode 100644 core/src/test/scala/unit/kafka/tools/StorageToolTest.scala

diff --git a/bin/kafka-storage.sh b/bin/kafka-storage.sh
new file mode 100755
index 00000000000..eef93423877
--- /dev/null
+++ b/bin/kafka-storage.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.StorageTool "$@"
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index 93a0abe1793..e3d5c4b3126 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -88,9 +88,9 @@ class RawMetaProperties(val props: Properties = new Properties()) {
   }
 
   override def toString: String = {
-    "RawMetaProperties(" + props.keySet().asScala.toList.asInstanceOf[List[String]].sorted.map {
+    "{" + props.keySet().asScala.toList.asInstanceOf[List[String]].sorted.map {
       key => key + "=" + props.get(key)
-    }.mkString(", ") + ")"
+    }.mkString(", ") + "}"
   }
 }
 
@@ -130,7 +130,7 @@ case class ZkMetaProperties(
   }
 
   override def toString: String = {
-    s"LegacyMetaProperties(brokerId=$brokerId, clusterId=$clusterId)"
+    s"ZkMetaProperties(brokerId=$brokerId, clusterId=$clusterId)"
   }
 }
 
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
new file mode 100644
index 00000000000..ff84007a4ec
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.PrintStream
+import java.nio.file.{Files, Paths}
+
+import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties, RawMetaProperties}
+import kafka.utils.{Exit, Logging}
+import net.sourceforge.argparse4j.ArgumentParsers
+import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue}
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.utils.Utils
+
+import scala.collection.mutable
+
+object StorageTool extends Logging {
+  def main(args: Array[String]): Unit = {
+    try {
+      val parser = ArgumentParsers.
+        newArgumentParser("kafka-storage").
+        defaultHelp(true).
+        description("The Kafka storage tool.")
+      val subparsers = parser.addSubparsers().dest("command")
+
+      val infoParser = subparsers.addParser("info").
+        help("Get information about the Kafka log directories on this node.")
+      val formatParser = subparsers.addParser("format").
+        help("Format the Kafka log directories on this node.")
+      subparsers.addParser("random-uuid").help("Print a random UUID.")
+      List(infoParser, formatParser).foreach(parser => {
+        parser.addArgument("--config", "-c").
+          action(store()).
+          required(true).
+          help("The Kafka configuration file to use.")
+      })
+      formatParser.addArgument("--cluster-id", "-t").
+        action(store()).
+        required(true).
+        help("The cluster ID to use.")
+      formatParser.addArgument("--ignore-formatted", "-g").
+        action(storeTrue())
+
+      val namespace = parser.parseArgsOrFail(args)
+      val command = namespace.getString("command")
+      val config = Option(namespace.getString("config")).flatMap(
+        p => Some(new KafkaConfig(Utils.loadProps(p))))
+
+      command match {
+        case "info" =>
+          val directories = configToLogDirectories(config.get)
+          val kip500Mode = configToKip500Mode(config.get)
+          Exit.exit(infoCommand(System.out, kip500Mode, directories))
+
+        case "format" =>
+          val directories = configToLogDirectories(config.get)
+          val clusterId = namespace.getString("cluster_id")
+          val metaProperties = buildMetadataProperties(clusterId, config.get)
+          val ignoreFormatted = namespace.getBoolean("ignore_formatted")
+          if (!configToKip500Mode(config.get)) {
+            throw new TerseFailure("The kafka configuration file appears to be for " +
+              "a legacy cluster. Formatting is only supported for kip-500 clusters.")
+          }
+          Exit.exit(formatCommand(System.out, directories, metaProperties, ignoreFormatted))
+
+        case "random-uuid" =>
+          System.out.println(Uuid.randomUuid)
+          Exit.exit(0)
+
+        case _ =>
+          throw new RuntimeException(s"Unknown command $command")
+      }
+    } catch {
+      case e: TerseFailure =>
+        System.err.println(e.getMessage)
+        System.exit(1)
+    }
+  }
+
+  def configToLogDirectories(config: KafkaConfig): Seq[String] = {
+    val directories = new mutable.TreeSet[String]
+    directories ++= config.logDirs
+    Option(config.metadataLogDir).foreach(directories.add)
+    directories.toSeq
+  }
+
+  def configToKip500Mode(config: KafkaConfig): Boolean = config.processRoles.nonEmpty
+
+  def infoCommand(stream: PrintStream, kip500Mode: Boolean, directories: Seq[String]): Int = {
+    val problems = new mutable.ArrayBuffer[String]
+    val foundDirectories = new mutable.ArrayBuffer[String]
+    var prevMetadata: Option[RawMetaProperties] = None
+    directories.sorted.foreach(directory => {
+      val directoryPath = Paths.get(directory)
+      if (!Files.isDirectory(directoryPath)) {
+        if (!Files.exists(directoryPath)) {
+          problems += s"$directoryPath does not exist"
+        } else {
+          problems += s"$directoryPath is not a directory"
+        }
+      } else {
+        foundDirectories += directoryPath.toString
+        val metaPath = directoryPath.resolve("meta.properties")
+        if (!Files.exists(metaPath)) {
+          problems += s"$directoryPath is not formatted."
+        } else {
+          val properties = Utils.loadProps(metaPath.toString)
+          val rawMetaProperties = new RawMetaProperties(properties)
+
+          val curMetadata = rawMetaProperties.version match {
+            case 0 | 1 => Some(rawMetaProperties)
+            case v =>
+              problems += s"Unsupported version for $metaPath: $v"
+              None
+          }
+
+          if (prevMetadata.isEmpty) {
+            prevMetadata = curMetadata
+          } else {
+            if (!prevMetadata.get.equals(curMetadata.get)) {
+              problems += s"Metadata for $metaPath was ${curMetadata.get}, " +
+                s"but other directories featured ${prevMetadata.get}"
+            }
+          }
+        }
+      }
+    })
+
+    prevMetadata.foreach { prev =>
+      if (kip500Mode) {
+        if (prev.version == 0) {
+          problems += "The kafka configuration file appears to be for a kip-500 cluster, but " +
+            "the directories are formatted for legacy mode."
+        }
+      } else if (prev.version == 1) {
+        problems += "The kafka configuration file appears to be for a legacy cluster, but " +
+          "the directories are formatted for kip-500."
+      }
+    }
+
+    if (directories.isEmpty) {
+      stream.println("No directories specified.")
+      0
+    } else {
+      if (foundDirectories.nonEmpty) {
+        if (foundDirectories.size == 1) {
+          stream.println("Found log directory:")
+        } else {
+          stream.println("Found log directories:")
+        }
+        foundDirectories.foreach(d => stream.println("  %s".format(d)))
+        stream.println("")
+      }
+
+      prevMetadata.foreach { prev =>
+        stream.println(s"Found metadata: ${prev}")
+        stream.println("")
+      }
+
+      if (problems.nonEmpty) {
+        if (problems.size == 1) {
+          stream.println("Found problem:")
+        } else {
+          stream.println("Found problems:")
+        }
+        problems.foreach(d => stream.println("  %s".format(d)))
+        stream.println("")
+        1
+      } else {
+        0
+      }
+    }
+  }
+
+  def buildMetadataProperties(
+    clusterIdStr: String,
+    config: KafkaConfig
+  ): MetaProperties = {
+    val effectiveClusterId = try {
+      Uuid.fromString(clusterIdStr)
+    } catch {
+      case e: Throwable => throw new TerseFailure(s"Cluster ID string $clusterIdStr " +
+        s"does not appear to be a valid UUID: ${e.getMessage}")
+    }
+    require(config.nodeId >= 0, s"The node.id must be set to a non-negative integer.")
+    new MetaProperties(effectiveClusterId, config.nodeId)
+  }
+
+  def formatCommand(stream: PrintStream,
+                    directories: Seq[String],
+                    metaProperties: MetaProperties,
+                    ignoreFormatted: Boolean): Int = {
+    if (directories.isEmpty) {
+      throw new TerseFailure("No log directories found in the configuration.")
+    }
+    val unformattedDirectories = directories.filter(directory => {
+      if (!Files.isDirectory(Paths.get(directory)) || !Files.exists(Paths.get(directory, "meta.properties"))) {
+        true
+      } else if (!ignoreFormatted) {
+        throw new TerseFailure(s"Log directory ${directory} is already formatted. " +
+          "Use --ignore-formatted to ignore this directory and format the others.")
+      } else {
+        false
+      }
+    })
+    if (unformattedDirectories.isEmpty) {
+      stream.println("All of the log directories are already formatted.")
+    }
+    unformattedDirectories.foreach(directory => {
+      try {
+        Files.createDirectories(Paths.get(directory))
+      } catch {
+        case e: Throwable => throw new TerseFailure(s"Unable to create storage " +
+          s"directory ${directory}: ${e.getMessage}")
+      }
+      val metaPropertiesPath = Paths.get(directory, "meta.properties")
+      val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile)
+      checkpoint.write(metaProperties.toProperties)
+      stream.println(s"Formatting ${directory}")
+    })
+    0
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
new file mode 100644
index 00000000000..d601e36fcf3
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.util
+import java.util.Properties
+
+import kafka.server.{KafkaConfig, MetaProperties}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.utils.Utils
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.{Test, Timeout}
+
+
+@Timeout(value = 40)
+class StorageToolTest {
+  private def newKip500Properties() = {
+    val properties = new Properties()
+    properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo,/tmp/bar")
+    properties.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    properties.setProperty(KafkaConfig.NodeIdProp, "2")
+    properties
+  }
+
+  @Test
+  def testConfigToLogDirectories(): Unit = {
+    val config = new KafkaConfig(newKip500Properties())
+    assertEquals(Seq("/tmp/bar", "/tmp/foo"), StorageTool.configToLogDirectories(config))
+  }
+
+  @Test
+  def testConfigToLogDirectoriesWithMetaLogDir(): Unit = {
+    val properties = newKip500Properties()
+    properties.setProperty(KafkaConfig.MetadataLogDirProp, "/tmp/baz")
+    val config = new KafkaConfig(properties)
+    assertEquals(Seq("/tmp/bar", "/tmp/baz", "/tmp/foo"),
+      StorageTool.configToLogDirectories(config))
+  }
+
+  @Test
+  def testInfoCommandOnEmptyDirectory(): Unit = {
+    val stream = new ByteArrayOutputStream()
+    val tempDir = TestUtils.tempDir()
+    try {
+      assertEquals(1, StorageTool.
+        infoCommand(new PrintStream(stream), true, Seq(tempDir.toString)))
+      assertEquals(s"""Found log directory:
+  ${tempDir.toString}
+
+Found problem:
+  ${tempDir.toString} is not formatted.
+
+""", stream.toString())
+    } finally Utils.delete(tempDir)
+  }
+
+  @Test
+  def testInfoCommandOnMissingDirectory(): Unit = {
+    val stream = new ByteArrayOutputStream()
+    val tempDir = TestUtils.tempDir()
+    tempDir.delete()
+    try {
+      assertEquals(1, StorageTool.
+        infoCommand(new PrintStream(stream), true, Seq(tempDir.toString)))
+      assertEquals(s"""Found problem:
+  ${tempDir.toString} does not exist
+
+""", stream.toString())
+    } finally Utils.delete(tempDir)
+  }
+
+  @Test
+  def testInfoCommandOnDirectoryAsFile(): Unit = {
+    val stream = new ByteArrayOutputStream()
+    val tempFile = TestUtils.tempFile()
+    try {
+      assertEquals(1, StorageTool.
+        infoCommand(new PrintStream(stream), true, Seq(tempFile.toString)))
+      assertEquals(s"""Found problem:
+  ${tempFile.toString} is not a directory
+
+""", stream.toString())
+    } finally tempFile.delete()
+  }
+
+  @Test
+  def testInfoWithMismatchedLegacyKafkaConfig(): Unit = {
+    val stream = new ByteArrayOutputStream()
+    val tempDir = TestUtils.tempDir()
+    try {
+      Files.write(tempDir.toPath.resolve("meta.properties"),
+        String.join("\n", util.Arrays.asList(
+          "version=1",
+          "cluster.id=XcZZOzUqS4yHOjhMQB6JLQ")).
+            getBytes(StandardCharsets.UTF_8))
+      assertEquals(1, StorageTool.
+        infoCommand(new PrintStream(stream), false, Seq(tempDir.toString)))
+      assertEquals(s"""Found log directory:
+  ${tempDir.toString}
+
+Found metadata: {cluster.id=XcZZOzUqS4yHOjhMQB6JLQ, version=1}
+
+Found problem:
+  The kafka configuration file appears to be for a legacy cluster, but the directories are formatted for kip-500.
+
+""", stream.toString())
+    } finally Utils.delete(tempDir)
+  }
+
+  @Test
+  def testInfoWithMismatchedKip500KafkaConfig(): Unit = {
+    val stream = new ByteArrayOutputStream()
+    val tempDir = TestUtils.tempDir()
+    try {
+      Files.write(tempDir.toPath.resolve("meta.properties"),
+        String.join("\n", util.Arrays.asList(
+          "version=0",
+          "broker.id=1",
+          "cluster.id=26c36907-4158-4a35-919d-6534229f5241")).
+            getBytes(StandardCharsets.UTF_8))
+      assertEquals(1, StorageTool.
+        infoCommand(new PrintStream(stream), true, Seq(tempDir.toString)))
+      assertEquals(s"""Found log directory:
+  ${tempDir.toString}
+
+Found metadata: {broker.id=1, cluster.id=26c36907-4158-4a35-919d-6534229f5241, version=0}
+
+Found problem:
+  The kafka configuration file appears to be for a kip-500 cluster, but the directories are formatted for legacy mode.
+
+""", stream.toString())
+    } finally Utils.delete(tempDir)
+  }
+
+  @Test
+  def testFormatEmptyDirectory(): Unit = {
+    val tempDir = TestUtils.tempDir()
+    try {
+      val metaProperties = MetaProperties(
+        clusterId = Uuid.fromString("XcZZOzUqS4yHOjhMQB6JLQ"), nodeId = 2)
+      val stream = new ByteArrayOutputStream()
+      assertEquals(0, StorageTool.
+        formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, false))
+      assertEquals("Formatting %s%n".format(tempDir), stream.toString())
+
+      try assertEquals(1, StorageTool.
+        formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, false)) catch {
+        case e: TerseFailure => assertEquals(s"Log directory ${tempDir} is already " +
+          "formatted. Use --ignore-formatted to ignore this directory and format the " +
+          "others.", e.getMessage)
+      }
+
+      val stream2 = new ByteArrayOutputStream()
+      assertEquals(0, StorageTool.
+        formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, true))
+      assertEquals("All of the log directories are already formatted.%n".format(), stream2.toString())
+    } finally Utils.delete(tempDir)
+  }
+
+  @Test
+  def testFormatWithInvalidClusterId(): Unit = {
+    val config = new KafkaConfig(newKip500Properties())
+    assertEquals("Cluster ID string invalid does not appear to be a valid UUID: " +
+      "Input string `invalid` decoded as 5 bytes, which is not equal to the expected " +
+      "16 bytes of a base64-encoded UUID", assertThrows(classOf[TerseFailure],
+        () => StorageTool.buildMetadataProperties("invalid", config)).getMessage)
  }
+}
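
Reviewer note (not part of the patch): the three subcommands added above compose into the expected provisioning flow. The config path and UUID placeholder below are illustrative, not values taken from this change:

  # Generate a cluster ID, format the configured log dirs, then inspect them.
  $ bin/kafka-storage.sh random-uuid
  $ bin/kafka-storage.sh format -t <uuid-printed-above> -c config/server.properties
  $ bin/kafka-storage.sh info -c config/server.properties

Per the code, `format` refuses to run against a config without process.roles set (a legacy/ZK config), and it fails on already-formatted directories unless --ignore-formatted is passed, in which case those directories are skipped and only the unformatted ones are written.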
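Reviewer note (not part of the patch): for reference, `format` writes a meta.properties checkpoint into each selected directory via BrokerMetadataCheckpoint. The sketches below are assembled from the fixtures in StorageToolTest; the node.id key in the kip-500 variant is an assumption, since MetaProperties#toProperties is defined outside this patch:

  # kip-500 directory (version=1); the node.id key is assumed, not shown here
  version=1
  cluster.id=XcZZOzUqS4yHOjhMQB6JLQ
  node.id=2

  # legacy/ZK directory (version=0), as written by existing brokers
  version=0
  broker.id=1
  cluster.id=26c36907-4158-4a35-919d-6534229f5241

The `info` subcommand uses the version key to decide which mode a directory was formatted for, and flags a mismatch against the mode implied by the supplied config.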