KAFKA-19509: Improve error message when release version is wrong (#20185)
CI / build (push) Waiting to run Details

Improve the error message in the kafka-storage.sh when an incorrect
release-version is given. Specifically, following the behavior of
kafka-feature.sh, when an incorrect release-version is entered, it
returns the currently supported versions to the user.

Reviewers: TengYao Chi <frankvicky@apache.org>, Yung
 <yungyung7654321@gmail.com>
This commit is contained in:
Lan Ding 2025-07-18 11:39:55 +08:00 committed by GitHub
parent f81853ca88
commit 9572d19c59
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 29 additions and 3 deletions

View File

@ -128,9 +128,21 @@ object StorageTool extends Logging {
setIgnoreFormatted(namespace.getBoolean("ignore_formatted")).
setControllerListenerName(config.controllerListenerNames.get(0)).
setMetadataLogDirectory(config.metadataLogDir)
Option(namespace.getString("release_version")).foreach(
releaseVersion => formatter.
setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion)))
def metadataVersionsToString(first: MetadataVersion, last: MetadataVersion): String = {
val versions = MetadataVersion.VERSIONS.slice(first.ordinal, last.ordinal + 1)
versions.map(_.toString).mkString(", ")
}
Option(namespace.getString("release_version")).foreach(releaseVersion => {
try {
formatter.setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion))
} catch {
case _: Throwable =>
throw new TerseFailure(s"Unknown metadata.version $releaseVersion. Supported metadata.version are " +
s"${metadataVersionsToString(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestProduction())}")
}
})
Option(namespace.getList[String]("feature")).foreach(
featureNamesAndLevels(_).foreachEntry {
(k, v) => formatter.setFeatureLevel(k, v)

View File

@ -306,6 +306,20 @@ Found problem:
"Failed to find content in output: " + stream.toString())
}
@Test
def testFormatWithUnsupportedReleaseVersion(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultStaticQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
val failure = assertThrows(classOf[TerseFailure], () =>
runFormatCommand(stream, properties, Seq("--release-version", "3.3-IV1"))).getMessage
assertTrue(failure.contains("Unknown metadata.version 3.3-IV1"))
assertTrue(failure.contains(MetadataVersion.MINIMUM_VERSION.version))
assertTrue(failure.contains(MetadataVersion.latestProduction().version))
}
@Test
def testFormatWithReleaseVersionAsFeature(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())