mirror of https://github.com/apache/kafka.git
KAFKA-17868: Do not ignore --feature flag in kafka-storage.sh (#17597)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Justine Olshan <jolshan@confluent.io>
This commit is contained in:
parent
e4935aaf8e
commit
d7ac865fb0
|
|
@ -134,6 +134,10 @@ object StorageTool extends Logging {
|
|||
case None => Option(config.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).
|
||||
foreach(v => formatter.setReleaseVersion(MetadataVersion.fromVersionString(v.toString)))
|
||||
}
|
||||
Option(namespace.getList[String]("feature")).foreach(
|
||||
featureNamesAndLevels(_).foreachEntry {
|
||||
(k, v) => formatter.setFeatureLevel(k, v)
|
||||
})
|
||||
Option(namespace.getString("initial_controllers")).
|
||||
foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v)))
|
||||
if (namespace.getBoolean("standalone")) {
|
||||
|
|
@ -466,4 +470,26 @@ object StorageTool extends Logging {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
def parseNameAndLevel(input: String): (String, java.lang.Short) = {
|
||||
val equalsIndex = input.indexOf("=")
|
||||
if (equalsIndex < 0)
|
||||
throw new RuntimeException("Can't parse feature=level string " + input + ": equals sign not found.")
|
||||
val name = input.substring(0, equalsIndex).trim
|
||||
val levelString = input.substring(equalsIndex + 1).trim
|
||||
try {
|
||||
(name, levelString.toShort)
|
||||
} catch {
|
||||
case _: Throwable =>
|
||||
throw new RuntimeException("Can't parse feature=level string " + input + ": " + "unable to parse " + levelString + " as a short.")
|
||||
}
|
||||
}
|
||||
|
||||
def featureNamesAndLevels(features: java.util.List[String]): Map[String, java.lang.Short] = {
|
||||
features.asScala.map { (feature: String) =>
|
||||
// Ensure the feature exists
|
||||
val nameAndLevel = parseNameAndLevel(feature)
|
||||
(nameAndLevel._1, nameAndLevel._2)
|
||||
}.toMap
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -325,10 +325,35 @@ Found problem:
|
|||
properties.setProperty("log.dirs", availableDirs.mkString(","))
|
||||
val stream = new ByteArrayOutputStream()
|
||||
assertEquals(0, runFormatCommand(stream, properties, Seq("--feature", "metadata.version=20")))
|
||||
assertTrue(stream.toString().contains("4.0-IV0"),
|
||||
assertTrue(stream.toString().contains("3.8-IV0"),
|
||||
"Failed to find content in output: " + stream.toString())
|
||||
}
|
||||
|
||||
@Test
|
||||
def testFormatWithInvalidFeature(): Unit = {
|
||||
val availableDirs = Seq(TestUtils.tempDir())
|
||||
val properties = new Properties()
|
||||
properties.putAll(defaultStaticQuorumProperties)
|
||||
properties.setProperty("log.dirs", availableDirs.mkString(","))
|
||||
assertEquals("Unsupported feature: non.existent.feature. Supported features are: " +
|
||||
"group.version, kraft.version, transaction.version",
|
||||
assertThrows(classOf[FormatterException], () =>
|
||||
runFormatCommand(new ByteArrayOutputStream(), properties,
|
||||
Seq("--feature", "non.existent.feature=20"))).getMessage)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testFormatWithInvalidKRaftVersionLevel(): Unit = {
|
||||
val availableDirs = Seq(TestUtils.tempDir())
|
||||
val properties = new Properties()
|
||||
properties.putAll(defaultDynamicQuorumProperties)
|
||||
properties.setProperty("log.dirs", availableDirs.mkString(","))
|
||||
assertEquals("No feature:kraft.version with feature level 999",
|
||||
assertThrows(classOf[IllegalArgumentException], () =>
|
||||
runFormatCommand(new ByteArrayOutputStream(), properties,
|
||||
Seq("--feature", "kraft.version=999", "--standalone"))).getMessage)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testFormatWithReleaseVersionAndKRaftVersion(): Unit = {
|
||||
val availableDirs = Seq(TestUtils.tempDir())
|
||||
|
|
@ -711,4 +736,25 @@ Found problem:
|
|||
"SCRAM is only supported in metadata.version 3.5-IV2 or later.",
|
||||
assertThrows(classOf[FormatterException], () => runFormatCommand(stream, properties, arguments.toSeq)).getMessage)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testParseNameAndLevel(): Unit = {
|
||||
assertEquals(("foo.bar", 56.toShort), StorageTool.parseNameAndLevel("foo.bar=56"))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testParseNameAndLevelWithNoEquals(): Unit = {
|
||||
assertEquals("Can't parse feature=level string kraft.version5: equals sign not found.",
|
||||
assertThrows(classOf[RuntimeException],
|
||||
() => StorageTool.parseNameAndLevel("kraft.version5")).
|
||||
getMessage)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testParseNameAndLevelWithNoNumber(): Unit = {
|
||||
assertEquals("Can't parse feature=level string kraft.version=foo: unable to parse foo as a short.",
|
||||
assertThrows(classOf[RuntimeException],
|
||||
() => StorageTool.parseNameAndLevel("kraft.version=foo")).
|
||||
getMessage)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue