mirror of https://github.com/apache/kafka.git
KAFKA-14592: Move FeatureCommand to tools (#13459)
KAFKA-14592: Move FeatureCommand to tools Reviewers: Luke Chen <showuon@gmail.com>
This commit is contained in:
parent
d83a734c41
commit
ea540fa400
|
@ -14,4 +14,4 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
exec $(dirname $0)/kafka-run-class.sh kafka.admin.FeatureCommand "$@"
|
||||
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.FeatureCommand "$@"
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
@echo off
|
||||
rem Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
rem contributor license agreements. See the NOTICE file distributed with
|
||||
rem this work for additional information regarding copyright ownership.
|
||||
rem The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
rem (the "License"); you may not use this file except in compliance with
|
||||
rem the License. You may obtain a copy of the License at
|
||||
rem
|
||||
rem http://www.apache.org/licenses/LICENSE-2.0
|
||||
rem
|
||||
rem Unless required by applicable law or agreed to in writing, software
|
||||
rem distributed under the License is distributed on an "AS IS" BASIS,
|
||||
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
rem See the License for the specific language governing permissions and
|
||||
rem limitations under the License.
|
||||
|
||||
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.FeatureCommand %*
|
|
@ -283,6 +283,7 @@
|
|||
<subpackage name="tools">
|
||||
<allow pkg="org.apache.kafka.common"/>
|
||||
<allow pkg="org.apache.kafka.server.util" />
|
||||
<allow pkg="org.apache.kafka.server.common" />
|
||||
<allow pkg="org.apache.kafka.clients" />
|
||||
<allow pkg="org.apache.kafka.clients.admin" />
|
||||
<allow pkg="org.apache.kafka.clients.producer" />
|
||||
|
|
|
@ -17,306 +17,12 @@
|
|||
|
||||
package kafka.admin
|
||||
|
||||
import kafka.tools.TerseFailure
|
||||
import kafka.utils.Exit
|
||||
import net.sourceforge.argparse4j.ArgumentParsers
|
||||
import net.sourceforge.argparse4j.impl.Arguments.{append, fileType, store, storeTrue}
|
||||
import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace, Subparsers}
|
||||
import net.sourceforge.argparse4j.internal.HelpScreenException
|
||||
import org.apache.kafka.clients.CommonClientConfigs
|
||||
import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType
|
||||
import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, UpdateFeaturesOptions}
|
||||
import org.apache.kafka.common.utils.Utils
|
||||
import org.apache.kafka.server.common.MetadataVersion
|
||||
|
||||
import java.io.{File, PrintStream}
|
||||
import java.util.Properties
|
||||
import scala.concurrent.ExecutionException
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.compat.java8.OptionConverters._
|
||||
|
||||
@deprecated(since = "3.5")
|
||||
object FeatureCommand {
|
||||
def main(args: Array[String]): Unit = {
|
||||
val res = mainNoExit(args, System.out)
|
||||
Exit.exit(res)
|
||||
}
|
||||
|
||||
// This is used for integration tests in order to avoid killing the test with Exit.exit,
|
||||
// and in order to capture the command output.
|
||||
def mainNoExit(
|
||||
args: Array[String],
|
||||
out: PrintStream
|
||||
): Int = {
|
||||
val parser = ArgumentParsers.newArgumentParser("kafka-features")
|
||||
.defaultHelp(true)
|
||||
.description("This tool manages feature flags in Kafka.")
|
||||
parser.addArgument("--bootstrap-server")
|
||||
.help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.")
|
||||
.required(true)
|
||||
|
||||
parser.addArgument("--command-config")
|
||||
.`type`(fileType())
|
||||
.help("Property file containing configs to be passed to Admin Client.")
|
||||
val subparsers = parser.addSubparsers().dest("command")
|
||||
addDescribeParser(subparsers)
|
||||
addUpgradeParser(subparsers)
|
||||
addDowngradeParser(subparsers)
|
||||
addDisableParser(subparsers)
|
||||
|
||||
try {
|
||||
val namespace = parser.parseArgs(args)
|
||||
val command = namespace.getString("command")
|
||||
|
||||
val commandConfig = namespace.get[File]("command_config")
|
||||
val props = if (commandConfig != null) {
|
||||
if (!commandConfig.exists()) {
|
||||
throw new TerseFailure(s"Properties file ${commandConfig.getPath} does not exists!")
|
||||
}
|
||||
Utils.loadProps(commandConfig.getPath)
|
||||
} else {
|
||||
new Properties()
|
||||
}
|
||||
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server"))
|
||||
val admin = Admin.create(props)
|
||||
try {
|
||||
command match {
|
||||
case "describe" => handleDescribe(out, admin)
|
||||
case "upgrade" => handleUpgrade(out, namespace, admin)
|
||||
case "downgrade" => handleDowngrade(out, namespace, admin)
|
||||
case "disable" => handleDisable(out, namespace, admin)
|
||||
}
|
||||
} finally {
|
||||
admin.close()
|
||||
}
|
||||
0
|
||||
} catch {
|
||||
case _: HelpScreenException =>
|
||||
0
|
||||
case e: ArgumentParserException =>
|
||||
System.err.println(s"Command line error: ${e.getMessage}. Type --help for help.")
|
||||
1
|
||||
case e: TerseFailure =>
|
||||
System.err.println(e.getMessage)
|
||||
1
|
||||
}
|
||||
}
|
||||
|
||||
def addDescribeParser(subparsers: Subparsers): Unit = {
|
||||
subparsers.addParser("describe")
|
||||
.help("Describes the current active feature flags.")
|
||||
}
|
||||
|
||||
def addUpgradeParser(subparsers: Subparsers): Unit = {
|
||||
val upgradeParser = subparsers.addParser("upgrade")
|
||||
.help("Upgrade one or more feature flags.")
|
||||
upgradeParser.addArgument("--metadata")
|
||||
.help("The level to which we should upgrade the metadata. For example, 3.3-IV3.")
|
||||
.action(store())
|
||||
upgradeParser.addArgument("--feature")
|
||||
.help("A feature upgrade we should perform, in feature=level format. For example: `metadata.version=5`.")
|
||||
.action(append())
|
||||
upgradeParser.addArgument("--dry-run")
|
||||
.help("Validate this upgrade, but do not perform it.")
|
||||
.action(storeTrue())
|
||||
}
|
||||
|
||||
def addDowngradeParser(subparsers: Subparsers): Unit = {
|
||||
val downgradeParser = subparsers.addParser("downgrade")
|
||||
.help("Downgrade one or more feature flags.")
|
||||
downgradeParser.addArgument("--metadata")
|
||||
.help("The level to which we should downgrade the metadata. For example, 3.3-IV0.")
|
||||
.action(store())
|
||||
downgradeParser.addArgument("--feature")
|
||||
.help("A feature downgrade we should perform, in feature=level format. For example: `metadata.version=5`.")
|
||||
.action(append())
|
||||
downgradeParser.addArgument("--unsafe")
|
||||
.help("Perform this downgrade even if it may irreversibly destroy metadata.")
|
||||
.action(storeTrue())
|
||||
downgradeParser.addArgument("--dry-run")
|
||||
.help("Validate this downgrade, but do not perform it.")
|
||||
.action(storeTrue())
|
||||
}
|
||||
|
||||
def addDisableParser(subparsers: Subparsers): Unit = {
|
||||
val disableParser = subparsers.addParser("disable")
|
||||
.help("Disable one or more feature flags. This is the same as downgrading the version to zero.")
|
||||
disableParser.addArgument("--feature")
|
||||
.help("A feature flag to disable.")
|
||||
.action(append())
|
||||
disableParser.addArgument("--unsafe")
|
||||
.help("Disable this feature flag even if it may irreversibly destroy metadata.")
|
||||
.action(storeTrue())
|
||||
disableParser.addArgument("--dry-run")
|
||||
.help("Perform a dry-run of this disable operation.")
|
||||
.action(storeTrue())
|
||||
}
|
||||
|
||||
def levelToString(
|
||||
feature: String,
|
||||
level: Short
|
||||
): String = {
|
||||
if (feature.equals(MetadataVersion.FEATURE_NAME))
|
||||
try MetadataVersion.fromFeatureLevel(level).version
|
||||
catch { case _: Throwable => s"UNKNOWN [$level]"}
|
||||
else
|
||||
level.toString
|
||||
}
|
||||
|
||||
def handleDescribe(
|
||||
out: PrintStream,
|
||||
admin: Admin
|
||||
): Unit = {
|
||||
val featureMetadata = admin.describeFeatures().featureMetadata().get()
|
||||
val featureList = new java.util.TreeSet[String](featureMetadata.supportedFeatures().keySet())
|
||||
featureList.forEach { feature =>
|
||||
val finalizedLevel = featureMetadata.finalizedFeatures().asScala.get(feature) match {
|
||||
case None => 0.toShort
|
||||
case Some(v) => v.maxVersionLevel()
|
||||
}
|
||||
val range = featureMetadata.supportedFeatures().get(feature)
|
||||
out.printf("Feature: %s\tSupportedMinVersion: %s\tSupportedMaxVersion: %s\tFinalizedVersionLevel: %s\tEpoch: %s%n",
|
||||
feature,
|
||||
levelToString(feature, range.minVersion()),
|
||||
levelToString(feature, range.maxVersion()),
|
||||
levelToString(feature, finalizedLevel),
|
||||
featureMetadata.finalizedFeaturesEpoch().asScala.flatMap(e => Some(e.toString)).getOrElse("-"))
|
||||
}
|
||||
}
|
||||
|
||||
def metadataVersionsToString(first: MetadataVersion, last: MetadataVersion): String = {
|
||||
MetadataVersion.VERSIONS.toList.asJava.
|
||||
subList(first.ordinal(), last.ordinal() + 1).
|
||||
asScala.mkString(", ")
|
||||
}
|
||||
|
||||
def handleUpgrade(out: PrintStream, namespace: Namespace, admin: Admin): Unit = {
|
||||
handleUpgradeOrDowngrade("upgrade", out, namespace, admin, UpgradeType.UPGRADE)
|
||||
}
|
||||
|
||||
def downgradeType(namespace: Namespace): UpgradeType = {
|
||||
val unsafe = namespace.getBoolean("unsafe")
|
||||
if (unsafe == null || !unsafe) {
|
||||
UpgradeType.SAFE_DOWNGRADE
|
||||
} else {
|
||||
UpgradeType.UNSAFE_DOWNGRADE
|
||||
}
|
||||
}
|
||||
|
||||
def handleDowngrade(out: PrintStream, namespace: Namespace, admin: Admin): Unit = {
|
||||
handleUpgradeOrDowngrade("downgrade", out, namespace, admin, downgradeType(namespace))
|
||||
}
|
||||
|
||||
def parseNameAndLevel(input: String): (String, Short) = {
|
||||
val equalsIndex = input.indexOf("=")
|
||||
if (equalsIndex < 0) {
|
||||
throw new TerseFailure(s"Can't parse feature=level string ${input}: equals sign not found.")
|
||||
}
|
||||
val name = input.substring(0, equalsIndex).trim
|
||||
val levelString = input.substring(equalsIndex + 1).trim
|
||||
val level =
|
||||
try levelString.toShort
|
||||
catch {
|
||||
case _: Throwable => throw new TerseFailure(s"Can't parse feature=level string ${input}: " +
|
||||
s"unable to parse ${levelString} as a short.")
|
||||
}
|
||||
(name, level)
|
||||
}
|
||||
|
||||
def handleUpgradeOrDowngrade(
|
||||
op: String,
|
||||
out: PrintStream,
|
||||
namespace: Namespace,
|
||||
admin: Admin,
|
||||
upgradeType: UpgradeType
|
||||
): Unit = {
|
||||
val updates = new java.util.HashMap[String, FeatureUpdate]()
|
||||
Option(namespace.getString("metadata")).foreach(metadata => {
|
||||
val version = try {
|
||||
MetadataVersion.fromVersionString(metadata)
|
||||
} catch {
|
||||
case _: Throwable => throw new TerseFailure("Unsupported metadata version " + metadata +
|
||||
". Supported metadata versions are " + metadataVersionsToString(
|
||||
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, MetadataVersion.latest()))
|
||||
}
|
||||
updates.put(MetadataVersion.FEATURE_NAME, new FeatureUpdate(version.featureLevel(), upgradeType))
|
||||
})
|
||||
Option(namespace.getList[String]("feature")).foreach(features => {
|
||||
features.forEach(feature => {
|
||||
val (name, level) = parseNameAndLevel(feature)
|
||||
if (updates.put(name, new FeatureUpdate(level, upgradeType)) != null) {
|
||||
throw new TerseFailure(s"Feature ${name} was specified more than once.")
|
||||
}
|
||||
})
|
||||
})
|
||||
update(op, out, admin, updates, namespace.getBoolean("dry-run"))
|
||||
}
|
||||
|
||||
def handleDisable(out: PrintStream, namespace: Namespace, admin: Admin): Unit = {
|
||||
val upgradeType = downgradeType(namespace)
|
||||
val updates = new java.util.HashMap[String, FeatureUpdate]()
|
||||
Option(namespace.getList[String]("feature")).foreach(features => {
|
||||
features.forEach(name =>
|
||||
if (updates.put(name, new FeatureUpdate(0.toShort, upgradeType)) != null) {
|
||||
throw new TerseFailure(s"Feature ${name} was specified more than once.")
|
||||
})
|
||||
}
|
||||
)
|
||||
update("disable", out, admin, updates, namespace.getBoolean("dry-run"))
|
||||
}
|
||||
|
||||
def update(
|
||||
op: String,
|
||||
out: PrintStream,
|
||||
admin: Admin,
|
||||
updates: java.util.HashMap[String, FeatureUpdate],
|
||||
dryRun: Boolean
|
||||
): Unit = {
|
||||
if (updates.isEmpty) {
|
||||
throw new TerseFailure(s"You must specify at least one feature to ${op}")
|
||||
}
|
||||
val result = admin.updateFeatures(updates, new UpdateFeaturesOptions().validateOnly(dryRun))
|
||||
val errors = result.values().asScala.map { case (feature, future) =>
|
||||
try {
|
||||
future.get()
|
||||
feature -> None
|
||||
} catch {
|
||||
case e: ExecutionException => feature -> Some(e.getCause)
|
||||
case t: Throwable => feature -> Some(t)
|
||||
}
|
||||
}
|
||||
var numFailures = 0
|
||||
errors.keySet.toList.sorted.foreach { feature =>
|
||||
val maybeThrowable = errors(feature)
|
||||
val level = updates.get(feature).maxVersionLevel()
|
||||
if (maybeThrowable.isDefined) {
|
||||
val helper = if (dryRun) {
|
||||
"Can not"
|
||||
} else {
|
||||
"Could not"
|
||||
}
|
||||
val suffix = if (op.equals("disable")) {
|
||||
s"disable ${feature}"
|
||||
} else {
|
||||
s"${op} ${feature} to ${level}"
|
||||
}
|
||||
out.println(s"${helper} ${suffix}. ${maybeThrowable.get.getMessage}")
|
||||
numFailures = numFailures + 1
|
||||
} else {
|
||||
val verb = if (dryRun) {
|
||||
"can be"
|
||||
} else {
|
||||
"was"
|
||||
}
|
||||
val obj = if (op.equals("disable")) {
|
||||
"disabled."
|
||||
} else {
|
||||
s"${op}d to ${level}."
|
||||
}
|
||||
out.println(s"${feature} ${verb} ${obj}")
|
||||
}
|
||||
}
|
||||
if (numFailures > 0) {
|
||||
throw new TerseFailure(s"${numFailures} out of ${updates.size} operation(s) failed.")
|
||||
}
|
||||
println("WARNING: The 'kafka.tools' package is deprecated and will change to 'org.apache.kafka.tools' in the next major release.")
|
||||
val toolClass = Class.forName("org.apache.kafka.tools.FeatureCommand")
|
||||
val toolMethod = toolClass.getDeclaredMethod("main", classOf[Array[String]])
|
||||
toolMethod.invoke(null, args)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,327 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.admin
|
||||
|
||||
import kafka.api.IntegrationTestHarness
|
||||
import kafka.server.KafkaConfig
|
||||
import kafka.tools.TerseFailure
|
||||
import kafka.utils.{TestInfoUtils, TestUtils}
|
||||
import net.sourceforge.argparse4j.inf.Namespace
|
||||
import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.{SAFE_DOWNGRADE, UNSAFE_DOWNGRADE}
|
||||
import org.apache.kafka.clients.admin.MockAdminClient
|
||||
import org.apache.kafka.common.utils.Utils
|
||||
import org.apache.kafka.server.common.MetadataVersion
|
||||
import org.apache.kafka.server.common.MetadataVersion.{IBP_3_3_IV0, IBP_3_3_IV1, IBP_3_3_IV2, IBP_3_3_IV3, IBP_3_5_IV1}
|
||||
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.junit.jupiter.params.ParameterizedTest
|
||||
import org.junit.jupiter.params.provider.ValueSource
|
||||
|
||||
import java.io.{ByteArrayOutputStream, PrintStream}
|
||||
import java.{lang, util}
|
||||
import java.util.Collections.{emptyMap, singletonMap}
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
case class FeatureCommandTestEnv(admin: MockAdminClient = null) extends AutoCloseable {
|
||||
val stream = new ByteArrayOutputStream()
|
||||
val out = new PrintStream(stream)
|
||||
|
||||
override def close(): Unit = {
|
||||
Utils.closeAll(stream, out)
|
||||
Utils.closeQuietly(admin, "admin")
|
||||
}
|
||||
|
||||
def outputWithoutEpoch(): String = {
|
||||
val lines = stream.toString.split(String.format("%n"))
|
||||
lines.map { line =>
|
||||
val pos = line.indexOf("Epoch: ")
|
||||
if (pos > 0) {
|
||||
line.substring(0, pos)
|
||||
} else {
|
||||
line
|
||||
}
|
||||
}.mkString(String.format("%n"))
|
||||
}
|
||||
}
|
||||
|
||||
class FeatureCommandTest extends IntegrationTestHarness {
|
||||
override def brokerCount: Int = 1
|
||||
|
||||
override protected def metadataVersion: MetadataVersion = IBP_3_3_IV1
|
||||
|
||||
serverConfig.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, metadataVersion.toString)
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||
@ValueSource(strings = Array("zk"))
|
||||
def testDescribeWithZk(quorum: String): Unit = {
|
||||
TestUtils.resource(FeatureCommandTestEnv()) { env =>
|
||||
assertEquals(0, FeatureCommand.mainNoExit(
|
||||
Array("--bootstrap-server", bootstrapServers(), "describe"), env.out))
|
||||
assertEquals("", env.outputWithoutEpoch())
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||
@ValueSource(strings = Array("kraft"))
|
||||
def testDescribeWithKRaft(quorum: String): Unit = {
|
||||
TestUtils.resource(FeatureCommandTestEnv()) { env =>
|
||||
assertEquals(0, FeatureCommand.mainNoExit(
|
||||
Array("--bootstrap-server", bootstrapServers(), "describe"), env.out))
|
||||
assertEquals(String.format(
|
||||
"Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
|
||||
"SupportedMaxVersion: 3.5-IV2\tFinalizedVersionLevel: 3.3-IV1\t"),
|
||||
env.outputWithoutEpoch())
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||
@ValueSource(strings = Array("zk"))
|
||||
def testUpgradeMetadataVersionWithZk(quorum: String): Unit = {
|
||||
TestUtils.resource(FeatureCommandTestEnv()) { env =>
|
||||
assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
|
||||
"upgrade", "--metadata", "3.3-IV2"), env.out))
|
||||
assertEquals("Could not upgrade metadata.version to 6. Could not apply finalized feature " +
|
||||
"update because the provided feature is not supported.", env.outputWithoutEpoch())
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||
@ValueSource(strings = Array("kraft"))
|
||||
def testUpgradeMetadataVersionWithKraft(quorum: String): Unit = {
|
||||
TestUtils.resource(FeatureCommandTestEnv()) { env =>
|
||||
assertEquals(0, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
|
||||
"upgrade", "--feature", "metadata.version=5"), env.out))
|
||||
assertEquals("metadata.version was upgraded to 5.", env.outputWithoutEpoch())
|
||||
}
|
||||
TestUtils.resource(FeatureCommandTestEnv()) { env =>
|
||||
assertEquals(0, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
|
||||
"upgrade", "--metadata", "3.3-IV2"), env.out))
|
||||
assertEquals("metadata.version was upgraded to 6.", env.outputWithoutEpoch())
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||
@ValueSource(strings = Array("zk"))
|
||||
def testDowngradeMetadataVersionWithZk(quorum: String): Unit = {
|
||||
TestUtils.resource(FeatureCommandTestEnv()) { env =>
|
||||
assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
|
||||
"disable", "--feature", "metadata.version"), env.out))
|
||||
assertEquals("Could not disable metadata.version. Can not delete non-existing finalized feature.",
|
||||
env.outputWithoutEpoch())
|
||||
}
|
||||
TestUtils.resource(FeatureCommandTestEnv()) { env =>
|
||||
assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
|
||||
"downgrade", "--metadata", "3.3-IV0"), env.out))
|
||||
assertEquals("Could not downgrade metadata.version to 4. Could not apply finalized feature " +
|
||||
"update because the provided feature is not supported.", env.outputWithoutEpoch())
|
||||
}
|
||||
TestUtils.resource(FeatureCommandTestEnv()) { env =>
|
||||
assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
|
||||
"downgrade", "--unsafe", "--metadata", "3.3-IV0"), env.out))
|
||||
assertEquals("Could not downgrade metadata.version to 4. Could not apply finalized feature " +
|
||||
"update because the provided feature is not supported.", env.outputWithoutEpoch())
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||
@ValueSource(strings = Array("kraft"))
|
||||
def testDowngradeMetadataVersionWithKRaft(quorum: String): Unit = {
|
||||
TestUtils.resource(FeatureCommandTestEnv()) { env =>
|
||||
assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
|
||||
"disable", "--feature", "metadata.version"), env.out))
|
||||
assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " +
|
||||
"metadata.version. Local controller 1000 only supports versions 1-11", env.outputWithoutEpoch())
|
||||
}
|
||||
TestUtils.resource(FeatureCommandTestEnv()) { env =>
|
||||
assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
|
||||
"downgrade", "--metadata", "3.3-IV0"), env.out))
|
||||
assertEquals("Could not downgrade metadata.version to 4. Invalid metadata.version 4. " +
|
||||
"Refusing to perform the requested downgrade because it might delete metadata information. " +
|
||||
"Retry using UNSAFE_DOWNGRADE if you want to force the downgrade to proceed.", env.outputWithoutEpoch())
|
||||
}
|
||||
TestUtils.resource(FeatureCommandTestEnv()) { env =>
|
||||
assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
|
||||
"downgrade", "--unsafe", "--metadata", "3.3-IV0"), env.out))
|
||||
assertEquals("Could not downgrade metadata.version to 4. Invalid metadata.version 4. " +
|
||||
"Unsafe metadata downgrade is not supported in this version.", env.outputWithoutEpoch())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class FeatureCommandUnitTest {
|
||||
@Test
|
||||
def testLevelToString(): Unit = {
|
||||
assertEquals("5", FeatureCommand.levelToString("foo.bar", 5.toShort))
|
||||
assertEquals("3.3-IV0",
|
||||
FeatureCommand.levelToString(MetadataVersion.FEATURE_NAME, IBP_3_3_IV0.featureLevel()))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testMetadataVersionsToString(): Unit = {
|
||||
assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3, 3.4-IV0, 3.5-IV0, 3.5-IV1",
|
||||
FeatureCommand.metadataVersionsToString(IBP_3_3_IV0, IBP_3_5_IV1))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testdowngradeType(): Unit = {
|
||||
assertEquals(SAFE_DOWNGRADE, FeatureCommand.downgradeType(
|
||||
new Namespace(singletonMap("unsafe", java.lang.Boolean.valueOf(false)))))
|
||||
assertEquals(UNSAFE_DOWNGRADE, FeatureCommand.downgradeType(
|
||||
new Namespace(singletonMap("unsafe", java.lang.Boolean.valueOf(true)))))
|
||||
assertEquals(SAFE_DOWNGRADE, FeatureCommand.downgradeType(new Namespace(emptyMap())))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testParseNameAndLevel(): Unit = {
|
||||
assertEquals(("foo.bar", 5.toShort), FeatureCommand.parseNameAndLevel("foo.bar=5"))
|
||||
assertEquals(("quux", 0.toShort), FeatureCommand.parseNameAndLevel(" quux=0"))
|
||||
assertEquals("Can't parse feature=level string baaz: equals sign not found.",
|
||||
assertThrows(classOf[TerseFailure],
|
||||
() => FeatureCommand.parseNameAndLevel("baaz")).getMessage)
|
||||
assertEquals("Can't parse feature=level string w=tf: unable to parse tf as a short.",
|
||||
assertThrows(classOf[TerseFailure],
|
||||
() => FeatureCommand.parseNameAndLevel("w=tf")).getMessage)
|
||||
}
|
||||
|
||||
def buildAdminClient1(): MockAdminClient = {
|
||||
new MockAdminClient.Builder().
|
||||
minSupportedFeatureLevels(Map(
|
||||
MetadataVersion.FEATURE_NAME -> lang.Short.valueOf(IBP_3_3_IV0.featureLevel()),
|
||||
"foo.bar" -> lang.Short.valueOf(0.toShort)
|
||||
).asJava).
|
||||
featureLevels(Map(
|
||||
MetadataVersion.FEATURE_NAME -> lang.Short.valueOf(IBP_3_3_IV2.featureLevel()),
|
||||
"foo.bar" -> lang.Short.valueOf(5.toShort)
|
||||
).asJava).
|
||||
maxSupportedFeatureLevels(Map(
|
||||
MetadataVersion.FEATURE_NAME -> lang.Short.valueOf(IBP_3_3_IV3.featureLevel()),
|
||||
"foo.bar" -> lang.Short.valueOf(10.toShort)
|
||||
).asJava).
|
||||
build()
|
||||
}
|
||||
|
||||
@Test
|
||||
def testHandleDescribe(): Unit = {
|
||||
TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
|
||||
FeatureCommand.handleDescribe(env.out, env.admin)
|
||||
assertEquals(String.format(
|
||||
"Feature: foo.bar\tSupportedMinVersion: 0\tSupportedMaxVersion: 10\tFinalizedVersionLevel: 5\tEpoch: 123%n" +
|
||||
"Feature: metadata.version\tSupportedMinVersion: 3.3-IV0\tSupportedMaxVersion: 3.3-IV3\tFinalizedVersionLevel: 3.3-IV2\tEpoch: 123%n"),
|
||||
env.stream.toString)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testHandleUpgrade(): Unit = {
|
||||
TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
|
||||
assertEquals("1 out of 2 operation(s) failed.",
|
||||
assertThrows(classOf[TerseFailure], () =>
|
||||
FeatureCommand.handleUpgrade(env.out, new Namespace(Map(
|
||||
"metadata" -> "3.3-IV1",
|
||||
"feature" -> util.Arrays.asList("foo.bar=6")
|
||||
).asJava), env.admin)).getMessage)
|
||||
assertEquals(String.format(
|
||||
"foo.bar was upgraded to 6.%n" +
|
||||
"Could not upgrade metadata.version to 5. Can't upgrade to lower version.%n"),
|
||||
env.stream.toString)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testHandleUpgradeDryRun(): Unit = {
|
||||
TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
|
||||
assertEquals("1 out of 2 operation(s) failed.",
|
||||
assertThrows(classOf[TerseFailure], () =>
|
||||
FeatureCommand.handleUpgrade(env.out, new Namespace(Map(
|
||||
"metadata" -> "3.3-IV1",
|
||||
"feature" -> util.Arrays.asList("foo.bar=6"),
|
||||
"dry-run" -> java.lang.Boolean.valueOf(true)
|
||||
).asJava), env.admin)).getMessage)
|
||||
assertEquals(String.format(
|
||||
"foo.bar can be upgraded to 6.%n" +
|
||||
"Can not upgrade metadata.version to 5. Can't upgrade to lower version.%n"),
|
||||
env.stream.toString)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testHandleDowngrade(): Unit = {
|
||||
TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
|
||||
assertEquals("1 out of 2 operation(s) failed.",
|
||||
assertThrows(classOf[TerseFailure], () =>
|
||||
FeatureCommand.handleDowngrade(env.out, new Namespace(Map(
|
||||
"metadata" -> "3.3-IV3",
|
||||
"feature" -> util.Arrays.asList("foo.bar=1")
|
||||
).asJava), env.admin)).getMessage)
|
||||
assertEquals(String.format(
|
||||
"foo.bar was downgraded to 1.%n" +
|
||||
"Could not downgrade metadata.version to 7. Can't downgrade to newer version.%n"),
|
||||
env.stream.toString)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testHandleDowngradeDryRun(): Unit = {
|
||||
TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
|
||||
assertEquals("1 out of 2 operation(s) failed.",
|
||||
assertThrows(classOf[TerseFailure], () =>
|
||||
FeatureCommand.handleDowngrade(env.out, new Namespace(Map(
|
||||
"metadata" -> "3.3-IV3",
|
||||
"feature" -> util.Arrays.asList("foo.bar=1"),
|
||||
"dry-run" -> java.lang.Boolean.valueOf(true)
|
||||
).asJava), env.admin)).getMessage)
|
||||
assertEquals(String.format(
|
||||
"foo.bar can be downgraded to 1.%n" +
|
||||
"Can not downgrade metadata.version to 7. Can't downgrade to newer version.%n"),
|
||||
env.stream.toString)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testHandleDisable(): Unit = {
|
||||
TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
|
||||
assertEquals("1 out of 3 operation(s) failed.",
|
||||
assertThrows(classOf[TerseFailure], () =>
|
||||
FeatureCommand.handleDisable(env.out, new Namespace(Map[String, AnyRef](
|
||||
"feature" -> util.Arrays.asList("foo.bar", "metadata.version", "quux")
|
||||
).asJava), env.admin)).getMessage)
|
||||
assertEquals(String.format(
|
||||
"foo.bar was disabled.%n" +
|
||||
"Could not disable metadata.version. Can't downgrade below 4%n" +
|
||||
"quux was disabled.%n"),
|
||||
env.stream.toString)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testHandleDisableDryRun(): Unit = {
|
||||
TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
|
||||
assertEquals("1 out of 3 operation(s) failed.",
|
||||
assertThrows(classOf[TerseFailure], () =>
|
||||
FeatureCommand.handleDisable(env.out, new Namespace(Map[String, AnyRef](
|
||||
"feature" -> util.Arrays.asList("foo.bar", "metadata.version", "quux"),
|
||||
"dry-run" -> java.lang.Boolean.valueOf(true)
|
||||
).asJava), env.admin)).getMessage)
|
||||
assertEquals(String.format(
|
||||
"foo.bar can be disabled.%n" +
|
||||
"Can not disable metadata.version. Can't downgrade below 4%n" +
|
||||
"quux can be disabled.%n"),
|
||||
env.stream.toString)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,324 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.tools;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.stream.Collectors;
|
||||
import net.sourceforge.argparse4j.ArgumentParsers;
|
||||
import net.sourceforge.argparse4j.impl.Arguments;
|
||||
import net.sourceforge.argparse4j.inf.ArgumentParser;
|
||||
import net.sourceforge.argparse4j.inf.ArgumentParserException;
|
||||
import net.sourceforge.argparse4j.inf.Namespace;
|
||||
import net.sourceforge.argparse4j.inf.Subparser;
|
||||
import net.sourceforge.argparse4j.inf.Subparsers;
|
||||
import net.sourceforge.argparse4j.internal.HelpScreenException;
|
||||
import org.apache.kafka.clients.admin.Admin;
|
||||
import org.apache.kafka.clients.admin.FeatureMetadata;
|
||||
import org.apache.kafka.clients.admin.FeatureUpdate;
|
||||
import org.apache.kafka.clients.admin.SupportedVersionRange;
|
||||
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
|
||||
import org.apache.kafka.clients.admin.UpdateFeaturesResult;
|
||||
import org.apache.kafka.common.utils.Exit;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.server.common.MetadataVersion;
|
||||
|
||||
import static net.sourceforge.argparse4j.impl.Arguments.append;
|
||||
import static net.sourceforge.argparse4j.impl.Arguments.store;
|
||||
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
|
||||
|
||||
public class FeatureCommand {
|
||||
public static void main(String... args) {
|
||||
Exit.exit(mainNoExit(args));
|
||||
}
|
||||
|
||||
static int mainNoExit(String... args) {
|
||||
try {
|
||||
execute(args);
|
||||
return 0;
|
||||
} catch (HelpScreenException e) {
|
||||
return 0;
|
||||
} catch (ArgumentParserException e) {
|
||||
System.err.printf("Command line error: " + e.getMessage() + ". Type --help for help.");
|
||||
return 1;
|
||||
} catch (Throwable e) {
|
||||
System.err.println(e.getMessage());
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
static void execute(String... args) throws Exception {
|
||||
ArgumentParser parser = ArgumentParsers
|
||||
.newArgumentParser("kafka-features")
|
||||
.defaultHelp(true)
|
||||
.description("This tool manages feature flags in Kafka.");
|
||||
parser
|
||||
.addArgument("--bootstrap-server")
|
||||
.help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.")
|
||||
.required(true);
|
||||
|
||||
parser
|
||||
.addArgument("--command-config")
|
||||
.type(Arguments.fileType())
|
||||
.help("Property file containing configs to be passed to Admin Client.");
|
||||
Subparsers subparsers = parser.addSubparsers().dest("command");
|
||||
addDescribeParser(subparsers);
|
||||
addUpgradeParser(subparsers);
|
||||
addDowngradeParser(subparsers);
|
||||
addDisableParser(subparsers);
|
||||
|
||||
Namespace namespace = parser.parseArgsOrFail(args);
|
||||
String command = namespace.getString("command");
|
||||
String configPath = namespace.getString("command_config");
|
||||
Properties properties = (configPath == null) ? new Properties() : Utils.loadProps(configPath);
|
||||
|
||||
String bootstrapServer = namespace.getString("bootstrap_server");
|
||||
if (bootstrapServer != null) {
|
||||
properties.setProperty("bootstrap.servers", bootstrapServer);
|
||||
}
|
||||
if (properties.getProperty("bootstrap.servers") == null) {
|
||||
throw new TerseException("Please specify --bootstrap-server.");
|
||||
}
|
||||
|
||||
try (Admin adminClient = Admin.create(properties)) {
|
||||
switch (command) {
|
||||
case "describe":
|
||||
handleDescribe(adminClient);
|
||||
break;
|
||||
case "upgrade":
|
||||
handleUpgrade(namespace, adminClient);
|
||||
break;
|
||||
case "downgrade":
|
||||
handleDowngrade(namespace, adminClient);
|
||||
break;
|
||||
case "disable":
|
||||
handleDisable(namespace, adminClient);
|
||||
break;
|
||||
default:
|
||||
throw new TerseException("Unknown command " + command);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void addDescribeParser(Subparsers subparsers) {
|
||||
subparsers.addParser("describe")
|
||||
.help("Describes the current active feature flags.");
|
||||
}
|
||||
|
||||
private static void addUpgradeParser(Subparsers subparsers) {
|
||||
Subparser upgradeParser = subparsers.addParser("upgrade")
|
||||
.help("Upgrade one or more feature flags.");
|
||||
upgradeParser.addArgument("--metadata")
|
||||
.help("The level to which we should upgrade the metadata. For example, 3.3-IV3.")
|
||||
.action(store());
|
||||
upgradeParser.addArgument("--feature")
|
||||
.help("A feature upgrade we should perform, in feature=level format. For example: `metadata.version=5`.")
|
||||
.action(append());
|
||||
upgradeParser.addArgument("--dry-run")
|
||||
.help("Validate this upgrade, but do not perform it.")
|
||||
.action(storeTrue());
|
||||
|
||||
}
|
||||
|
||||
private static void addDowngradeParser(Subparsers subparsers) {
|
||||
Subparser downgradeParser = subparsers.addParser("downgrade")
|
||||
.help("Upgrade one or more feature flags.");
|
||||
downgradeParser.addArgument("--metadata")
|
||||
.help("The level to which we should downgrade the metadata. For example, 3.3-IV0.")
|
||||
.action(store());
|
||||
downgradeParser.addArgument("--feature")
|
||||
.help("A feature downgrade we should perform, in feature=level format. For example: `metadata.version=5`.")
|
||||
.action(append());
|
||||
downgradeParser.addArgument("--unsafe")
|
||||
.help("Perform this downgrade even if it may irreversibly destroy metadata.")
|
||||
.action(storeTrue());
|
||||
downgradeParser.addArgument("--dry-run")
|
||||
.help("Validate this downgrade, but do not perform it.")
|
||||
.action(storeTrue());
|
||||
}
|
||||
|
||||
private static void addDisableParser(Subparsers subparsers) {
|
||||
Subparser downgradeParser = subparsers.addParser("disable")
|
||||
.help("Disable one or more feature flags. This is the same as downgrading the version to zero.");
|
||||
downgradeParser.addArgument("--feature")
|
||||
.help("A feature flag to disable.")
|
||||
.action(append());
|
||||
downgradeParser.addArgument("--unsafe")
|
||||
.help("Disable this feature flag even if it may irreversibly destroy metadata.")
|
||||
.action(storeTrue());
|
||||
downgradeParser.addArgument("--dry-run")
|
||||
.help("Perform a dry-run of this disable operation.")
|
||||
.action(storeTrue());
|
||||
}
|
||||
|
||||
static String levelToString(String feature, short level) {
|
||||
if (feature.equals(MetadataVersion.FEATURE_NAME)) {
|
||||
try {
|
||||
return MetadataVersion.fromFeatureLevel(level).version();
|
||||
} catch (Throwable e) {
|
||||
return "UNKNOWN " + level;
|
||||
}
|
||||
}
|
||||
return String.valueOf(level);
|
||||
}
|
||||
|
||||
static void handleDescribe(Admin adminClient) throws ExecutionException, InterruptedException {
|
||||
FeatureMetadata featureMetadata = adminClient.describeFeatures().featureMetadata().get();
|
||||
featureMetadata.supportedFeatures().keySet().stream().sorted().forEach(feature -> {
|
||||
short finalizedLevel = (featureMetadata.finalizedFeatures().get(feature) == null) ? 0 : featureMetadata.finalizedFeatures().get(feature).maxVersionLevel();
|
||||
SupportedVersionRange range = featureMetadata.supportedFeatures().get(feature);
|
||||
System.out.printf("Feature: %s\tSupportedMinVersion: %s\tSupportedMaxVersion: %s\tFinalizedVersionLevel: %s\tEpoch: %s%n",
|
||||
feature,
|
||||
levelToString(feature, range.minVersion()),
|
||||
levelToString(feature, range.maxVersion()),
|
||||
levelToString(feature, finalizedLevel),
|
||||
(featureMetadata.finalizedFeaturesEpoch().isPresent()) ? featureMetadata.finalizedFeaturesEpoch().get().toString() : "-");
|
||||
});
|
||||
}
|
||||
|
||||
static String metadataVersionsToString(MetadataVersion first, MetadataVersion last) {
|
||||
List<MetadataVersion> versions = Arrays.asList(MetadataVersion.VERSIONS).subList(first.ordinal(), last.ordinal() + 1);
|
||||
return versions.stream()
|
||||
.map(String::valueOf)
|
||||
.collect(Collectors.joining(", "));
|
||||
}
|
||||
|
||||
static void handleUpgrade(Namespace namespace, Admin adminClient) throws TerseException {
|
||||
handleUpgradeOrDowngrade("upgrade", namespace, adminClient, FeatureUpdate.UpgradeType.UPGRADE);
|
||||
}
|
||||
|
||||
static FeatureUpdate.UpgradeType downgradeType(Namespace namespace) {
|
||||
Boolean unsafe = namespace.getBoolean("unsafe");
|
||||
if (unsafe == null || !unsafe) {
|
||||
return FeatureUpdate.UpgradeType.SAFE_DOWNGRADE;
|
||||
} else {
|
||||
return FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE;
|
||||
}
|
||||
}
|
||||
|
||||
static void handleDowngrade(Namespace namespace, Admin adminClient) throws TerseException {
|
||||
handleUpgradeOrDowngrade("downgrade", namespace, adminClient, downgradeType(namespace));
|
||||
}
|
||||
|
||||
static String[] parseNameAndLevel(String input) {
|
||||
int equalsIndex = input.indexOf("=");
|
||||
if (equalsIndex < 0) {
|
||||
throw new RuntimeException("Can't parse feature=level string " + input + ": equals sign not found.");
|
||||
}
|
||||
String name = input.substring(0, equalsIndex).trim();
|
||||
String levelString = input.substring(equalsIndex + 1).trim();
|
||||
try {
|
||||
Short.parseShort(levelString);
|
||||
} catch (Throwable t) {
|
||||
throw new RuntimeException("Can't parse feature=level string " + input + ": " +
|
||||
"unable to parse " + levelString + " as a short.");
|
||||
}
|
||||
return new String[]{name, levelString};
|
||||
}
|
||||
|
||||
private static void handleUpgradeOrDowngrade(String op, Namespace namespace, Admin admin, FeatureUpdate.UpgradeType upgradeType) throws TerseException {
|
||||
Map<String, FeatureUpdate> updates = new HashMap<>();
|
||||
MetadataVersion version;
|
||||
String metadata = namespace.getString("metadata");
|
||||
if (metadata != null) {
|
||||
try {
|
||||
version = MetadataVersion.fromVersionString(metadata);
|
||||
} catch (Throwable e) {
|
||||
throw new TerseException("Unsupported metadata version " + metadata +
|
||||
". Supported metadata versions are " + metadataVersionsToString(
|
||||
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, MetadataVersion.latest()));
|
||||
}
|
||||
updates.put(MetadataVersion.FEATURE_NAME, new FeatureUpdate(version.featureLevel(), upgradeType));
|
||||
}
|
||||
|
||||
List<String> features = namespace.getList("feature");
|
||||
if (features != null) {
|
||||
features.forEach(feature -> {
|
||||
String[] nameAndLevel;
|
||||
nameAndLevel = parseNameAndLevel(feature);
|
||||
|
||||
if (updates.put(nameAndLevel[0], new FeatureUpdate(Short.parseShort(nameAndLevel[1]), upgradeType)) != null) {
|
||||
throw new RuntimeException("Feature " + nameAndLevel[0] + " was specified more than once.");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
update(op, admin, updates, namespace.getBoolean("dry_run"));
|
||||
}
|
||||
|
||||
static void handleDisable(Namespace namespace, Admin adminClient) throws TerseException {
|
||||
FeatureUpdate.UpgradeType upgradeType = downgradeType(namespace);
|
||||
Map<String, FeatureUpdate> updates = new HashMap<>();
|
||||
|
||||
List<String> features = namespace.getList("feature");
|
||||
if (features != null) {
|
||||
features.forEach(feature -> {
|
||||
if (updates.put(feature, new FeatureUpdate((short) 0, upgradeType)) != null) {
|
||||
throw new RuntimeException("Feature " + feature + " was specified more than once.");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
update("disable", adminClient, updates, namespace.getBoolean("dry_run"));
|
||||
}
|
||||
|
||||
private static void update(String op, Admin admin, Map<String, FeatureUpdate> updates, Boolean dryRun) throws TerseException {
|
||||
if (updates.isEmpty()) {
|
||||
throw new TerseException("You must specify at least one feature to " + op);
|
||||
}
|
||||
|
||||
UpdateFeaturesResult result = admin.updateFeatures(updates, new UpdateFeaturesOptions().validateOnly(dryRun));
|
||||
Map<String, Optional<Throwable>> errors = new TreeMap<>();
|
||||
result.values().forEach((feature, future) -> {
|
||||
try {
|
||||
future.get();
|
||||
errors.put(feature, null);
|
||||
} catch (ExecutionException e) {
|
||||
errors.put(feature, Optional.ofNullable(e.getCause()));
|
||||
} catch (Throwable t) {
|
||||
errors.put(feature, Optional.of(t));
|
||||
}
|
||||
});
|
||||
|
||||
int numFailures = 0;
|
||||
for (Map.Entry<String, Optional<Throwable>> feature: errors.entrySet()) {
|
||||
short level = updates.get(feature.getKey()).maxVersionLevel();
|
||||
Optional<Throwable> maybeThrowable = feature.getValue();
|
||||
if (maybeThrowable != null && maybeThrowable.isPresent()) {
|
||||
String helper = dryRun ? "Can not " : "Could not ";
|
||||
String suffix = (op.equals("disable")) ? "disable " + feature.getKey() : op + " " + feature.getKey() + " to " + level;
|
||||
System.out.println(helper + suffix + ". " + maybeThrowable.get().getMessage());
|
||||
numFailures++;
|
||||
} else {
|
||||
String verb = dryRun ? " can be " : " was ";
|
||||
String obj = (op.equals("disable")) ? "disabled." : op + "d to " + level + ".";
|
||||
System.out.println(feature.getKey() + verb + obj);
|
||||
}
|
||||
}
|
||||
|
||||
if (numFailures > 0) {
|
||||
throw new TerseException(numFailures + " out of " + updates.size() + " operation(s) failed.");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,302 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.tools;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import kafka.test.ClusterInstance;
|
||||
import kafka.test.annotation.ClusterTest;
|
||||
import kafka.test.annotation.ClusterTestDefaults;
|
||||
import kafka.test.annotation.Type;
|
||||
import kafka.test.junit.ClusterTestExtensions;
|
||||
import net.sourceforge.argparse4j.inf.Namespace;
|
||||
import org.apache.kafka.clients.admin.MockAdminClient;
|
||||
import org.apache.kafka.server.common.MetadataVersion;
|
||||
import org.junit.jupiter.api.Tag;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
|
||||
import static java.lang.String.format;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonMap;
|
||||
|
||||
import static org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.SAFE_DOWNGRADE;
|
||||
import static org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@ExtendWith(value = ClusterTestExtensions.class)
|
||||
@ClusterTestDefaults(clusterType = Type.KRAFT)
|
||||
@Tag("integration")
|
||||
public class FeatureCommandTest {
|
||||
|
||||
private final ClusterInstance cluster;
|
||||
public FeatureCommandTest(ClusterInstance cluster) {
|
||||
this.cluster = cluster;
|
||||
}
|
||||
|
||||
@ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
|
||||
public void testDescribeWithZK() {
|
||||
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
|
||||
assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe"))
|
||||
);
|
||||
assertEquals("", commandOutput);
|
||||
}
|
||||
|
||||
@ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1)
|
||||
public void testDescribeWithKRaft() {
|
||||
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
|
||||
assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe"))
|
||||
);
|
||||
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
|
||||
"SupportedMaxVersion: 3.5-IV2\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput));
|
||||
}
|
||||
|
||||
@ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
|
||||
public void testUpgradeMetadataVersionWithZk() {
|
||||
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
|
||||
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
|
||||
"upgrade", "--metadata", "3.3-IV2"))
|
||||
);
|
||||
assertEquals("Could not upgrade metadata.version to 6. Could not apply finalized feature " +
|
||||
"update because the provided feature is not supported.", commandOutput);
|
||||
}
|
||||
|
||||
@ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1)
|
||||
public void testUpgradeMetadataVersionWithKraft() {
|
||||
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
|
||||
assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
|
||||
"upgrade", "--feature", "metadata.version=5"))
|
||||
);
|
||||
assertEquals("metadata.version was upgraded to 5.", commandOutput);
|
||||
|
||||
commandOutput = ToolsTestUtils.captureStandardOut(() ->
|
||||
assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
|
||||
"upgrade", "--metadata", "3.3-IV2"))
|
||||
);
|
||||
assertEquals("metadata.version was upgraded to 6.", commandOutput);
|
||||
}
|
||||
|
||||
@ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
|
||||
public void testDowngradeMetadataVersionWithZk() {
|
||||
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
|
||||
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
|
||||
"disable", "--feature", "metadata.version"))
|
||||
);
|
||||
assertEquals("Could not disable metadata.version. Can not delete non-existing finalized feature.", commandOutput);
|
||||
|
||||
commandOutput = ToolsTestUtils.captureStandardOut(() ->
|
||||
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
|
||||
"downgrade", "--metadata", "3.3-IV0"))
|
||||
);
|
||||
assertEquals("Could not downgrade metadata.version to 4. Could not apply finalized feature " +
|
||||
"update because the provided feature is not supported.", commandOutput);
|
||||
|
||||
commandOutput = ToolsTestUtils.captureStandardOut(() ->
|
||||
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
|
||||
"downgrade", "--unsafe", "--metadata", "3.3-IV0"))
|
||||
);
|
||||
assertEquals("Could not downgrade metadata.version to 4. Could not apply finalized feature " +
|
||||
"update because the provided feature is not supported.", commandOutput);
|
||||
}
|
||||
|
||||
@ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1)
|
||||
public void testDowngradeMetadataVersionWithKRaft() {
|
||||
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
|
||||
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
|
||||
"disable", "--feature", "metadata.version"))
|
||||
);
|
||||
assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " +
|
||||
"metadata.version. Local controller 3000 only supports versions 1-11", commandOutput);
|
||||
|
||||
commandOutput = ToolsTestUtils.captureStandardOut(() ->
|
||||
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
|
||||
"downgrade", "--metadata", "3.3-IV0"))
|
||||
|
||||
);
|
||||
assertEquals("Could not downgrade metadata.version to 4. Invalid metadata.version 4. " +
|
||||
"Refusing to perform the requested downgrade because it might delete metadata information. " +
|
||||
"Retry using UNSAFE_DOWNGRADE if you want to force the downgrade to proceed.", commandOutput);
|
||||
|
||||
commandOutput = ToolsTestUtils.captureStandardOut(() ->
|
||||
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
|
||||
"downgrade", "--unsafe", "--metadata", "3.3-IV0"))
|
||||
|
||||
);
|
||||
assertEquals("Could not downgrade metadata.version to 4. Invalid metadata.version 4. " +
|
||||
"Unsafe metadata downgrade is not supported in this version.", commandOutput);
|
||||
}
|
||||
|
||||
private String outputWithoutEpoch(String output) {
|
||||
int pos = output.indexOf("Epoch: ");
|
||||
return (pos > 0) ? output.substring(0, pos) : output;
|
||||
}
|
||||
}
|
||||
|
||||
class FeatureCommandUnitTest {
|
||||
@Test
|
||||
public void testLevelToString() {
|
||||
assertEquals("5", FeatureCommand.levelToString("foo.bar", (short) 5));
|
||||
assertEquals("3.3-IV0",
|
||||
FeatureCommand.levelToString(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetadataVersionsToString() {
|
||||
assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3",
|
||||
FeatureCommand.metadataVersionsToString(MetadataVersion.IBP_3_3_IV0, MetadataVersion.IBP_3_3_IV3));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testdowngradeType() {
|
||||
assertEquals(SAFE_DOWNGRADE, FeatureCommand.downgradeType(
|
||||
new Namespace(singletonMap("unsafe", Boolean.FALSE))));
|
||||
assertEquals(UNSAFE_DOWNGRADE, FeatureCommand.downgradeType(
|
||||
new Namespace(singletonMap("unsafe", Boolean.TRUE))));
|
||||
assertEquals(SAFE_DOWNGRADE, FeatureCommand.downgradeType(new Namespace(emptyMap())));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseNameAndLevel() {
|
||||
assertArrayEquals(new String[]{"foo.bar", "5"}, FeatureCommand.parseNameAndLevel("foo.bar=5"));
|
||||
assertArrayEquals(new String[]{"quux", "0"}, FeatureCommand.parseNameAndLevel("quux=0"));
|
||||
assertTrue(assertThrows(RuntimeException.class, () -> FeatureCommand.parseNameAndLevel("baaz"))
|
||||
.getMessage().contains("Can't parse feature=level string baaz: equals sign not found."));
|
||||
assertTrue(assertThrows(RuntimeException.class, () -> FeatureCommand.parseNameAndLevel("w=tf"))
|
||||
.getMessage().contains("Can't parse feature=level string w=tf: unable to parse tf as a short."));
|
||||
}
|
||||
|
||||
private static MockAdminClient buildAdminClient() {
|
||||
Map<String, Short> minSupportedFeatureLevels = new HashMap<>();
|
||||
minSupportedFeatureLevels.put(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel());
|
||||
minSupportedFeatureLevels.put("foo.bar", (short) 0);
|
||||
|
||||
Map<String, Short> featureLevels = new HashMap<>();
|
||||
featureLevels.put(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV2.featureLevel());
|
||||
featureLevels.put("foo.bar", (short) 5);
|
||||
|
||||
Map<String, Short> maxSupportedFeatureLevels = new HashMap<>();
|
||||
maxSupportedFeatureLevels.put(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV3.featureLevel());
|
||||
maxSupportedFeatureLevels.put("foo.bar", (short) 10);
|
||||
|
||||
return new MockAdminClient.Builder().
|
||||
minSupportedFeatureLevels(minSupportedFeatureLevels).
|
||||
featureLevels(featureLevels).
|
||||
maxSupportedFeatureLevels(maxSupportedFeatureLevels).build();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandleDescribe() {
|
||||
String describeResult = ToolsTestUtils.captureStandardOut(() -> {
|
||||
try {
|
||||
FeatureCommand.handleDescribe(buildAdminClient());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
assertEquals(format("Feature: foo.bar\tSupportedMinVersion: 0\tSupportedMaxVersion: 10\tFinalizedVersionLevel: 5\tEpoch: 123%n" +
|
||||
"Feature: metadata.version\tSupportedMinVersion: 3.3-IV0\tSupportedMaxVersion: 3.3-IV3\tFinalizedVersionLevel: 3.3-IV2\tEpoch: 123"), describeResult);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandleUpgrade() {
|
||||
Map<String, Object> namespace = new HashMap<>();
|
||||
namespace.put("metadata", "3.3-IV1");
|
||||
namespace.put("feature", Collections.singletonList("foo.bar=6"));
|
||||
namespace.put("dry_run", false);
|
||||
String upgradeOutput = ToolsTestUtils.captureStandardOut(() -> {
|
||||
Throwable t = assertThrows(TerseException.class, () -> FeatureCommand.handleUpgrade(new Namespace(namespace), buildAdminClient()));
|
||||
assertTrue(t.getMessage().contains("1 out of 2 operation(s) failed."));
|
||||
});
|
||||
assertEquals(format("foo.bar was upgraded to 6.%n" +
|
||||
"Could not upgrade metadata.version to 5. Can't upgrade to lower version."), upgradeOutput);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandleUpgradeDryRun() {
|
||||
Map<String, Object> namespace = new HashMap<>();
|
||||
namespace.put("metadata", "3.3-IV1");
|
||||
namespace.put("feature", Collections.singletonList("foo.bar=6"));
|
||||
namespace.put("dry_run", true);
|
||||
String upgradeOutput = ToolsTestUtils.captureStandardOut(() -> {
|
||||
Throwable t = assertThrows(TerseException.class, () -> FeatureCommand.handleUpgrade(new Namespace(namespace), buildAdminClient()));
|
||||
assertTrue(t.getMessage().contains("1 out of 2 operation(s) failed."));
|
||||
});
|
||||
assertEquals(format("foo.bar can be upgraded to 6.%n" +
|
||||
"Can not upgrade metadata.version to 5. Can't upgrade to lower version."), upgradeOutput);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandleDowngrade() {
|
||||
Map<String, Object> namespace = new HashMap<>();
|
||||
namespace.put("metadata", "3.3-IV3");
|
||||
namespace.put("feature", Collections.singletonList("foo.bar=1"));
|
||||
namespace.put("dry_run", false);
|
||||
String downgradeOutput = ToolsTestUtils.captureStandardOut(() -> {
|
||||
Throwable t = assertThrows(TerseException.class, () -> FeatureCommand.handleDowngrade(new Namespace(namespace), buildAdminClient()));
|
||||
assertTrue(t.getMessage().contains("1 out of 2 operation(s) failed."));
|
||||
});
|
||||
assertEquals(format("foo.bar was downgraded to 1.%n" +
|
||||
"Could not downgrade metadata.version to 7. Can't downgrade to newer version."), downgradeOutput);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandleDowngradeDryRun() {
|
||||
Map<String, Object> namespace = new HashMap<>();
|
||||
namespace.put("metadata", "3.3-IV3");
|
||||
namespace.put("feature", Collections.singletonList("foo.bar=1"));
|
||||
namespace.put("dry_run", true);
|
||||
String downgradeOutput = ToolsTestUtils.captureStandardOut(() -> {
|
||||
Throwable t = assertThrows(TerseException.class, () -> FeatureCommand.handleDowngrade(new Namespace(namespace), buildAdminClient()));
|
||||
assertTrue(t.getMessage().contains("1 out of 2 operation(s) failed."));
|
||||
});
|
||||
assertEquals(format("foo.bar can be downgraded to 1.%n" +
|
||||
"Can not downgrade metadata.version to 7. Can't downgrade to newer version."), downgradeOutput);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandleDisable() {
|
||||
Map<String, Object> namespace = new HashMap<>();
|
||||
namespace.put("feature", Arrays.asList("foo.bar", "metadata.version", "quux"));
|
||||
namespace.put("dry_run", false);
|
||||
String disableOutput = ToolsTestUtils.captureStandardOut(() -> {
|
||||
Throwable t = assertThrows(TerseException.class, () -> FeatureCommand.handleDisable(new Namespace(namespace), buildAdminClient()));
|
||||
assertTrue(t.getMessage().contains("1 out of 3 operation(s) failed."));
|
||||
});
|
||||
assertEquals(format("foo.bar was disabled.%n" +
|
||||
"Could not disable metadata.version. Can't downgrade below 4%n" +
|
||||
"quux was disabled."), disableOutput);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandleDisableDryRun() {
|
||||
Map<String, Object> namespace = new HashMap<>();
|
||||
namespace.put("feature", Arrays.asList("foo.bar", "metadata.version", "quux"));
|
||||
namespace.put("dry_run", true);
|
||||
String disableOutput = ToolsTestUtils.captureStandardOut(() -> {
|
||||
Throwable t = assertThrows(TerseException.class, () -> FeatureCommand.handleDisable(new Namespace(namespace), buildAdminClient()));
|
||||
assertTrue(t.getMessage().contains("1 out of 3 operation(s) failed."));
|
||||
});
|
||||
assertEquals(format("foo.bar can be disabled.%n" +
|
||||
"Can not disable metadata.version. Can't downgrade below 4%n" +
|
||||
"quux can be disabled."), disableOutput);
|
||||
}
|
||||
}
|
|
@ -46,6 +46,9 @@ public class ToolsTestUtils {
|
|||
System.setErr(currentStream);
|
||||
else
|
||||
System.setOut(currentStream);
|
||||
|
||||
currentStream.close();
|
||||
tempStream.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue