From 9572d19c5911f401a155741fcce61b69657aae79 Mon Sep 17 00:00:00 2001 From: Lan Ding Date: Fri, 18 Jul 2025 11:39:55 +0800 Subject: [PATCH] KAFKA-19509: Improve error message when release version is wrong (#20185) Improve the error message in the kafka-storage.sh when an incorrect release-version is given. Specifically, following the behavior of kafka-feature.sh, when an incorrect release-version is entered, it returns the currently supported versions to the user. Reviewers: TengYao Chi , Yung --- .../main/scala/kafka/tools/StorageTool.scala | 18 +++++++++++++++--- .../unit/kafka/tools/StorageToolTest.scala | 14 ++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 65307e0d766..66aabb8f4d1 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -128,9 +128,21 @@ object StorageTool extends Logging { setIgnoreFormatted(namespace.getBoolean("ignore_formatted")). setControllerListenerName(config.controllerListenerNames.get(0)). setMetadataLogDirectory(config.metadataLogDir) - Option(namespace.getString("release_version")).foreach( - releaseVersion => formatter. - setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion))) + + def metadataVersionsToString(first: MetadataVersion, last: MetadataVersion): String = { + val versions = MetadataVersion.VERSIONS.slice(first.ordinal, last.ordinal + 1) + versions.map(_.toString).mkString(", ") + } + Option(namespace.getString("release_version")).foreach(releaseVersion => { + try { + formatter.setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion)) + } catch { + case _: Throwable => + throw new TerseFailure(s"Unknown metadata.version $releaseVersion. Supported metadata.version are " + + s"${metadataVersionsToString(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestProduction())}") + } + }) + Option(namespace.getList[String]("feature")).foreach( featureNamesAndLevels(_).foreachEntry { (k, v) => formatter.setFeatureLevel(k, v) diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 91e8d9fd33e..4d271d594cf 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -306,6 +306,20 @@ Found problem: "Failed to find content in output: " + stream.toString()) } + @Test + def testFormatWithUnsupportedReleaseVersion(): Unit = { + val availableDirs = Seq(TestUtils.tempDir()) + val properties = new Properties() + properties.putAll(defaultStaticQuorumProperties) + properties.setProperty("log.dirs", availableDirs.mkString(",")) + val stream = new ByteArrayOutputStream() + val failure = assertThrows(classOf[TerseFailure], () => + runFormatCommand(stream, properties, Seq("--release-version", "3.3-IV1"))).getMessage + assertTrue(failure.contains("Unknown metadata.version 3.3-IV1")) + assertTrue(failure.contains(MetadataVersion.MINIMUM_VERSION.version)) + assertTrue(failure.contains(MetadataVersion.latestProduction().version)) + } + @Test def testFormatWithReleaseVersionAsFeature(): Unit = { val availableDirs = Seq(TestUtils.tempDir())