KAFKA-13823 Feature flag changes from KIP-778 (#12036)

This PR includes the changes to feature flags that were outlined in KIP-778.  Specifically, it
changes UpdateFeatures and FeatureLevelRecord to remove the maximum version level. It also adds a
dry-run flag to the RPC, so the controller (rather than the client) can actually attempt the upgrade without applying it. It
introduces an upgrade type enum, which supersedes the allowDowngrade boolean. Because
FeatureLevelRecord was unused previously, we do not need to introduce a new version.
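
As a sketch of the resulting Admin API (the bootstrap address, the feature flag name "my.feature", and the target level below are illustrative, not part of this change):

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.FeatureUpdate;
    import org.apache.kafka.clients.admin.UpdateFeaturesOptions;

    public class FeatureUpdateExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // illustrative address
            try (Admin admin = Admin.create(props)) {
                // SAFE_DOWNGRADE replaces the old allowDowngrade=true boolean;
                // "my.feature" is a hypothetical feature flag.
                Map<String, FeatureUpdate> updates = Collections.singletonMap(
                    "my.feature",
                    new FeatureUpdate((short) 1, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE));
                // validateOnly(true) is the server-side dry run added by this PR:
                // the controller validates the update without applying it.
                admin.updateFeatures(updates, new UpdateFeaturesOptions().validateOnly(true))
                     .all()
                     .get();
            }
        }
    }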

The kafka-features.sh tool was overhauled in KIP-778 and now includes the describe, upgrade,
downgrade, and disable sub-commands.  Refer to
[KIP-778](https://cwiki.apache.org/confluence/display/KAFKA/KIP-778%3A+KRaft+Upgrades) for more
details on the new command structure.
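
For example, the new sub-commands are invoked roughly as follows (a sketch based on the parser definitions below; the bootstrap address and the feature flag name "my.feature" are illustrative):

    $ bin/kafka-features.sh --bootstrap-server localhost:9092 describe
    $ bin/kafka-features.sh --bootstrap-server localhost:9092 upgrade --feature my.feature --version 2 --dry-run
    $ bin/kafka-features.sh --bootstrap-server localhost:9092 downgrade --feature my.feature --version 1
    $ bin/kafka-features.sh --bootstrap-server localhost:9092 disable --feature my.feature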

Reviewers: Colin P. McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>
Author: David Arthur
Date: 2022-04-14 13:04:32 -04:00 (committed via GitHub)
Commit: 55ff5d3603 (parent: 01e4ceba52)
50 changed files with 1038 additions and 876 deletions


@ -291,12 +291,12 @@
files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/>
<suppress checks="ClassFanOutComplexity"
files="(QuorumController|ReplicationControlManager|ReplicationControlManagerTest).java"/>
<suppress checks="ParameterNumber"
<suppress checks="(ParameterNumber|ClassDataAbstractionCoupling)"
files="(QuorumController).java"/>
<suppress checks="CyclomaticComplexity"
files="(ClientQuotasImage|MetadataDelta|QuorumController|ReplicationControlManager).java"/>
<suppress checks="NPathComplexity"
files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager).java"/>
files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager).java"/>
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="metadata[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="BooleanExpressionComplexity"


@ -23,33 +23,86 @@ import java.util.Objects;
*/
public class FeatureUpdate {
private final short maxVersionLevel;
private final boolean allowDowngrade;
private final UpgradeType upgradeType;
public enum UpgradeType {
UNKNOWN(0),
UPGRADE(1),
SAFE_DOWNGRADE(2),
UNSAFE_DOWNGRADE(3);
private final byte code;
UpgradeType(int code) {
this.code = (byte) code;
}
public byte code() {
return code;
}
public static UpgradeType fromCode(int code) {
if (code == 1) {
return UPGRADE;
} else if (code == 2) {
return SAFE_DOWNGRADE;
} else if (code == 3) {
return UNSAFE_DOWNGRADE;
} else {
return UNKNOWN;
}
}
}
/**
* @param maxVersionLevel the new maximum version level for the finalized feature.
* a value &lt; 1 is special and indicates that the update is intended to
* a value of zero is special and indicates that the update is intended to
* delete the finalized feature, and should be accompanied by setting
* the allowDowngrade flag to true.
* @param allowDowngrade - true, if this feature update was meant to downgrade the existing
* maximum version level of the finalized feature.
* maximum version level of the finalized feature. Only "safe" downgrades are
* enabled with this boolean. See {@link FeatureUpdate#FeatureUpdate(short, UpgradeType)}
* - false, otherwise.
*/
@Deprecated
public FeatureUpdate(final short maxVersionLevel, final boolean allowDowngrade) {
if (maxVersionLevel < 1 && !allowDowngrade) {
this(maxVersionLevel, allowDowngrade ? UpgradeType.SAFE_DOWNGRADE : UpgradeType.UPGRADE);
}
/**
* @param maxVersionLevel The new maximum version level for the finalized feature.
* a value of zero is special and indicates that the update is intended to
* delete the finalized feature, and should be accompanied by setting
* the upgradeType to safe or unsafe.
* @param upgradeType Indicates what kind of upgrade should be performed in this operation.
* - UPGRADE: upgrading the feature level
* - SAFE_DOWNGRADE: only downgrades which do not result in metadata loss are permitted
* - UNSAFE_DOWNGRADE: any downgrade, including those which may result in metadata loss, is permitted
*/
public FeatureUpdate(final short maxVersionLevel, final UpgradeType upgradeType) {
if (maxVersionLevel == 0 && upgradeType.equals(UpgradeType.UPGRADE)) {
throw new IllegalArgumentException(String.format(
"The allowDowngrade flag should be set when the provided maxVersionLevel:%d is < 1.",
maxVersionLevel));
"The downgradeType flag should be set to SAFE or UNSAFE when the provided maxVersionLevel:%d is < 1.",
maxVersionLevel));
}
if (maxVersionLevel < 0) {
throw new IllegalArgumentException("Cannot specify a negative version level.");
}
this.maxVersionLevel = maxVersionLevel;
this.allowDowngrade = allowDowngrade;
this.upgradeType = upgradeType;
}
public short maxVersionLevel() {
return maxVersionLevel;
}
@Deprecated
public boolean allowDowngrade() {
return allowDowngrade;
return upgradeType != UpgradeType.UPGRADE;
}
public UpgradeType upgradeType() {
return upgradeType;
}
@Override
@ -63,16 +116,16 @@ public class FeatureUpdate {
}
final FeatureUpdate that = (FeatureUpdate) other;
return this.maxVersionLevel == that.maxVersionLevel && this.allowDowngrade == that.allowDowngrade;
return this.maxVersionLevel == that.maxVersionLevel && this.upgradeType.equals(that.upgradeType);
}
@Override
public int hashCode() {
return Objects.hash(maxVersionLevel, allowDowngrade);
return Objects.hash(maxVersionLevel, upgradeType);
}
@Override
public String toString() {
return String.format("FeatureUpdate{maxVersionLevel:%d, allowDowngrade:%s}", maxVersionLevel, allowDowngrade);
return String.format("FeatureUpdate{maxVersionLevel:%d, downgradeType:%s}", maxVersionLevel, upgradeType);
}
}
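
The deprecated boolean constructor and accessor above bridge onto the new enum. A minimal sketch of that mapping (illustrative values):

    import org.apache.kafka.clients.admin.FeatureUpdate;

    public class UpgradeTypeCompat {
        public static void main(String[] args) {
            // The deprecated constructor maps allowDowngrade=true to SAFE_DOWNGRADE.
            FeatureUpdate legacy = new FeatureUpdate((short) 3, true);
            System.out.println(legacy.upgradeType());    // SAFE_DOWNGRADE
            // The deprecated accessor reports true for any non-UPGRADE type.
            System.out.println(legacy.allowDowngrade()); // true
            // Codes outside 1-3 decode to UNKNOWN, which the controller rejects
            // with INVALID_REQUEST instead of guessing an intent.
            System.out.println(FeatureUpdate.UpgradeType.fromCode(42)); // UNKNOWN
        }
    }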


@ -4265,12 +4265,13 @@ public class KafkaAdminClient extends AdminClient {
new UpdateFeaturesRequestData.FeatureUpdateKey();
requestItem.setFeature(feature);
requestItem.setMaxVersionLevel(update.maxVersionLevel());
requestItem.setAllowDowngrade(update.allowDowngrade());
requestItem.setUpgradeType(update.upgradeType().code());
featureUpdatesRequestData.add(requestItem);
}
return new UpdateFeaturesRequest.Builder(
new UpdateFeaturesRequestData()
.setTimeoutMs(timeoutMs)
.setValidateOnly(options.validateOnly())
.setFeatureUpdates(featureUpdatesRequestData));
}


@ -26,4 +26,24 @@ import org.apache.kafka.common.annotation.InterfaceStability;
*/
@InterfaceStability.Evolving
public class UpdateFeaturesOptions extends AbstractOptions<UpdateFeaturesOptions> {
private boolean validateOnly = false;
@Deprecated
public boolean dryRun() {
return validateOnly;
}
public boolean validateOnly() {
return validateOnly;
}
@Deprecated
public UpdateFeaturesOptions dryRun(boolean dryRun) {
return validateOnly(dryRun);
}
public UpdateFeaturesOptions validateOnly(boolean validateOnly) {
this.validateOnly = validateOnly;
return this;
}
}


@ -98,7 +98,7 @@ public enum ApiKeys {
END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false),
DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, true),
ALTER_PARTITION(ApiMessageType.ALTER_PARTITION, true),
UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES),
UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES, true, true),
ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false),
FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false),
DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER),


@ -16,15 +16,46 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.stream.Collectors;
public class UpdateFeaturesRequest extends AbstractRequest {
public static class FeatureUpdateItem {
private final String featureName;
private final short featureLevel;
private final FeatureUpdate.UpgradeType upgradeType;
public FeatureUpdateItem(String featureName, short featureLevel, FeatureUpdate.UpgradeType upgradeType) {
this.featureName = featureName;
this.featureLevel = featureLevel;
this.upgradeType = upgradeType;
}
public String feature() {
return featureName;
}
public short versionLevel() {
return featureLevel;
}
public FeatureUpdate.UpgradeType upgradeType() {
return upgradeType;
}
public boolean isDeleteRequest() {
return featureLevel < 1 && !upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE);
}
}
public static class Builder extends AbstractRequest.Builder<UpdateFeaturesRequest> {
private final UpdateFeaturesRequestData data;
@ -52,6 +83,25 @@ public class UpdateFeaturesRequest extends AbstractRequest {
this.data = data;
}
public FeatureUpdateItem getFeature(String name) {
UpdateFeaturesRequestData.FeatureUpdateKey update = data.featureUpdates().find(name);
if (super.version() == 0) {
if (update.allowDowngrade()) {
return new FeatureUpdateItem(update.feature(), update.maxVersionLevel(), FeatureUpdate.UpgradeType.SAFE_DOWNGRADE);
} else {
return new FeatureUpdateItem(update.feature(), update.maxVersionLevel(), FeatureUpdate.UpgradeType.UPGRADE);
}
} else {
return new FeatureUpdateItem(update.feature(), update.maxVersionLevel(), FeatureUpdate.UpgradeType.fromCode(update.upgradeType()));
}
}
public Collection<FeatureUpdateItem> featureUpdates() {
return data.featureUpdates().stream()
.map(update -> getFeature(update.feature()))
.collect(Collectors.toList());
}
@Override
public UpdateFeaturesResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return UpdateFeaturesResponse.createWithErrors(
@ -69,8 +119,4 @@ public class UpdateFeaturesRequest extends AbstractRequest {
public static UpdateFeaturesRequest parse(ByteBuffer buffer, short version) {
return new UpdateFeaturesRequest(new UpdateFeaturesRequestData(new ByteBufferAccessor(buffer), version), version);
}
public static boolean isDeleteRequest(UpdateFeaturesRequestData.FeatureUpdateKey update) {
return update.maxVersionLevel() < 1 && update.allowDowngrade();
}
}


@ -16,9 +16,9 @@
{
"apiKey": 57,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["zkBroker", "broker", "controller"],
"name": "UpdateFeaturesRequest",
"validVersions": "0",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
@ -29,8 +29,12 @@
"about": "The name of the finalized feature to be updated."},
{"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
"about": "The new maximum version level for the finalized feature. A value >= 1 is valid. A value < 1, is special, and can be used to request the deletion of the finalized feature."},
{"name": "AllowDowngrade", "type": "bool", "versions": "0+",
"about": "When set to true, the finalized feature version level is allowed to be downgraded/deleted. The downgrade request will fail if the new maximum version level is a value that's not lower than the existing maximum finalized version level."}
]}
{"name": "AllowDowngrade", "type": "bool", "versions": "0",
"about": "DEPRECATED in version 1 (see DowngradeType). When set to true, the finalized feature version level is allowed to be downgraded/deleted. The downgrade request will fail if the new maximum version level is a value that's not lower than the existing maximum finalized version level."},
{"name": "UpgradeType", "type": "int8", "versions": "1+", "default": 1,
"about": "Determine which type of upgrade will be performed: 1 will perform an upgrade only (default), 2 is safe downgrades only (lossless), 3 is unsafe downgrades (lossy)."}
]},
{"name": "ValidateOnly", "type": "bool", "versions": "1+", "default": false,
"about": "True if we should validate the request, but not perform the upgrade or downgrade."}
]
}


@ -17,7 +17,7 @@
"apiKey": 57,
"type": "response",
"name": "UpdateFeaturesResponse",
"validVersions": "0",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",


@ -4685,8 +4685,8 @@ public class KafkaAdminClientTest {
private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
return Utils.mkMap(
Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, false)),
Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3, true)));
Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE)),
Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)));
}
private Map<String, ApiError> makeTestFeatureUpdateErrors(final Map<String, FeatureUpdate> updates, final Errors error) {
@ -4782,8 +4782,8 @@ public class KafkaAdminClientTest {
env.cluster().nodeById(controllerId));
final KafkaFuture<Void> future = env.adminClient().updateFeatures(
Utils.mkMap(
Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, false)),
Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3, true))),
Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE)),
Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE))),
new UpdateFeaturesOptions().timeoutMs(10000)
).all();
future.get();
@ -4806,8 +4806,8 @@ public class KafkaAdminClientTest {
assertThrows(
IllegalArgumentException.class,
() -> env.adminClient().updateFeatures(
Utils.mkMap(Utils.mkEntry("feature", new FeatureUpdate((short) 2, false)),
Utils.mkEntry("", new FeatureUpdate((short) 2, false))),
Utils.mkMap(Utils.mkEntry("feature", new FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE)),
Utils.mkEntry("", new FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE))),
new UpdateFeaturesOptions()));
}
}
@ -4816,7 +4816,7 @@ public class KafkaAdminClientTest {
public void testUpdateFeaturesShouldFailRequestInClientWhenDowngradeFlagIsNotSetDuringDeletion() {
assertThrows(
IllegalArgumentException.class,
() -> new FeatureUpdate((short) 0, false));
() -> new FeatureUpdate((short) 0, FeatureUpdate.UpgradeType.UPGRADE));
}
@Test


@ -62,7 +62,7 @@ public class ApiKeysTest {
public void testResponseThrottleTime() {
Set<ApiKeys> authenticationKeys = EnumSet.of(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE);
// Newer protocol apis include throttle time ms even for cluster actions
Set<ApiKeys> clusterActionsWithThrottleTimeMs = EnumSet.of(ApiKeys.ALTER_PARTITION, ApiKeys.ALLOCATE_PRODUCER_IDS);
Set<ApiKeys> clusterActionsWithThrottleTimeMs = EnumSet.of(ApiKeys.ALTER_PARTITION, ApiKeys.ALLOCATE_PRODUCER_IDS, ApiKeys.UPDATE_FEATURES);
for (ApiKeys apiKey: ApiKeys.zkBrokerApis()) {
Schema responseSchema = apiKey.messageType.responseSchemas()[apiKey.latestVersion()];
BoundField throttleTimeField = responseSchema.get("throttle_time_ms");


@ -16,14 +16,20 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class UpdateFeaturesRequestTest {
@ -53,4 +59,88 @@ public class UpdateFeaturesRequestTest {
assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 1), response.errorCounts());
}
@Test
public void testUpdateFeaturesV0() {
UpdateFeaturesRequestData.FeatureUpdateKeyCollection features =
new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
features.add(new UpdateFeaturesRequestData.FeatureUpdateKey()
.setFeature("foo")
.setMaxVersionLevel((short) 1)
.setAllowDowngrade(true)
);
features.add(new UpdateFeaturesRequestData.FeatureUpdateKey()
.setFeature("bar")
.setMaxVersionLevel((short) 3)
);
UpdateFeaturesRequest request = new UpdateFeaturesRequest(
new UpdateFeaturesRequestData().setFeatureUpdates(features),
UpdateFeaturesRequestData.LOWEST_SUPPORTED_VERSION
);
ByteBuffer buffer = request.serialize();
request = UpdateFeaturesRequest.parse(buffer, UpdateFeaturesRequestData.LOWEST_SUPPORTED_VERSION);
List<UpdateFeaturesRequest.FeatureUpdateItem> updates = new ArrayList<>(request.featureUpdates());
assertEquals(updates.size(), 2);
assertEquals(updates.get(0).upgradeType(), FeatureUpdate.UpgradeType.SAFE_DOWNGRADE);
assertEquals(updates.get(1).upgradeType(), FeatureUpdate.UpgradeType.UPGRADE);
}
@Test
public void testUpdateFeaturesV1() {
UpdateFeaturesRequestData.FeatureUpdateKeyCollection features =
new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
features.add(new UpdateFeaturesRequestData.FeatureUpdateKey()
.setFeature("foo")
.setMaxVersionLevel((short) 1)
.setUpgradeType(FeatureUpdate.UpgradeType.SAFE_DOWNGRADE.code())
);
features.add(new UpdateFeaturesRequestData.FeatureUpdateKey()
.setFeature("bar")
.setMaxVersionLevel((short) 3)
);
UpdateFeaturesRequest request = new UpdateFeaturesRequest(
new UpdateFeaturesRequestData().setFeatureUpdates(features),
UpdateFeaturesRequestData.HIGHEST_SUPPORTED_VERSION
);
ByteBuffer buffer = request.serialize();
request = UpdateFeaturesRequest.parse(buffer, UpdateFeaturesRequestData.HIGHEST_SUPPORTED_VERSION);
List<UpdateFeaturesRequest.FeatureUpdateItem> updates = new ArrayList<>(request.featureUpdates());
assertEquals(updates.size(), 2);
assertEquals(updates.get(0).upgradeType(), FeatureUpdate.UpgradeType.SAFE_DOWNGRADE);
assertEquals(updates.get(1).upgradeType(), FeatureUpdate.UpgradeType.UPGRADE);
}
@Test
public void testUpdateFeaturesV1OldBoolean() {
UpdateFeaturesRequestData.FeatureUpdateKeyCollection features =
new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
features.add(new UpdateFeaturesRequestData.FeatureUpdateKey()
.setFeature("foo")
.setMaxVersionLevel((short) 1)
.setAllowDowngrade(true)
);
features.add(new UpdateFeaturesRequestData.FeatureUpdateKey()
.setFeature("bar")
.setMaxVersionLevel((short) 3)
);
UpdateFeaturesRequest request = new UpdateFeaturesRequest(
new UpdateFeaturesRequestData().setFeatureUpdates(features),
UpdateFeaturesRequestData.HIGHEST_SUPPORTED_VERSION
);
assertThrows(UnsupportedVersionException.class, request::serialize,
"This should fail since allowDowngrade is not supported in v1 of this RPC");
}
}


@ -17,374 +17,317 @@
package kafka.admin
import kafka.server.BrokerFeatures
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit}
import kafka.tools.TerseFailure
import kafka.utils.Exit
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.{append, fileType, storeTrue}
import net.sourceforge.argparse4j.inf.{Namespace, Subparsers}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, UpdateFeaturesOptions}
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType
import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
import org.apache.kafka.common.utils.Utils
import java.io.File
import java.util.Properties
import scala.collection.Seq
import scala.collection.immutable.ListMap
import scala.jdk.CollectionConverters._
import joptsimple.OptionSpec
import scala.concurrent.ExecutionException
import scala.jdk.CollectionConverters._
object FeatureCommand {
def main(args: Array[String]): Unit = {
val opts = new FeatureCommandOptions(args)
val featureApis = new FeatureApis(opts)
var exitCode = 0
val res = mainNoExit(args)
Exit.exit(res)
}
// This is used for integration tests in order to avoid killing the test with Exit.exit
def mainNoExit(args: Array[String]): Int = {
val parser = ArgumentParsers.newArgumentParser("kafka-features")
.defaultHelp(true)
.description("This tool manages feature flags in Kafka.")
parser.addArgument("--bootstrap-server")
.help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.")
.required(true)
parser.addArgument("--command-config")
.`type`(fileType())
.help("Property file containing configs to be passed to Admin Client.")
val subparsers = parser.addSubparsers().dest("command")
addDescribeParser(subparsers)
addUpgradeParser(subparsers)
addDowngradeParser(subparsers)
addDisableParser(subparsers)
try {
featureApis.execute()
val namespace = parser.parseArgsOrFail(args)
val command = namespace.getString("command")
val commandConfig = namespace.get[File]("command_config")
val props = if (commandConfig != null) {
if (!commandConfig.exists()) {
throw new TerseFailure(s"Properties file ${commandConfig.getPath} does not exists!")
}
Utils.loadProps(commandConfig.getPath)
} else {
new Properties()
}
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server"))
val admin = Admin.create(props)
command match {
case "describe" => handleDescribe(namespace, admin)
case "upgrade" => handleUpgrade(namespace, admin)
case "downgrade" => handleDowngrade(namespace, admin)
case "disable" => handleDisable(namespace, admin)
}
admin.close()
0
} catch {
case e: IllegalArgumentException =>
printException(e)
opts.parser.printHelpOn(System.err)
exitCode = 1
case _: UpdateFeaturesException =>
exitCode = 1
case e: ExecutionException =>
val cause = if (e.getCause == null) e else e.getCause
printException(cause)
exitCode = 1
case e: Throwable =>
printException(e)
exitCode = 1
} finally {
featureApis.close()
Exit.exit(exitCode)
case e: TerseFailure =>
System.err.println(e.getMessage)
1
}
}
private def printException(exception: Throwable): Unit = {
System.err.println("\nError encountered when executing command: " + Utils.stackTrace(exception))
}
}
def addDescribeParser(subparsers: Subparsers): Unit = {
val describeParser = subparsers.addParser("describe")
.help("Describe one or more feature flags.")
class UpdateFeaturesException(message: String) extends RuntimeException(message)
val featureArgs = describeParser.addArgumentGroup("Specific Features")
featureArgs.addArgument("--feature")
.action(append())
.help("A specific feature to describe. This option may be repeated for describing multiple feature flags.")
/**
* A class that provides necessary APIs to bridge feature APIs provided by the Admin client with
* the requirements of the CLI tool.
*
* @param opts the CLI options
*/
class FeatureApis(private var opts: FeatureCommandOptions) {
private var supportedFeatures = BrokerFeatures.createDefault().supportedFeatures
private var adminClient = FeatureApis.createAdminClient(opts)
private def pad(op: String): String = {
f"$op%11s"
val releaseArgs = describeParser.addArgumentGroup("All Features for release")
releaseArgs.addArgument("--release")
}
private val addOp = pad("[Add]")
private val upgradeOp = pad("[Upgrade]")
private val deleteOp = pad("[Delete]")
private val downgradeOp = pad("[Downgrade]")
def addUpgradeParser(subparsers: Subparsers): Unit = {
val upgradeParser = subparsers.addParser("upgrade")
.help("Upgrade one or more feature flags.")
// For testing only.
private[admin] def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
supportedFeatures = newFeatures
val featureArgs = upgradeParser.addArgumentGroup("Upgrade specific features")
featureArgs.addArgument("--feature")
.action(append())
.help("A feature flag to upgrade. This option may be repeated for upgrading multiple feature flags.")
featureArgs.addArgument("--version")
.`type`(classOf[Short])
.help("The version to upgrade to.")
.action(append())
val releaseArgs = upgradeParser.addArgumentGroup("Upgrade to feature level defined for a given release")
releaseArgs.addArgument("--release")
upgradeParser.addArgument("--dry-run")
.help("Perform a dry-run of this upgrade operation.")
.action(storeTrue())
}
// For testing only.
private[admin] def setOptions(newOpts: FeatureCommandOptions): Unit = {
adminClient.close()
adminClient = FeatureApis.createAdminClient(newOpts)
opts = newOpts
def addDowngradeParser(subparsers: Subparsers): Unit = {
val downgradeParser = subparsers.addParser("downgrade")
.help("Upgrade one or more feature flags.")
downgradeParser.addArgument("--feature")
.help("A feature flag to downgrade. This option may be repeated for downgrade multiple feature flags.")
.required(true)
.action(append())
downgradeParser.addArgument("--version")
.`type`(classOf[Short])
.help("The version to downgrade to.")
.required(true)
.action(append())
downgradeParser.addArgument("--unsafe")
.help("Perform this downgrade even if it considered unsafe. Refer to specific feature flag documentation for details.")
.action(storeTrue())
downgradeParser.addArgument("--dry-run")
.help("Perform a dry-run of this downgrade operation.")
.action(storeTrue())
}
/**
* Describes the supported and finalized features. The request is issued to any of the provided
* bootstrap servers.
*/
def describeFeatures(): Unit = {
val result = adminClient.describeFeatures.featureMetadata.get
val features = result.supportedFeatures.asScala.keys.toSet ++ result.finalizedFeatures.asScala.keys.toSet
def addDisableParser(subparsers: Subparsers): Unit = {
val disableParser = subparsers.addParser("disable")
.help("Disable one or more feature flags. This is the same as downgrading the version to zero.")
features.toList.sorted.foreach {
feature =>
val output = new StringBuilder()
output.append(s"Feature: $feature")
disableParser.addArgument("--feature")
.help("A feature flag to disable. This option may be repeated for disable multiple feature flags.")
.required(true)
.action(append())
disableParser.addArgument("--unsafe")
.help("Disable the feature flag(s) even if it considered unsafe. Refer to specific feature flag documentation for details.")
.action(storeTrue())
disableParser.addArgument("--dry-run")
.help("Perform a dry-run of this disable operation.")
.action(storeTrue())
}
val (supportedMinVersion, supportedMaxVersion) = {
val supportedVersionRange = result.supportedFeatures.get(feature)
if (supportedVersionRange == null) {
("-", "-")
} else {
(supportedVersionRange.minVersion, supportedVersionRange.maxVersion)
}
}
output.append(s"\tSupportedMinVersion: $supportedMinVersion")
output.append(s"\tSupportedMaxVersion: $supportedMaxVersion")
val (finalizedMinVersionLevel, finalizedMaxVersionLevel) = {
val finalizedVersionRange = result.finalizedFeatures.get(feature)
if (finalizedVersionRange == null) {
("-", "-")
} else {
(finalizedVersionRange.minVersionLevel, finalizedVersionRange.maxVersionLevel)
}
}
output.append(s"\tFinalizedMinVersionLevel: $finalizedMinVersionLevel")
output.append(s"\tFinalizedMaxVersionLevel: $finalizedMaxVersionLevel")
val epoch = {
if (result.finalizedFeaturesEpoch.isPresent) {
result.finalizedFeaturesEpoch.get.toString
} else {
"-"
}
}
output.append(s"\tEpoch: $epoch")
println(output)
def handleDescribe(namespace: Namespace, admin: Admin): Unit = {
val featureFilter = parseFeaturesOrRelease(namespace) match {
case Neither() => (_: String) => true
case Features(featureNames) => (feature: String) => featureNames.contains(feature)
case Release(release) =>
// Special case, print the versions associated with the given release
printReleaseFeatures(release)
return
case Both() => throw new TerseFailure("Only one of --release or --feature may be specified with describe sub-command.")
}
}
/**
* Upgrades all features known to this tool to their highest max version levels. The method may
* add new finalized features if they were not finalized previously, but it does not delete
* any existing finalized feature. The results of the feature updates are written to STDOUT.
*
* NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
* updates to STDOUT, without applying them.
*
* @throws UpdateFeaturesException if at least one of the feature updates failed
*/
def upgradeAllFeatures(): Unit = {
val metadata = adminClient.describeFeatures.featureMetadata.get
val existingFinalizedFeatures = metadata.finalizedFeatures
val updates = supportedFeatures.features.asScala.map {
case (feature, targetVersionRange) =>
val existingVersionRange = existingFinalizedFeatures.get(feature)
if (existingVersionRange == null) {
val updateStr =
addOp +
s"\tFeature: $feature" +
s"\tExistingFinalizedMaxVersion: -" +
s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
(feature, Some((updateStr, new FeatureUpdate(targetVersionRange.max, false))))
} else {
if (targetVersionRange.max > existingVersionRange.maxVersionLevel) {
val updateStr =
upgradeOp +
s"\tFeature: $feature" +
s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
(feature, Some((updateStr, new FeatureUpdate(targetVersionRange.max, false))))
} else {
(feature, Option.empty)
}
}
}.filter {
case(_, updateInfo) => updateInfo.isDefined
}.map {
case(feature, updateInfo) => (feature, updateInfo.get)
}.toMap
if (updates.nonEmpty) {
maybeApplyFeatureUpdates(updates)
}
}
/**
* Downgrades existing finalized features to the highest max version levels known to this tool.
* The method may delete existing finalized features if they are no longer seen to be supported,
* but it does not add a feature that was not finalized previously. The results of the feature
* updates are written to STDOUT.
*
* NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
* updates to STDOUT, without applying them.
*
* @throws UpdateFeaturesException if at least one of the feature updates failed
*/
def downgradeAllFeatures(): Unit = {
val metadata = adminClient.describeFeatures.featureMetadata.get
val existingFinalizedFeatures = metadata.finalizedFeatures
val supportedFeaturesMap = supportedFeatures.features
val updates = existingFinalizedFeatures.asScala.map {
case (feature, existingVersionRange) =>
val targetVersionRange = supportedFeaturesMap.get(feature)
if (targetVersionRange == null) {
val updateStr =
deleteOp +
s"\tFeature: $feature" +
s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
s"\tNewFinalizedMaxVersion: -"
(feature, Some(updateStr, new FeatureUpdate(0, true)))
} else {
if (targetVersionRange.max < existingVersionRange.maxVersionLevel) {
val updateStr =
downgradeOp +
s"\tFeature: $feature" +
s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
(feature, Some(updateStr, new FeatureUpdate(targetVersionRange.max, true)))
} else {
(feature, Option.empty)
}
}
}.filter {
case(_, updateInfo) => updateInfo.isDefined
}.map {
case(feature, updateInfo) => (feature, updateInfo.get)
}.toMap
if (updates.nonEmpty) {
maybeApplyFeatureUpdates(updates)
}
}
/**
* Applies the provided feature updates. If the --dry-run CLI option is provided, the method
* only prints the expected feature updates to STDOUT without applying them.
*
* @param updates the feature updates to be applied via the admin client
*
* @throws UpdateFeaturesException if at least one of the feature updates failed
*/
private def maybeApplyFeatureUpdates(updates: Map[String, (String, FeatureUpdate)]): Unit = {
if (opts.hasDryRunOption) {
println("Expected feature updates:" + ListMap(
updates
.toSeq
.sortBy { case(feature, _) => feature} :_*)
.map { case(_, (updateStr, _)) => updateStr}
.mkString("\n"))
val featureMetadata = admin.describeFeatures().featureMetadata().get()
val featureEpoch = featureMetadata.finalizedFeaturesEpoch()
val epochString = if (featureEpoch.isPresent) {
s"Epoch: ${featureEpoch.get}"
} else {
val result = adminClient.updateFeatures(
updates
.map { case(feature, (_, update)) => (feature, update)}
.asJava,
new UpdateFeaturesOptions())
val resultSortedByFeature = ListMap(
result
.values
.asScala
.toSeq
.sortBy { case(feature, _) => feature} :_*)
val failures = resultSortedByFeature.map {
case (feature, updateFuture) =>
val (updateStr, _) = updates(feature)
try {
updateFuture.get
println(updateStr + "\tResult: OK")
0
} catch {
case e: ExecutionException =>
val cause = if (e.getCause == null) e else e.getCause
println(updateStr + "\tResult: FAILED due to " + cause)
1
case e: Throwable =>
println(updateStr + "\tResult: FAILED due to " + e)
1
"Epoch: -"
}
val finalized = featureMetadata.finalizedFeatures().asScala
featureMetadata.supportedFeatures().asScala.foreach {
case (feature, range) =>
if (featureFilter.apply(feature)) {
if (finalized.contains(feature)) {
println(s"Feature: $feature\tSupportedMinVersion: ${range.minVersion()}\t" +
s"SupportedMaxVersion: ${range.maxVersion()}\tFinalizedVersionLevel: ${finalized(feature).maxVersionLevel()}\t$epochString")
} else {
println(s"Feature: $feature\tSupportedMinVersion: ${range.minVersion()}\t" +
s"SupportedMaxVersion: ${range.maxVersion()}\tFinalizedVersionLevel: -\t$epochString")
}
}.sum
if (failures > 0) {
throw new UpdateFeaturesException(s"$failures feature updates failed!")
}
}
}
def printReleaseFeatures(release: String): Unit = {
println(s"Default feature versions for release $release:")
}
def handleUpgrade(namespace: Namespace, admin: Admin): Unit = {
val featuresToUpgrade = parseFeaturesOrRelease(namespace) match {
case Features(featureNames) => parseVersions(featureNames, namespace)
case Release(release) => featuresForRelease(release)
case Neither() => throw new TerseFailure("Must specify either --release or at least one --feature and --version with upgrade sub-command.")
case Both() => throw new TerseFailure("Cannot specify both --release and --feature with upgrade sub-command.")
}
val dryRun = namespace.getBoolean("dry_run")
val updateResult = admin.updateFeatures(featuresToUpgrade.map { case (feature, version) =>
feature -> new FeatureUpdate(version, UpgradeType.UPGRADE)
}.asJava, new UpdateFeaturesOptions().validateOnly(dryRun))
handleUpdateFeaturesResponse(updateResult, featuresToUpgrade, dryRun, "upgrade")
}
def handleDowngrade(namespace: Namespace, admin: Admin): Unit = {
val featuresToDowngrade = parseFeaturesOrRelease(namespace) match {
case Features(featureNames) => parseVersions(featureNames, namespace)
case Neither() => throw new TerseFailure("Must specify at least one --feature and --version with downgrade sub-command.")
case _ => throw new IllegalStateException()
}
val dryRun = namespace.getBoolean("dry_run")
val unsafe = namespace.getBoolean("unsafe")
val updateResult = admin.updateFeatures(featuresToDowngrade.map { case (feature, version) =>
if (unsafe) {
feature -> new FeatureUpdate(version, UpgradeType.UNSAFE_DOWNGRADE)
} else {
feature -> new FeatureUpdate(version, UpgradeType.SAFE_DOWNGRADE)
}
}.asJava, new UpdateFeaturesOptions().validateOnly(dryRun))
handleUpdateFeaturesResponse(updateResult, featuresToDowngrade, dryRun, "downgrade")
}
def handleDisable(namespace: Namespace, admin: Admin): Unit = {
val featuresToDisable = parseFeaturesOrRelease(namespace) match {
case Features(featureNames) => featureNames
case Neither() => throw new TerseFailure("Must specify at least one --feature with the disable sub-command.")
case _ => throw new IllegalStateException()
}
val dryRun = namespace.getBoolean("dry_run")
val unsafe = namespace.getBoolean("unsafe")
val updateResult = admin.updateFeatures(featuresToDisable.map { feature =>
if (unsafe) {
feature -> new FeatureUpdate(0.toShort, UpgradeType.UNSAFE_DOWNGRADE)
} else {
feature -> new FeatureUpdate(0.toShort, UpgradeType.SAFE_DOWNGRADE)
}
}.toMap.asJava, new UpdateFeaturesOptions().validateOnly(dryRun))
handleUpdateFeaturesResponse(updateResult, featuresToDisable.map {
feature => feature -> 0.toShort
}.toMap, dryRun, "disable")
}
def handleUpdateFeaturesResponse(updateResult: UpdateFeaturesResult,
updatedFeatures: Map[String, Short],
dryRun: Boolean,
op: String): Unit = {
val errors = updateResult.values().asScala.map { case (feature, future) =>
try {
future.get()
feature -> None
} catch {
case e: ExecutionException => feature -> Some(e.getCause)
case t: Throwable => feature -> Some(t)
}
}
errors.foreach { case (feature, maybeThrowable) =>
if (maybeThrowable.isDefined) {
if (dryRun) {
System.out.println(s"Can not $op feature '$feature' to ${updatedFeatures(feature)}. ${maybeThrowable.get.getMessage}")
} else {
System.out.println(s"Could not $op feature '$feature' to ${updatedFeatures(feature)}. ${maybeThrowable.get.getMessage}")
}
} else {
if (dryRun) {
System.out.println(s"Feature '$feature' can be ${op}d to ${updatedFeatures(feature)}.")
} else {
System.out.println(s"Feature '$feature' was ${op}d to ${updatedFeatures(feature)}.")
}
}
}
}
def execute(): Unit = {
if (opts.hasDescribeOption) {
describeFeatures()
} else if (opts.hasUpgradeAllOption) {
upgradeAllFeatures()
} else if (opts.hasDowngradeAllOption) {
downgradeAllFeatures()
sealed trait ReleaseOrFeatures { }
case class Neither() extends ReleaseOrFeatures
case class Release(release: String) extends ReleaseOrFeatures
case class Features(featureNames: Seq[String]) extends ReleaseOrFeatures
case class Both() extends ReleaseOrFeatures
def parseFeaturesOrRelease(namespace: Namespace): ReleaseOrFeatures = {
val release = namespace.getString("release")
val features = namespace.getList[String]("feature").asScala
if (release != null && features != null) {
Both()
} else if (release == null && features == null) {
Neither()
} else if (release != null) {
Release(release)
} else {
throw new IllegalStateException("Unexpected state: no CLI command could be executed.")
Features(features)
}
}
def close(): Unit = {
adminClient.close()
}
}
class FeatureCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
private val bootstrapServerOpt = parser.accepts(
"bootstrap-server",
"REQUIRED: A comma-separated list of host:port pairs to use for establishing the connection" +
" to the Kafka cluster.")
.withRequiredArg
.describedAs("server to connect to")
.ofType(classOf[String])
private val commandConfigOpt = parser.accepts(
"command-config",
"Property file containing configs to be passed to Admin Client." +
" This is used with --bootstrap-server option when required.")
.withOptionalArg
.describedAs("command config property file")
.ofType(classOf[String])
private val describeOpt = parser.accepts(
"describe",
"Describe supported and finalized features from a random broker.")
private val upgradeAllOpt = parser.accepts(
"upgrade-all",
"Upgrades all finalized features to the maximum version levels known to the tool." +
" This command finalizes new features known to the tool that were never finalized" +
" previously in the cluster, but it is guaranteed to not delete any existing feature.")
private val downgradeAllOpt = parser.accepts(
"downgrade-all",
"Downgrades all finalized features to the maximum version levels known to the tool." +
" This command deletes unknown features from the list of finalized features in the" +
" cluster, but it is guaranteed to not add a new feature.")
private val dryRunOpt = parser.accepts(
"dry-run",
"Performs a dry-run of upgrade/downgrade mutations to finalized feature without applying them.")
options = parser.parse(args : _*)
checkArgs()
def has(builder: OptionSpec[_]): Boolean = options.has(builder)
def hasDescribeOption: Boolean = has(describeOpt)
def hasDryRunOption: Boolean = has(dryRunOpt)
def hasUpgradeAllOption: Boolean = has(upgradeAllOpt)
def hasDowngradeAllOption: Boolean = has(downgradeAllOpt)
def commandConfig: Properties = {
if (has(commandConfigOpt))
Utils.loadProps(options.valueOf(commandConfigOpt))
else
new Properties()
}
def bootstrapServers: String = options.valueOf(bootstrapServerOpt)
def checkArgs(): Unit = {
CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool describes and updates finalized features.")
val numActions = Seq(describeOpt, upgradeAllOpt, downgradeAllOpt).count(has)
if (numActions != 1) {
CommandLineUtils.printUsageAndDie(
parser,
"Command must include exactly one action: --describe, --upgrade-all, --downgrade-all.")
def parseVersions(features: Seq[String], namespace: Namespace): Map[String, Short] = {
val versions = namespace.getList[Short]("version").asScala
if (versions == null) {
throw new TerseFailure("Must specify --version when using --feature argument(s).")
}
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
if (hasDryRunOption && !hasUpgradeAllOption && !hasDowngradeAllOption) {
CommandLineUtils.printUsageAndDie(
parser,
"Command can contain --dry-run option only when either --upgrade-all or --downgrade-all actions are provided.")
if (versions.size != features.size) {
if (versions.size > features.size) {
throw new TerseFailure("Too many --version arguments given. For each --feature argument there should be one --version argument.")
} else {
throw new TerseFailure("Too many --feature arguments given. For each --feature argument there should be one --version argument.")
}
}
features.zip(versions).map { case (feature, version) =>
feature -> version
}.toMap
}
}
object FeatureApis {
private def createAdminClient(opts: FeatureCommandOptions): Admin = {
val props = new Properties()
props.putAll(opts.commandConfig)
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.bootstrapServers)
Admin.create(props)
def defaultFeatures(): Map[String, Short] = {
Map.empty
}
def featuresForRelease(release: String): Map[String, Short] = {
Map.empty
}
}


@ -33,12 +33,13 @@ import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zk.TopicZNode.TopicIdReplicaAssignment
import kafka.zk.{FeatureZNodeStatus, _}
import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler, ZNodeChildChangeHandler}
import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType
import org.apache.kafka.common.ElectionType
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, StaleBrokerEpochException}
import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
import org.apache.kafka.common.message.{AllocateProducerIdsRequestData, AllocateProducerIdsResponseData, AlterPartitionRequestData, AlterPartitionResponseData, UpdateFeaturesRequestData}
import org.apache.kafka.common.message.{AllocateProducerIdsRequestData, AllocateProducerIdsResponseData, AlterPartitionRequestData, AlterPartitionResponseData}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse}
@ -1928,8 +1929,9 @@ class KafkaController(val config: KafkaConfig,
*
* @return the new FinalizedVersionRange or error, as described above.
*/
private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = {
if (UpdateFeaturesRequest.isDeleteRequest(update)) {
private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequest.FeatureUpdateItem):
Either[FinalizedVersionRange, ApiError] = {
if (update.isDeleteRequest) {
throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update")
}
@ -1941,7 +1943,7 @@ class KafkaController(val config: KafkaConfig,
} else {
var newVersionRange: FinalizedVersionRange = null
try {
newVersionRange = new FinalizedVersionRange(supportedVersionRange.min, update.maxVersionLevel)
newVersionRange = new FinalizedVersionRange(update.versionLevel(), update.versionLevel())
} catch {
case _: IllegalArgumentException => {
// This exception means the provided maxVersionLevel is invalid. It is handled below
@ -1951,7 +1953,7 @@ class KafkaController(val config: KafkaConfig,
if (newVersionRange == null) {
Right(new ApiError(Errors.INVALID_REQUEST,
"Could not apply finalized feature update because the provided" +
s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
s" maxVersionLevel:${update.versionLevel} is lower than the" +
s" supported minVersion:${supportedVersionRange.min}."))
} else {
val newFinalizedFeature =
@ -1985,9 +1987,9 @@ class KafkaController(val config: KafkaConfig,
* @return the new FinalizedVersionRange to be updated into ZK or error
* as described above.
*/
private def validateFeatureUpdate(update: UpdateFeaturesRequestData.FeatureUpdateKey,
private def validateFeatureUpdate(update: UpdateFeaturesRequest.FeatureUpdateItem,
existingVersionRange: Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] = {
def newVersionRangeOrError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[Option[FinalizedVersionRange], ApiError] = {
def newVersionRangeOrError(update: UpdateFeaturesRequest.FeatureUpdateItem): Either[Option[FinalizedVersionRange], ApiError] = {
newFinalizedVersionRangeOrIncompatibilityError(update)
.fold(versionRange => Left(Some(versionRange)), error => Right(error))
}
@ -1995,9 +1997,12 @@ class KafkaController(val config: KafkaConfig,
if (update.feature.isEmpty) {
// Check that the feature name is not empty.
Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be empty."))
} else if (update.upgradeType.equals(UpgradeType.UNKNOWN)) {
Right(new ApiError(Errors.INVALID_REQUEST, "Received unknown upgrade type."))
} else {
// We handle deletion requests separately from non-deletion requests.
if (UpdateFeaturesRequest.isDeleteRequest(update)) {
if (update.isDeleteRequest) {
if (existingVersionRange.isEmpty) {
// Disallow deletion of a non-existing finalized feature.
Right(new ApiError(Errors.INVALID_REQUEST,
@ -2005,39 +2010,33 @@ class KafkaController(val config: KafkaConfig,
} else {
Left(Option.empty)
}
} else if (update.maxVersionLevel() < 1) {
// Disallow deletion of a finalized feature without allowDowngrade flag set.
} else if (update.versionLevel() < 1) {
// Disallow deletion of a finalized feature without SAFE downgrade type.
Right(new ApiError(Errors.INVALID_REQUEST,
s"Can not provide maxVersionLevel: ${update.maxVersionLevel} less" +
s" than 1 without setting the allowDowngrade flag to true in the request."))
s"Can not provide maxVersionLevel: ${update.versionLevel} less" +
s" than 1 without setting the SAFE downgradeType in the request."))
} else {
existingVersionRange.map(existing =>
if (update.maxVersionLevel == existing.max) {
if (update.versionLevel == existing.max) {
// Disallow a case where target maxVersionLevel matches existing maxVersionLevel.
Right(new ApiError(Errors.INVALID_REQUEST,
s"Can not ${if (update.allowDowngrade) "downgrade" else "upgrade"}" +
s"Can not ${if (update.upgradeType.equals(UpgradeType.SAFE_DOWNGRADE)) "downgrade" else "upgrade"}" +
s" a finalized feature from existing maxVersionLevel:${existing.max}" +
" to the same value."))
} else if (update.maxVersionLevel < existing.max && !update.allowDowngrade) {
// Disallow downgrade of a finalized feature without the allowDowngrade flag set.
} else if (update.versionLevel < existing.max && !update.upgradeType.equals(UpgradeType.SAFE_DOWNGRADE)) {
// Disallow downgrade of a finalized feature without the downgradeType set.
Right(new ApiError(Errors.INVALID_REQUEST,
s"Can not downgrade finalized feature from existing" +
s" maxVersionLevel:${existing.max} to provided" +
s" maxVersionLevel:${update.maxVersionLevel} without setting the" +
" allowDowngrade flag in the request."))
} else if (update.allowDowngrade && update.maxVersionLevel > existing.max) {
// Disallow a request that sets allowDowngrade flag without specifying a
s" maxVersionLevel:${update.versionLevel} without setting the" +
" downgradeType to SAFE in the request."))
} else if (!update.upgradeType.equals(UpgradeType.UPGRADE) && update.versionLevel > existing.max) {
// Disallow a request that sets downgradeType without specifying a
// maxVersionLevel that's lower than the existing maxVersionLevel.
Right(new ApiError(Errors.INVALID_REQUEST,
s"When the allowDowngrade flag set in the request, the provided" +
s" maxVersionLevel:${update.maxVersionLevel} can not be greater than" +
s"When the downgradeType is set to SAFE set in the request, the provided" +
s" maxVersionLevel:${update.versionLevel} can not be greater than" +
s" existing maxVersionLevel:${existing.max}."))
} else if (update.maxVersionLevel < existing.min) {
// Disallow downgrade of a finalized feature below the existing finalized
// minVersionLevel.
Right(new ApiError(Errors.INVALID_REQUEST,
s"Can not downgrade finalized feature to maxVersionLevel:${update.maxVersionLevel}" +
s" because it's lower than the existing minVersionLevel:${existing.min}."))
} else {
newVersionRangeOrError(update)
}
@ -2057,7 +2056,7 @@ class KafkaController(val config: KafkaConfig,
private def processFeatureUpdatesWithActiveController(request: UpdateFeaturesRequest,
callback: UpdateFeaturesCallback): Unit = {
val updates = request.data.featureUpdates
val updates = request.featureUpdates
val existingFeatures = featureCache.get
.map(featuresAndEpoch => featuresAndEpoch.features.features().asScala)
.getOrElse(Map[String, FinalizedVersionRange]())


@ -19,8 +19,8 @@ package kafka.server
import kafka.utils.Logging
import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
import org.apache.kafka.common.feature.Features._
import java.util
import scala.jdk.CollectionConverters._
/**
@ -32,7 +32,9 @@ import scala.jdk.CollectionConverters._
class BrokerFeatures private (@volatile var supportedFeatures: Features[SupportedVersionRange]) {
// For testing only.
def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
supportedFeatures = newFeatures
val combined = new util.HashMap[String, SupportedVersionRange](supportedFeatures.features())
combined.putAll(newFeatures.features())
supportedFeatures = Features.supportedFeatures(combined)
}
/**
@ -43,7 +45,7 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte
Features.finalizedFeatures(
supportedFeatures.features.asScala.map {
case(name, versionRange) => (
name, new FinalizedVersionRange(versionRange.min, versionRange.max))
name, new FinalizedVersionRange(versionRange.max, versionRange.max))
}.asJava)
}
@ -70,9 +72,7 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte
object BrokerFeatures extends Logging {
def createDefault(): BrokerFeatures = {
// The arguments are currently empty, but, in the future as we define features we should
// populate the required values here.
new BrokerFeatures(emptySupportedFeatures)
new BrokerFeatures(Features.emptySupportedFeatures())
}
/**


@ -33,6 +33,7 @@ import kafka.security.CredentialProvider
import kafka.server.KafkaRaftServer.ControllerRole
import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache, SnapshotWriterBuilder}
import kafka.utils.{CoreUtils, KafkaScheduler}
import org.apache.kafka.common.feature.SupportedVersionRange
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
import org.apache.kafka.common.metrics.Metrics
@ -80,8 +81,7 @@ class BrokerServer(
val metrics: Metrics,
val threadNamePrefix: Option[String],
val initialOfflineDirs: Seq[String],
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
val supportedFeatures: util.Map[String, VersionRange]
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
) extends KafkaBroker {
override def brokerState: BrokerState = lifecycleManager.state
@ -141,10 +141,6 @@ class BrokerServer(
@volatile var brokerTopicStats: BrokerTopicStats = null
val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
val featureCache: FinalizedFeatureCache = new FinalizedFeatureCache(brokerFeatures)
val clusterId: String = metaProps.clusterId
var metadataSnapshotter: Option[BrokerMetadataSnapshotter] = None
@ -153,6 +149,8 @@ class BrokerServer(
var metadataPublisher: BrokerMetadataPublisher = null
val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
@ -223,6 +221,8 @@ class BrokerServer(
clientToControllerChannelManager.start()
forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager)
val featureCache: FinalizedFeatureCache = new FinalizedFeatureCache(brokerFeatures)
val apiVersionManager = ApiVersionManager(
ListenerType.BROKER,
config,
@ -332,10 +332,16 @@ class BrokerServer(
setPort(if (ep.port == 0) socketServer.boundPort(ep.listenerName) else ep.port).
setSecurityProtocol(ep.securityProtocol.id))
}
val featuresRemapped = brokerFeatures.supportedFeatures.features().asScala.map {
case (k: String, v: SupportedVersionRange) =>
k -> VersionRange.of(v.min, v.max)
}.asJava
lifecycleManager.start(() => metadataListener.highestMetadataOffset,
BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config,
"heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong),
metaProps.clusterId, networkListeners, supportedFeatures)
metaProps.clusterId, networkListeners, featuresRemapped)
// Register a listener with the Raft layer to receive metadata event notifications
raftManager.register(metadataListener)


@ -22,7 +22,6 @@ import java.util.Collections
import java.util.Map.Entry
import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS}
import java.util.concurrent.{CompletableFuture, ExecutionException}
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
@ -48,7 +47,7 @@ import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, Uuid}
import org.apache.kafka.controller.Controller
import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply, VersionRange}
import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
@ -62,7 +61,6 @@ class ControllerApis(val requestChannel: RequestChannel,
val authorizer: Option[Authorizer],
val quotas: QuotaManagers,
val time: Time,
val supportedFeatures: Map[String, VersionRange],
val controller: Controller,
val raftManager: RaftManager[ApiMessageAndVersion],
val config: KafkaConfig,
@ -108,6 +106,7 @@ class ControllerApis(val requestChannel: RequestChannel,
case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request)
case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request)
case ApiKeys.ELECT_LEADERS => handleElectLeaders(request)
case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request)
case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
}
} catch {
@ -784,4 +783,18 @@ class ControllerApis(val requestChannel: RequestChannel,
}
})
}
def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
authHelper.authorizeClusterOperation(request, ALTER)
controller.updateFeatures(updateFeaturesRequest.data)
.whenComplete((response, exception) => {
if (exception != null) {
requestHelper.handleError(request, exception)
} else {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new UpdateFeaturesResponse(response.setThrottleTimeMs(requestThrottleMs)))
}
})
}
}


@ -37,8 +37,8 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumControllerMetrics}
import org.apache.kafka.metadata.{KafkaConfigSchema, VersionRange}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumControllerMetrics, QuorumFeatures}
import org.apache.kafka.metadata.KafkaConfigSchema
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.RaftConfig.AddressSpec
import org.apache.kafka.server.authorizer.Authorizer
@ -79,7 +79,6 @@ class ControllerServer(
var createTopicPolicy: Option[CreateTopicPolicy] = None
var alterConfigPolicy: Option[AlterConfigPolicy] = None
var controller: Controller = null
val supportedFeatures: Map[String, VersionRange] = Map()
var quotaManagers: QuotaManagers = null
var controllerApis: ControllerApis = null
var controllerApisHandlerPool: KafkaRequestHandlerPool = null
@ -161,6 +160,8 @@ class ControllerServer(
alterConfigPolicy = Option(config.
getConfiguredInstance(AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))
val quorumFeatures = QuorumFeatures.create(config.nodeId, QuorumFeatures.defaultFeatureMap())
val controllerBuilder = {
val leaderImbalanceCheckIntervalNs = if (config.autoLeaderRebalanceEnable) {
OptionalLong.of(TimeUnit.NANOSECONDS.convert(config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS))
@ -173,6 +174,7 @@ class ControllerServer(
setThreadNamePrefix(threadNamePrefixAsString).
setConfigSchema(configSchema).
setRaftClient(raftManager.client).
setQuorumFeatures(quorumFeatures).
setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
setDefaultNumPartitions(config.numPartitions.intValue()).
setIsLeaderRecoverySupported(config.interBrokerProtocolVersion >= KAFKA_3_2_IV0).
@ -198,7 +200,6 @@ class ControllerServer(
authorizer,
quotaManagers,
time,
supportedFeatures,
controller,
raftManager,
config,


@ -138,10 +138,10 @@ class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends
val newFeatures = new util.HashMap[String, FinalizedVersionRange]()
newFeatures.putAll(features.features.features())
featuresDelta.changes().entrySet().forEach { e =>
e.getValue().asScala match {
e.getValue.asScala match {
case None => newFeatures.remove(e.getKey)
case Some(feature) => newFeatures.put(e.getKey,
new FinalizedVersionRange(feature.min(), feature.max()))
case Some(version) => newFeatures.put(e.getKey,
new FinalizedVersionRange(version, version))
}
}
featuresAndEpoch = Some(FinalizedFeaturesAndEpoch(Features.finalizedFeatures(


@ -86,8 +86,7 @@ class KafkaRaftServer(
metrics,
threadNamePrefix,
offlineDirs,
controllerQuorumVotersFuture,
Server.SUPPORTED_FEATURES
controllerQuorumVotersFuture
))
} else {
None


@ -16,15 +16,12 @@
*/
package kafka.server
import java.util.Collections
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, MetricConfig, Metrics, MetricsReporter, Sensor}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.VersionRange
import scala.jdk.CollectionConverters._
trait Server {
def startup(): Unit
@ -99,7 +96,4 @@ object Server {
case object STARTING extends ProcessStatus
case object STARTED extends ProcessStatus
case object SHUTTING_DOWN extends ProcessStatus
val SUPPORTED_FEATURES = Collections.
unmodifiableMap[String, VersionRange](Map[String, VersionRange]().asJava)
}


@ -41,6 +41,8 @@ import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
@ -49,7 +51,7 @@ import org.apache.kafka.controller.Controller;
import org.apache.kafka.controller.ResultOrError;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
@ -246,7 +248,7 @@ public class MockController implements Controller {
}
@Override
public CompletableFuture<FeatureMapAndEpoch> finalizedFeatures() {
public CompletableFuture<FinalizedControllerFeatures> finalizedFeatures() {
throw new UnsupportedOperationException();
}
@ -349,6 +351,11 @@ public class MockController implements Controller {
throw new UnsupportedOperationException();
}
@Override
public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(UpdateFeaturesRequestData request) {
throw new UnsupportedOperationException();
}
@Override
synchronized public CompletableFuture<List<CreatePartitionsTopicResult>>
createPartitions(long deadlineNs, List<CreatePartitionsTopic> topicList) {


@ -24,7 +24,6 @@ import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaRaftServer;
import kafka.server.MetaProperties;
import kafka.server.Server;
import kafka.tools.StorageTool;
import kafka.utils.Logging;
import org.apache.kafka.clients.CommonClientConfigs;
@ -238,8 +237,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
new Metrics(),
Option.apply(threadNamePrefix),
JavaConverters.asScalaBuffer(Collections.<String>emptyList()).toSeq(),
connectFutureManager.future,
Server.SUPPORTED_FEATURES()
connectFutureManager.future
);
brokers.put(node.id(), broker);
raftManagers.put(node.id(), raftManager);


@ -85,8 +85,7 @@ class KRaftQuorumImplementation(val raftManager: KafkaRaftManager[ApiMessageAndV
metrics = new Metrics(),
threadNamePrefix = Some("Broker%02d_".format(config.nodeId)),
initialOfflineDirs = Seq(),
controllerQuorumVotersFuture = controllerQuorumVotersFuture,
supportedFeatures = Collections.emptyMap())
controllerQuorumVotersFuture = controllerQuorumVotersFuture)
if (startup) broker.startup()
broker
}


@ -26,7 +26,7 @@ import org.apache.kafka.common.utils.Utils
import java.util.Properties
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, assertThrows}
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
class FeatureCommandTest extends BaseRequestTest {
@ -75,167 +75,37 @@ class FeatureCommandTest extends BaseRequestTest {
@Test
def testDescribeFeaturesSuccess(): Unit = {
updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
val featureApis = new FeatureApis(new FeatureCommandOptions(Array("--bootstrap-server", bootstrapServers(), "--describe")))
featureApis.setSupportedFeatures(defaultSupportedFeatures)
try {
val initialDescribeOutput = TestUtils.grabConsoleOutput(featureApis.describeFeatures())
val expectedInitialDescribeOutput =
"Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedMinVersionLevel: -\tFinalizedMaxVersionLevel: -\tEpoch: 0\n" +
"Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedMinVersionLevel: -\tFinalizedMaxVersionLevel: -\tEpoch: 0\n"
assertEquals(expectedInitialDescribeOutput, initialDescribeOutput)
featureApis.upgradeAllFeatures()
val finalDescribeOutput = TestUtils.grabConsoleOutput(featureApis.describeFeatures())
val expectedFinalDescribeOutput =
"Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedMinVersionLevel: 1\tFinalizedMaxVersionLevel: 3\tEpoch: 1\n" +
"Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedMinVersionLevel: 1\tFinalizedMaxVersionLevel: 5\tEpoch: 1\n"
assertEquals(expectedFinalDescribeOutput, finalDescribeOutput)
} finally {
featureApis.close()
val initialDescribeOutput = TestUtils.grabConsoleOutput(FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "describe")))
val expectedInitialDescribeOutputs = Seq(
"Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedVersionLevel: -",
"Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedVersionLevel: -"
)
expectedInitialDescribeOutputs.foreach { expectedOutput =>
assertTrue(initialDescribeOutput.contains(expectedOutput))
}
}
/**
* Tests if the FeatureApis#upgradeAllFeatures API works as expected during a success case.
*/
@Test
def testUpgradeAllFeaturesSuccess(): Unit = {
val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", bootstrapServers(), "--upgrade-all"))
val featureApis = new FeatureApis(upgradeOpts)
try {
// Step (1):
// - Update the supported features across all brokers.
// - Upgrade non-existing feature_1 to maxVersionLevel: 2.
// - Verify results.
val initialSupportedFeatures = Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 2))))
updateSupportedFeaturesInAllBrokers(initialSupportedFeatures)
featureApis.setSupportedFeatures(initialSupportedFeatures)
var output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
var expected =
" [Add]\tFeature: feature_1\tExistingFinalizedMaxVersion: -\tNewFinalizedMaxVersion: 2\tResult: OK\n"
assertEquals(expected, output)
// Step (2):
// - Update the supported features across all brokers.
// - Upgrade existing feature_1 to maxVersionLevel: 3.
// - Upgrade non-existing feature_2 to maxVersionLevel: 5.
// - Verify results.
updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
featureApis.setSupportedFeatures(defaultSupportedFeatures)
output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
expected =
" [Upgrade]\tFeature: feature_1\tExistingFinalizedMaxVersion: 2\tNewFinalizedMaxVersion: 3\tResult: OK\n" +
" [Add]\tFeature: feature_2\tExistingFinalizedMaxVersion: -\tNewFinalizedMaxVersion: 5\tResult: OK\n"
assertEquals(expected, output)
// Step (3):
// - Perform an upgrade of all features again.
// - Since supported features have not changed, expect that the above action does not yield
// any results.
output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
assertTrue(output.isEmpty)
featureApis.setOptions(upgradeOpts)
output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
assertTrue(output.isEmpty)
} finally {
featureApis.close()
FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "upgrade",
"--feature", "feature_1", "--version", "3", "--feature", "feature_2", "--version", "5"))
val upgradeDescribeOutput = TestUtils.grabConsoleOutput(FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "describe")))
val expectedUpgradeDescribeOutput = Seq(
"Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedVersionLevel: 3",
"Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedVersionLevel: 5"
)
expectedUpgradeDescribeOutput.foreach { expectedOutput =>
assertTrue(upgradeDescribeOutput.contains(expectedOutput))
}
}
/**
* Tests if the FeatureApis#downgradeAllFeatures API works as expected during a success case.
*/
@Test
def testDowngradeFeaturesSuccess(): Unit = {
val downgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", bootstrapServers(), "--downgrade-all"))
val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", bootstrapServers(), "--upgrade-all"))
val featureApis = new FeatureApis(upgradeOpts)
try {
// Step (1):
// - Update the supported features across all brokers.
// - Upgrade non-existing feature_1 to maxVersionLevel: 3.
// - Upgrade non-existing feature_2 to maxVersionLevel: 5.
updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
featureApis.setSupportedFeatures(defaultSupportedFeatures)
featureApis.upgradeAllFeatures()
// Step (2):
// - Downgrade existing feature_1 to maxVersionLevel: 2.
// - Delete feature_2 since it is no longer supported by the FeatureApis object.
// - Verify results.
val downgradedFeatures = Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 2))))
featureApis.setSupportedFeatures(downgradedFeatures)
featureApis.setOptions(downgradeOpts)
var output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures())
var expected =
"[Downgrade]\tFeature: feature_1\tExistingFinalizedMaxVersion: 3\tNewFinalizedMaxVersion: 2\tResult: OK\n" +
" [Delete]\tFeature: feature_2\tExistingFinalizedMaxVersion: 5\tNewFinalizedMaxVersion: -\tResult: OK\n"
assertEquals(expected, output)
// Step (3):
// - Perform a downgrade of all features again.
// - Since supported features have not changed, expect that the above action does not yield
// any results.
updateSupportedFeaturesInAllBrokers(downgradedFeatures)
output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures())
assertTrue(output.isEmpty)
// Step (4):
// - Delete feature_1 since it is no longer supported by the FeatureApis object.
// - Verify results.
featureApis.setSupportedFeatures(Features.emptySupportedFeatures())
output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures())
expected =
" [Delete]\tFeature: feature_1\tExistingFinalizedMaxVersion: 2\tNewFinalizedMaxVersion: -\tResult: OK\n"
assertEquals(expected, output)
} finally {
featureApis.close()
}
}
/**
* Tests if the FeatureApis#upgradeAllFeatures API works as expected during a partial failure case.
*/
@Test
def testUpgradeFeaturesFailure(): Unit = {
val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", bootstrapServers(), "--upgrade-all"))
val featureApis = new FeatureApis(upgradeOpts)
try {
// Step (1): Update the supported features across all brokers.
updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
// Step (2):
// - Intentionally setup the FeatureApis object such that it contains incompatible target
// features (viz. feature_2 and feature_3).
// - Upgrade non-existing feature_1 to maxVersionLevel: 4. Expect the operation to fail with
// an incompatibility failure.
// - Upgrade non-existing feature_2 to maxVersionLevel: 5. Expect the operation to succeed.
// - Upgrade non-existing feature_3 to maxVersionLevel: 3. Expect the operation to fail
// since the feature is not supported.
val targetFeaturesWithIncompatibilities =
Features.supportedFeatures(
Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 4)),
Utils.mkEntry("feature_2", new SupportedVersionRange(1, 5)),
Utils.mkEntry("feature_3", new SupportedVersionRange(1, 3))))
featureApis.setSupportedFeatures(targetFeaturesWithIncompatibilities)
val output = TestUtils.grabConsoleOutput({
val exception = assertThrows(classOf[UpdateFeaturesException], () => featureApis.upgradeAllFeatures())
assertEquals("2 feature updates failed!", exception.getMessage)
})
val expected =
" [Add]\tFeature: feature_1\tExistingFinalizedMaxVersion: -" +
"\tNewFinalizedMaxVersion: 4\tResult: FAILED due to" +
" org.apache.kafka.common.errors.InvalidRequestException: Could not apply finalized" +
" feature update because brokers were found to have incompatible versions for the" +
" feature.\n" +
" [Add]\tFeature: feature_2\tExistingFinalizedMaxVersion: -" +
"\tNewFinalizedMaxVersion: 5\tResult: OK\n" +
" [Add]\tFeature: feature_3\tExistingFinalizedMaxVersion: -" +
"\tNewFinalizedMaxVersion: 3\tResult: FAILED due to" +
" org.apache.kafka.common.errors.InvalidRequestException: Could not apply finalized" +
" feature update because the provided feature is not supported.\n"
assertEquals(expected, output)
} finally {
featureApis.close()
FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "downgrade",
"--feature", "feature_1", "--version", "2", "--feature", "feature_2", "--version", "2"))
val downgradeDescribeOutput = TestUtils.grabConsoleOutput(FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "describe")))
val expectedFinalDescribeOutput = Seq(
"Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedVersionLevel: 2",
"Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedVersionLevel: 2"
)
expectedFinalDescribeOutput.foreach { expectedOutput =>
assertTrue(downgradeDescribeOutput.contains(expectedOutput))
}
}
}
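
For reference, the rewritten test drives the tool through the new sub-command structure; the equivalent command lines look roughly like the following (host and port illustrative; the disable form follows KIP-778's description and is not exercised by this test):

    bin/kafka-features.sh --bootstrap-server localhost:9092 describe
    bin/kafka-features.sh --bootstrap-server localhost:9092 upgrade --feature feature_1 --version 3
    bin/kafka-features.sh --bootstrap-server localhost:9092 downgrade --feature feature_1 --version 2
    bin/kafka-features.sh --bootstrap-server localhost:9092 disable --feature feature_1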

View File

@ -19,7 +19,7 @@ package kafka.server
import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.{Disabled, Test}
import scala.jdk.CollectionConverters._
@ -89,6 +89,7 @@ class BrokerFeaturesTest {
}
@Test
@Disabled("Need to remove or rewrite this test after we fully remove FinalizedVersionRange")
def testDefaultFinalizedFeatures(): Unit = {
val brokerFeatures = BrokerFeatures.createDefault()
val supportedFeatures = Features.supportedFeatures(Map[String, SupportedVersionRange](


@ -101,7 +101,6 @@ class ControllerApisTest {
authorizer,
quotas,
time,
Map.empty,
controller,
raftManager,
new KafkaConfig(props),


@ -763,7 +763,7 @@ class RequestQuotaTest extends BaseRequestTest {
object RequestQuotaTest {
val ClusterActions = ApiKeys.zkBrokerApis.asScala.filter(_.clusterAction).toSet
val ClusterActionsWithThrottle = Set(ApiKeys.ALLOCATE_PRODUCER_IDS)
val ClusterActionsWithThrottle = Set(ApiKeys.ALLOCATE_PRODUCER_IDS, ApiKeys.UPDATE_FEATURES)
val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE)
val ClientActions = ApiKeys.zkBrokerApis.asScala.toSet -- ClusterActions -- SaslActions


@ -213,8 +213,8 @@ class UpdateFeaturesTest extends BaseRequestTest {
val targetMaxVersionLevel = (defaultFinalizedFeatures().get("feature_1").max() - 1).asInstanceOf[Short]
testWithInvalidFeatureUpdate[InvalidRequestException](
"feature_1",
new FeatureUpdate(targetMaxVersionLevel,false),
".*Can not downgrade finalized feature.*allowDowngrade.*".r)
new FeatureUpdate(targetMaxVersionLevel, FeatureUpdate.UpgradeType.UPGRADE),
".*Can not downgrade finalized feature.*".r)
}
/**
@ -226,8 +226,8 @@ class UpdateFeaturesTest extends BaseRequestTest {
val targetMaxVersionLevel = (defaultFinalizedFeatures().get("feature_1").max() + 1).asInstanceOf[Short]
testWithInvalidFeatureUpdate[InvalidRequestException](
"feature_1",
new FeatureUpdate(targetMaxVersionLevel, true),
".*When the allowDowngrade flag set in the request, the provided maxVersionLevel:3.*existing maxVersionLevel:2.*".r)
new FeatureUpdate(targetMaxVersionLevel, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
".*When the downgradeType is set to SAFE set in the request, the provided maxVersionLevel:3.*existing maxVersionLevel:2.*".r)
}
/**
@ -264,7 +264,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
assertEquals(Errors.INVALID_REQUEST, Errors.forCode(result.errorCode))
assertNotNull(result.errorMessage)
assertFalse(result.errorMessage.isEmpty)
val exceptionMsgPattern = ".*Can not provide maxVersionLevel: 0 less than 1.*allowDowngrade.*".r
val exceptionMsgPattern = ".*Can not provide maxVersionLevel: 0 less than 1.*".r
assertTrue(exceptionMsgPattern.findFirstIn(result.errorMessage).isDefined, result.errorMessage)
checkFeatures(
adminClient,
@ -282,7 +282,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
def testShouldFailRequestDuringDeletionOfNonExistingFeature(): Unit = {
testWithInvalidFeatureUpdate[InvalidRequestException](
"feature_non_existing",
new FeatureUpdate(3, true),
new FeatureUpdate(3.toShort, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
".*Could not apply finalized feature update because the provided feature is not supported.*".r)
}
@ -295,7 +295,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
val targetMaxVersionLevel = defaultFinalizedFeatures().get("feature_1").max()
testWithInvalidFeatureUpdate[InvalidRequestException](
"feature_1",
new FeatureUpdate(targetMaxVersionLevel, false),
new FeatureUpdate(targetMaxVersionLevel, FeatureUpdate.UpgradeType.UPGRADE),
".*Can not upgrade a finalized feature.*to the same value.*".r)
}
@ -331,7 +331,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
).getOrElse(Features.emptyFinalizedFeatures())
val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
val invalidUpdate = new FeatureUpdate(supportedVersionRange.max(), false)
val invalidUpdate = new FeatureUpdate(supportedVersionRange.max(), FeatureUpdate.UpgradeType.UPGRADE)
val nodeBefore = getFeatureZNode()
val adminClient = createAdminClient()
val result = adminClient.updateFeatures(
@ -393,10 +393,10 @@ class UpdateFeaturesTest extends BaseRequestTest {
val targetFinalizedFeatures = Features.finalizedFeatures(
Utils.mkMap(
Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 3)),
Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 3))))
val update1 = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), false)
val update2 = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), false)
Utils.mkEntry("feature_1", new FinalizedVersionRange(3, 3)),
Utils.mkEntry("feature_2", new FinalizedVersionRange(3, 3))))
val update1 = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), FeatureUpdate.UpgradeType.UPGRADE)
val update2 = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), FeatureUpdate.UpgradeType.UPGRADE)
val adminClient = createAdminClient()
adminClient.updateFeatures(
@ -427,8 +427,8 @@ class UpdateFeaturesTest extends BaseRequestTest {
updateSupportedFeaturesInAllBrokers(supportedFeatures)
val initialFinalizedFeatures = Features.finalizedFeatures(
Utils.mkMap(
Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2)),
Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 4))))
Utils.mkEntry("feature_1", new FinalizedVersionRange(2, 2)),
Utils.mkEntry("feature_2", new FinalizedVersionRange(4, 4))))
val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
// Below we aim to do the following:
@ -436,10 +436,10 @@ class UpdateFeaturesTest extends BaseRequestTest {
// - Valid downgrade of feature_2 maxVersionLevel from 4 to 3
val targetFinalizedFeatures = Features.finalizedFeatures(
Utils.mkMap(
Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 3)),
Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 3))))
val update1 = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), false)
val update2 = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), true)
Utils.mkEntry("feature_1", new FinalizedVersionRange(3, 3)),
Utils.mkEntry("feature_2", new FinalizedVersionRange(3, 3))))
val update1 = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), FeatureUpdate.UpgradeType.UPGRADE)
val update2 = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)
val adminClient = createAdminClient()
adminClient.updateFeatures(
@ -471,8 +471,8 @@ class UpdateFeaturesTest extends BaseRequestTest {
updateSupportedFeaturesInAllBrokers(supportedFeatures)
val initialFinalizedFeatures = Features.finalizedFeatures(
Utils.mkMap(
Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2)),
Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 4))))
Utils.mkEntry("feature_1", new FinalizedVersionRange(2, 2)),
Utils.mkEntry("feature_2", new FinalizedVersionRange(4, 4))))
val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
// Below we aim to do the following:
@ -481,10 +481,10 @@ class UpdateFeaturesTest extends BaseRequestTest {
// (because we intentionally do not set the allowDowngrade flag)
val targetFinalizedFeatures = Features.finalizedFeatures(
Utils.mkMap(
Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 3)),
Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 3))))
val validUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), false)
val invalidUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), false)
Utils.mkEntry("feature_1", new FinalizedVersionRange(3, 3)),
Utils.mkEntry("feature_2", new FinalizedVersionRange(3, 3))))
val validUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), FeatureUpdate.UpgradeType.UPGRADE)
val invalidUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), FeatureUpdate.UpgradeType.UPGRADE)
val adminClient = createAdminClient()
val result = adminClient.updateFeatures(
@ -495,7 +495,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
result.values().get("feature_1").get()
// Expect update for "feature_2" to have failed.
checkException[InvalidRequestException](
result, Map("feature_2" -> ".*Can not downgrade finalized feature.*allowDowngrade.*".r))
result, Map("feature_2" -> ".*Can not downgrade finalized feature.*".r))
val expectedFeatures = Features.finalizedFeatures(
Utils.mkMap(
Utils.mkEntry("feature_1", targetFinalizedFeatures.get("feature_1")),
@ -539,8 +539,8 @@ class UpdateFeaturesTest extends BaseRequestTest {
val initialFinalizedFeatures = Features.finalizedFeatures(
Utils.mkMap(
Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2)),
Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 4))))
Utils.mkEntry("feature_1", new FinalizedVersionRange(2, 2)),
Utils.mkEntry("feature_2", new FinalizedVersionRange(4, 4))))
val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
// Below we aim to do the following:
@ -549,10 +549,10 @@ class UpdateFeaturesTest extends BaseRequestTest {
// - Valid downgrade of feature_2 maxVersionLevel from 4 to 3
val targetFinalizedFeatures = Features.finalizedFeatures(
Utils.mkMap(
Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 3)),
Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 3))))
val invalidUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), false)
val validUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), true)
Utils.mkEntry("feature_1", new FinalizedVersionRange(3, 3)),
Utils.mkEntry("feature_2", new FinalizedVersionRange(3, 3))))
val invalidUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), FeatureUpdate.UpgradeType.UPGRADE)
val validUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)
val adminClient = createAdminClient()
val result = adminClient.updateFeatures(


@ -36,7 +36,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
@ -178,6 +178,13 @@ public class ClusterControlManager {
return brokerRegistrations;
}
Map<Integer, Map<String, VersionRange>> brokerSupportedVersions() {
return brokerRegistrations()
.entrySet()
.stream()
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().supportedFeatures()));
}
Set<Integer> fencedBrokerIds() {
return brokerRegistrations.values()
.stream()
@ -192,7 +199,7 @@ public class ClusterControlManager {
public ControllerResult<BrokerRegistrationReply> registerBroker(
BrokerRegistrationRequestData request,
long brokerEpoch,
FeatureMapAndEpoch finalizedFeatures) {
FinalizedControllerFeatures finalizedFeatures) {
if (heartbeatManager == null) {
throw new RuntimeException("ClusterControlManager is not active.");
}
@ -229,13 +236,14 @@ public class ClusterControlManager {
setSecurityProtocol(listener.securityProtocol()));
}
for (BrokerRegistrationRequestData.Feature feature : request.features()) {
Optional<VersionRange> finalized = finalizedFeatures.map().get(feature.name());
Optional<Short> finalized = finalizedFeatures.get(feature.name());
if (finalized.isPresent()) {
if (!finalized.get().contains(new VersionRange(feature.minSupportedVersion(),
feature.maxSupportedVersion()))) {
if (!VersionRange.of(feature.minSupportedVersion(), feature.maxSupportedVersion()).contains(finalized.get())) {
throw new UnsupportedVersionException("Unable to register because " +
"the broker has an unsupported version of " + feature.name());
"the broker has an unsupported version of " + feature.name());
}
} else {
log.warn("Broker registered with feature {} that is unknown to the controller", feature.name());
}
record.features().add(new BrokerFeature().
setName(feature.name()).
@ -265,7 +273,7 @@ public class ClusterControlManager {
}
Map<String, VersionRange> features = new HashMap<>();
for (BrokerFeature feature : record.features()) {
features.put(feature.name(), new VersionRange(
features.put(feature.name(), VersionRange.of(
feature.minSupportedVersion(), feature.maxSupportedVersion()));
}
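
Note the direction of the registration check earlier in this file: with a single finalized level, the broker's advertised range must now contain the finalized level, rather than a finalized range being contained in the broker's range. A minimal sketch of the semantics (values illustrative):

    import org.apache.kafka.metadata.VersionRange;

    public class RegistrationCheckSketch {
        public static void main(String[] args) {
            // The broker advertises support for levels 1..4 of some feature.
            VersionRange supported = VersionRange.of((short) 1, (short) 4);

            // Registration succeeds only when the cluster's finalized level is in range.
            System.out.println(supported.contains((short) 3)); // true  -> registers
            System.out.println(supported.contains((short) 5)); // false -> UnsupportedVersionException
        }
    }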


@ -36,12 +36,14 @@ import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.authorizer.AclMutator;
import java.util.Collection;
@ -152,7 +154,7 @@ public interface Controller extends AclMutator, AutoCloseable {
*
* @return A future yielding the feature ranges.
*/
CompletableFuture<FeatureMapAndEpoch> finalizedFeatures();
CompletableFuture<FinalizedControllerFeatures> finalizedFeatures();
/**
* Perform some incremental configuration changes.
@ -247,6 +249,15 @@ public interface Controller extends AclMutator, AutoCloseable {
AllocateProducerIdsRequestData request
);
/**
* Update a set of feature flags.
* @param request The update features request.
* @return A future which yields the result of the action.
*/
CompletableFuture<UpdateFeaturesResponseData> updateFeatures(
UpdateFeaturesRequestData request
);
/**
* Begin writing a controller snapshot. If there was already an ongoing snapshot, it
* simply returns information about that snapshot rather than starting a new one.


@ -25,109 +25,140 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metadata.FeatureMap;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;
import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_RECORD;
public class FeatureControlManager {
private final Logger log;
/**
* An immutable map containing the features supported by this controller's software.
*/
private final Map<String, VersionRange> supportedFeatures;
private final QuorumFeatures quorumFeatures;
/**
* Maps feature names to finalized version ranges.
*/
private final TimelineHashMap<String, VersionRange> finalizedVersions;
private final TimelineHashMap<String, Short> finalizedVersions;
FeatureControlManager(Map<String, VersionRange> supportedFeatures,
FeatureControlManager(LogContext logContext,
QuorumFeatures quorumFeatures,
SnapshotRegistry snapshotRegistry) {
this.supportedFeatures = supportedFeatures;
this.log = logContext.logger(FeatureControlManager.class);
this.quorumFeatures = quorumFeatures;
this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
}
ControllerResult<Map<String, ApiError>> updateFeatures(
Map<String, VersionRange> updates, Set<String> downgradeables,
Map<Integer, Map<String, VersionRange>> brokerFeatures) {
Map<String, Short> updates,
Map<String, FeatureUpdate.UpgradeType> upgradeTypes,
Map<Integer, Map<String, VersionRange>> brokerFeatures,
boolean validateOnly) {
TreeMap<String, ApiError> results = new TreeMap<>();
List<ApiMessageAndVersion> records = new ArrayList<>();
for (Entry<String, VersionRange> entry : updates.entrySet()) {
for (Entry<String, Short> entry : updates.entrySet()) {
results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(),
downgradeables.contains(entry.getKey()), brokerFeatures, records));
upgradeTypes.getOrDefault(entry.getKey(), FeatureUpdate.UpgradeType.UPGRADE), brokerFeatures, records));
}
return ControllerResult.atomicOf(records, results);
if (validateOnly) {
return ControllerResult.of(Collections.emptyList(), results);
} else {
return ControllerResult.atomicOf(records, results);
}
}
boolean canSupportVersion(String featureName, short versionRange) {
return quorumFeatures.localSupportedFeature(featureName)
.filter(localRange -> localRange.contains(versionRange))
.isPresent();
}
boolean featureExists(String featureName) {
return quorumFeatures.localSupportedFeature(featureName).isPresent();
}
private ApiError updateFeature(String featureName,
VersionRange newRange,
boolean downgradeable,
Map<Integer, Map<String, VersionRange>> brokerFeatures,
short newVersion,
FeatureUpdate.UpgradeType upgradeType,
Map<Integer, Map<String, VersionRange>> brokersAndFeatures,
List<ApiMessageAndVersion> records) {
if (newRange.min() <= 0) {
if (!featureExists(featureName)) {
return new ApiError(Errors.INVALID_UPDATE_VERSION,
"The lower value for the new range cannot be less than 1.");
"The controller does not support the given feature.");
}
if (newRange.max() <= 0) {
if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) {
return new ApiError(Errors.INVALID_UPDATE_VERSION,
"The controller does not support the given upgrade type.");
}
final Short currentVersion = finalizedVersions.get(featureName);
if (newVersion <= 0) {
return new ApiError(Errors.INVALID_UPDATE_VERSION,
"The upper value for the new range cannot be less than 1.");
}
VersionRange localRange = supportedFeatures.get(featureName);
if (localRange == null || !localRange.contains(newRange)) {
if (!canSupportVersion(featureName, newVersion)) {
return new ApiError(Errors.INVALID_UPDATE_VERSION,
"The controller does not support the given feature range.");
}
for (Entry<Integer, Map<String, VersionRange>> brokerEntry :
brokerFeatures.entrySet()) {
for (Entry<Integer, Map<String, VersionRange>> brokerEntry : brokersAndFeatures.entrySet()) {
VersionRange brokerRange = brokerEntry.getValue().get(featureName);
if (brokerRange == null || !brokerRange.contains(newRange)) {
if (brokerRange == null || !brokerRange.contains(newVersion)) {
return new ApiError(Errors.INVALID_UPDATE_VERSION,
"Broker " + brokerEntry.getKey() + " does not support the given " +
"feature range.");
}
}
VersionRange currentRange = finalizedVersions.get(featureName);
if (currentRange != null && currentRange.max() > newRange.max()) {
if (!downgradeable) {
if (currentVersion != null && newVersion < currentVersion) {
if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) {
return new ApiError(Errors.INVALID_UPDATE_VERSION,
"Can't downgrade the maximum version of this feature without " +
"setting downgradable to true.");
"Can't downgrade the maximum version of this feature without setting the upgrade type to safe or unsafe downgrade.");
}
}
records.add(new ApiMessageAndVersion(
new FeatureLevelRecord().setName(featureName).
setMinFeatureLevel(newRange.min()).setMaxFeatureLevel(newRange.max()),
new FeatureLevelRecord()
.setName(featureName)
.setFeatureLevel(newVersion),
FEATURE_LEVEL_RECORD.highestSupportedVersion()));
return ApiError.NONE;
}
FeatureMapAndEpoch finalizedFeatures(long lastCommittedOffset) {
Map<String, VersionRange> features = new HashMap<>();
for (Entry<String, VersionRange> entry : finalizedVersions.entrySet(lastCommittedOffset)) {
FinalizedControllerFeatures finalizedFeatures(long lastCommittedOffset) {
Map<String, Short> features = new HashMap<>();
for (Entry<String, Short> entry : finalizedVersions.entrySet(lastCommittedOffset)) {
features.put(entry.getKey(), entry.getValue());
}
return new FeatureMapAndEpoch(new FeatureMap(features), lastCommittedOffset);
return new FinalizedControllerFeatures(features, lastCommittedOffset);
}
public void replay(FeatureLevelRecord record) {
finalizedVersions.put(record.name(),
new VersionRange(record.minFeatureLevel(), record.maxFeatureLevel()));
log.info("Setting feature {} to {}", record.name(), record.featureLevel());
finalizedVersions.put(record.name(), record.featureLevel());
}
class FeatureControlIterator implements Iterator<List<ApiMessageAndVersion>> {
private final Iterator<Entry<String, VersionRange>> iterator;
private final Iterator<Entry<String, Short>> iterator;
FeatureControlIterator(long epoch) {
this.iterator = finalizedVersions.entrySet(epoch).iterator();
@ -141,12 +172,10 @@ public class FeatureControlManager {
@Override
public List<ApiMessageAndVersion> next() {
if (!hasNext()) throw new NoSuchElementException();
Entry<String, VersionRange> entry = iterator.next();
VersionRange versions = entry.getValue();
return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(entry.getKey()).
setMinFeatureLevel(versions.min()).
setMaxFeatureLevel(versions.max()), FEATURE_LEVEL_RECORD.highestSupportedVersion()));
Entry<String, Short> entry = iterator.next();
return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
.setName(entry.getKey())
.setFeatureLevel(entry.getValue()), FEATURE_LEVEL_RECORD.highestSupportedVersion()));
}
}
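
A sketch of how a caller inside the controller package might exercise the new dry-run path (fragment only; the manager and updateFeatures are package-private, and the variable names are illustrative):

    Map<String, Short> updates = Collections.singletonMap("feature_1", (short) 2);
    Map<String, FeatureUpdate.UpgradeType> upgradeTypes =
        Collections.singletonMap("feature_1", FeatureUpdate.UpgradeType.SAFE_DOWNGRADE);

    // validateOnly = true: full validation and per-feature ApiErrors, but the
    // ControllerResult carries no records, so nothing reaches the metadata log.
    ControllerResult<Map<String, ApiError>> dryRun = featureControl.updateFeatures(
        updates, upgradeTypes, clusterControl.brokerSupportedVersions(), true);

    // validateOnly = false: same validation; successful updates emit
    // FeatureLevelRecords that are committed atomically.
    ControllerResult<Map<String, ApiError>> applied = featureControl.updateFeatures(
        updates, upgradeTypes, clusterControl.brokerSupportedVersions(), false);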


@ -23,6 +23,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.ProducerIdsBlock;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineLong;
import org.apache.kafka.timeline.TimelineObject;
import java.util.ArrayList;
import java.util.Collections;
@ -33,17 +34,19 @@ import java.util.List;
public class ProducerIdControlManager {
private final ClusterControlManager clusterControlManager;
private final TimelineLong nextProducerId; // Initializes to 0
private final TimelineObject<ProducerIdsBlock> nextProducerBlock;
private final TimelineLong brokerEpoch;
ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) {
this.clusterControlManager = clusterControlManager;
this.nextProducerId = new TimelineLong(snapshotRegistry);
this.nextProducerBlock = new TimelineObject<>(snapshotRegistry, ProducerIdsBlock.EMPTY);
this.brokerEpoch = new TimelineLong(snapshotRegistry);
}
ControllerResult<ProducerIdsBlock> generateNextProducerId(int brokerId, long brokerEpoch) {
clusterControlManager.checkBrokerEpoch(brokerId, brokerEpoch);
long firstProducerIdInBlock = nextProducerId.get();
long firstProducerIdInBlock = nextProducerBlock.get().firstProducerId();
if (firstProducerIdInBlock > Long.MAX_VALUE - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
throw new UnknownServerException("Exhausted all producerIds as the next block's end producerId " +
"has exceeded the int64 type limit");
@ -60,25 +63,26 @@ public class ProducerIdControlManager {
}
void replay(ProducerIdsRecord record) {
long currentNextProducerId = nextProducerId.get();
long currentNextProducerId = nextProducerBlock.get().firstProducerId();
if (record.nextProducerId() <= currentNextProducerId) {
throw new RuntimeException("Next Producer ID from replayed record (" + record.nextProducerId() + ")" +
" is not greater than current next Producer ID (" + currentNextProducerId + ")");
} else {
nextProducerId.set(record.nextProducerId());
nextProducerBlock.set(new ProducerIdsBlock(record.brokerId(), record.nextProducerId(), ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE));
brokerEpoch.set(record.brokerEpoch());
}
}
Iterator<List<ApiMessageAndVersion>> iterator(long epoch) {
List<ApiMessageAndVersion> records = new ArrayList<>(1);
long producerId = nextProducerId.get(epoch);
if (producerId > 0) {
ProducerIdsBlock producerIdBlock = nextProducerBlock.get(epoch);
if (producerIdBlock.firstProducerId() > 0) {
records.add(new ApiMessageAndVersion(
new ProducerIdsRecord()
.setNextProducerId(producerId)
.setBrokerId(0)
.setBrokerEpoch(0L),
.setNextProducerId(producerIdBlock.firstProducerId())
.setBrokerId(producerIdBlock.assignedBrokerId())
.setBrokerEpoch(brokerEpoch.get(epoch)),
(short) 0));
}
return Collections.singleton(records).iterator();
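
The move from a bare TimelineLong to TimelineObject&lt;ProducerIdsBlock&gt; lets snapshots carry the assigning broker and epoch along with the next id. A small sketch of the block arithmetic, using the accessors visible above:

    import org.apache.kafka.server.common.ProducerIdsBlock;

    public class ProducerIdsBlockSketch {
        public static void main(String[] args) {
            // A block of PRODUCER_ID_BLOCK_SIZE ids starting at 0, assigned to broker 1.
            ProducerIdsBlock block =
                new ProducerIdsBlock(1, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE);
            long nextBlockStart = block.firstProducerId() + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE;
            System.out.println(block.assignedBrokerId() + " owns ids below " + nextBlockStart);
        }
    }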


@ -18,6 +18,7 @@
package org.apache.kafka.controller;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
@ -44,6 +45,8 @@ import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
@ -74,8 +77,7 @@ import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
import org.apache.kafka.queue.KafkaEventQueue;
@ -94,6 +96,7 @@ import org.slf4j.Logger;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.Map;
@ -143,7 +146,7 @@ public final class QuorumController implements Controller {
private LogContext logContext = null;
private KafkaConfigSchema configSchema = KafkaConfigSchema.EMPTY;
private RaftClient<ApiMessageAndVersion> raftClient = null;
private Map<String, VersionRange> supportedFeatures = Collections.emptyMap();
private QuorumFeatures quorumFeatures = null;
private short defaultReplicationFactor = 3;
private int defaultNumPartitions = 1;
private boolean isLeaderRecoverySupported = false;
@ -188,8 +191,8 @@ public final class QuorumController implements Controller {
return this;
}
public Builder setSupportedFeatures(Map<String, VersionRange> supportedFeatures) {
this.supportedFeatures = supportedFeatures;
public Builder setQuorumFeatures(QuorumFeatures quorumFeatures) {
this.quorumFeatures = quorumFeatures;
return this;
}
@ -263,6 +266,9 @@ public final class QuorumController implements Controller {
if (raftClient == null) {
throw new RuntimeException("You must set a raft client.");
}
if (quorumFeatures == null) {
throw new RuntimeException("You must specify the quorum features");
}
if (threadNamePrefix == null) {
threadNamePrefix = String.format("Node%d_", nodeId);
}
@ -273,11 +279,12 @@ public final class QuorumController implements Controller {
controllerMetrics = (ControllerMetrics) Class.forName(
"org.apache.kafka.controller.MockControllerMetrics").getConstructor().newInstance();
}
KafkaEventQueue queue = null;
try {
queue = new KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController");
return new QuorumController(logContext, nodeId, clusterId, queue, time,
configSchema, raftClient, supportedFeatures, defaultReplicationFactor,
configSchema, raftClient, quorumFeatures, defaultReplicationFactor,
defaultNumPartitions, isLeaderRecoverySupported, replicaPlacer, snapshotMaxNewRecordBytes,
leaderImbalanceCheckIntervalNs, sessionTimeoutNs, controllerMetrics,
createTopicPolicy, alterConfigPolicy, configurationValidator, authorizer,
@ -1312,7 +1319,7 @@ public final class QuorumController implements Controller {
Time time,
KafkaConfigSchema configSchema,
RaftClient<ApiMessageAndVersion> raftClient,
Map<String, VersionRange> supportedFeatures,
QuorumFeatures quorumFeatures,
short defaultReplicationFactor,
int defaultNumPartitions,
boolean isLeaderRecoverySupported,
@ -1349,7 +1356,7 @@ public final class QuorumController implements Controller {
this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
this.clusterControl = new ClusterControlManager(logContext, clusterId, time,
snapshotRegistry, sessionTimeoutNs, replicaPlacer, controllerMetrics);
this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
this.featureControl = new FeatureControlManager(logContext, quorumFeatures, snapshotRegistry);
this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
@ -1446,7 +1453,7 @@ public final class QuorumController implements Controller {
}
@Override
public CompletableFuture<FeatureMapAndEpoch> finalizedFeatures() {
public CompletableFuture<FinalizedControllerFeatures> finalizedFeatures() {
return appendReadEvent("getFinalizedFeatures",
() -> featureControl.finalizedFeatures(lastCommittedOffset));
}
@ -1575,6 +1582,31 @@ public final class QuorumController implements Controller {
.setProducerIdLen(result.size()));
}
@Override
public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(
UpdateFeaturesRequestData request) {
return appendWriteEvent("updateFeatures", () -> {
Map<String, Short> updates = new HashMap<>();
Map<String, FeatureUpdate.UpgradeType> upgradeTypes = new HashMap<>();
request.featureUpdates().forEach(featureUpdate -> {
String featureName = featureUpdate.feature();
upgradeTypes.put(featureName, FeatureUpdate.UpgradeType.fromCode(featureUpdate.upgradeType()));
updates.put(featureName, featureUpdate.maxVersionLevel());
});
return featureControl.updateFeatures(updates, upgradeTypes, clusterControl.brokerSupportedVersions(),
request.validateOnly());
}).thenApply(result -> {
UpdateFeaturesResponseData responseData = new UpdateFeaturesResponseData();
responseData.setResults(new UpdateFeaturesResponseData.UpdatableFeatureResultCollection(result.size()));
result.forEach((featureName, error) -> responseData.results().add(
new UpdateFeaturesResponseData.UpdatableFeatureResult()
.setFeature(featureName)
.setErrorCode(error.error().code())
.setErrorMessage(error.message())));
return responseData;
});
}
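
For illustration, a request that the method above consumes can be assembled like this (the FeatureUpdateKey element name and chained setters follow Kafka's generated-message conventions and should be treated as assumptions):

    import org.apache.kafka.clients.admin.FeatureUpdate;
    import org.apache.kafka.common.message.UpdateFeaturesRequestData;

    public class UpdateFeaturesRequestSketch {
        public static void main(String[] args) {
            // validateOnly = true triggers the dry-run branch in FeatureControlManager.
            UpdateFeaturesRequestData data = new UpdateFeaturesRequestData().setValidateOnly(true);
            data.featureUpdates().add(new UpdateFeaturesRequestData.FeatureUpdateKey()
                .setFeature("feature_1")
                .setMaxVersionLevel((short) 3)
                .setUpgradeType(FeatureUpdate.UpgradeType.UPGRADE.code()));
            // controller.updateFeatures(data) then yields one UpdatableFeatureResult per entry.
        }
    }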
@Override
public CompletableFuture<List<CreatePartitionsTopicResult>>
createPartitions(long deadlineNs, List<CreatePartitionsTopic> topics) {


@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.metadata.VersionRange;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
/**
* A holder class of the local node's supported feature flags.
*/
public class QuorumFeatures {
private final int nodeId;
private final Map<String, VersionRange> supportedFeatures;
QuorumFeatures(int nodeId,
Map<String, VersionRange> supportedFeatures) {
this.nodeId = nodeId;
this.supportedFeatures = Collections.unmodifiableMap(supportedFeatures);
}
public static QuorumFeatures create(int nodeId,
Map<String, VersionRange> supportedFeatures) {
return new QuorumFeatures(nodeId, supportedFeatures);
}
public static Map<String, VersionRange> defaultFeatureMap() {
return Collections.emptyMap();
}
Optional<VersionRange> localSupportedFeature(String featureName) {
return Optional.ofNullable(supportedFeatures.get(featureName));
}
}
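
A minimal sketch of how this is wired, mirroring the ControllerServer change earlier in this patch (feature map contents illustrative; localSupportedFeature is package-private, so callers live in org.apache.kafka.controller):

    Map<String, VersionRange> supported =
        Collections.singletonMap("feature_1", VersionRange.of((short) 1, (short) 3));
    QuorumFeatures quorumFeatures = QuorumFeatures.create(0, supported);

    quorumFeatures.localSupportedFeature("feature_1"); // Optional.of(range 1-3)
    quorumFeatures.localSupportedFeature("feature_2"); // Optional.empty()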


@ -19,7 +19,6 @@ package org.apache.kafka.image;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;
import org.apache.kafka.metadata.VersionRange;
import java.util.HashMap;
import java.util.Map;
@ -33,13 +32,13 @@ import java.util.Optional;
public final class FeaturesDelta {
private final FeaturesImage image;
private final Map<String, Optional<VersionRange>> changes = new HashMap<>();
private final Map<String, Optional<Short>> changes = new HashMap<>();
public FeaturesDelta(FeaturesImage image) {
this.image = image;
}
public Map<String, Optional<VersionRange>> changes() {
public Map<String, Optional<Short>> changes() {
return changes;
}
@ -52,8 +51,7 @@ public final class FeaturesDelta {
}
public void replay(FeatureLevelRecord record) {
changes.put(record.name(), Optional.of(
new VersionRange(record.minFeatureLevel(), record.maxFeatureLevel())));
changes.put(record.name(), Optional.of(record.featureLevel()));
}
public void replay(RemoveFeatureLevelRecord record) {
@ -61,26 +59,27 @@ public final class FeaturesDelta {
}
public FeaturesImage apply() {
Map<String, VersionRange> newFinalizedVersions =
Map<String, Short> newFinalizedVersions =
new HashMap<>(image.finalizedVersions().size());
for (Entry<String, VersionRange> entry : image.finalizedVersions().entrySet()) {
for (Entry<String, Short> entry : image.finalizedVersions().entrySet()) {
String name = entry.getKey();
Optional<VersionRange> change = changes.get(name);
Optional<Short> change = changes.get(name);
if (change == null) {
newFinalizedVersions.put(name, entry.getValue());
} else if (change.isPresent()) {
newFinalizedVersions.put(name, change.get());
}
}
for (Entry<String, Optional<VersionRange>> entry : changes.entrySet()) {
for (Entry<String, Optional<Short>> entry : changes.entrySet()) {
String name = entry.getKey();
Optional<VersionRange> change = entry.getValue();
Optional<Short> change = entry.getValue();
if (!newFinalizedVersions.containsKey(name)) {
if (change.isPresent()) {
newFinalizedVersions.put(name, change.get());
}
}
}
return new FeaturesImage(newFinalizedVersions);
}
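
A sketch of the delta semantics after this change: a present Optional&lt;Short&gt; records a new finalized level and an empty one records removal (feature name and levels illustrative; this would sit in org.apache.kafka.image alongside the classes above):

    import org.apache.kafka.common.metadata.FeatureLevelRecord;
    import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;

    public class FeaturesDeltaSketch {
        public static void main(String[] args) {
            FeaturesDelta delta = new FeaturesDelta(FeaturesImage.EMPTY);
            delta.replay(new FeatureLevelRecord().setName("feature_1").setFeatureLevel((short) 2));
            FeaturesImage image = delta.apply();   // feature_1 finalized at level 2

            FeaturesDelta removal = new FeaturesDelta(image);
            removal.replay(new RemoveFeatureLevelRecord().setName("feature_1"));
            removal.apply();                       // feature_1 removed again
        }
    }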


@ -18,7 +18,6 @@
package org.apache.kafka.image;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.util.ArrayList;
@ -28,7 +27,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_RECORD;
@ -41,9 +39,9 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_
public final class FeaturesImage {
public static final FeaturesImage EMPTY = new FeaturesImage(Collections.emptyMap());
private final Map<String, VersionRange> finalizedVersions;
private final Map<String, Short> finalizedVersions;
public FeaturesImage(Map<String, VersionRange> finalizedVersions) {
public FeaturesImage(Map<String, Short> finalizedVersions) {
this.finalizedVersions = Collections.unmodifiableMap(finalizedVersions);
}
@ -51,22 +49,20 @@ public final class FeaturesImage {
return finalizedVersions.isEmpty();
}
Map<String, VersionRange> finalizedVersions() {
Map<String, Short> finalizedVersions() {
return finalizedVersions;
}
private Optional<VersionRange> finalizedVersion(String feature) {
private Optional<Short> finalizedVersion(String feature) {
return Optional.ofNullable(finalizedVersions.get(feature));
}
public void write(Consumer<List<ApiMessageAndVersion>> out) {
List<ApiMessageAndVersion> batch = new ArrayList<>();
for (Entry<String, VersionRange> entry : finalizedVersions.entrySet()) {
for (Entry<String, Short> entry : finalizedVersions.entrySet()) {
batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(entry.getKey()).
setMinFeatureLevel(entry.getValue().min()).
setMaxFeatureLevel(entry.getValue().max()),
FEATURE_LEVEL_RECORD.highestSupportedVersion()));
setFeatureLevel(entry.getValue()), FEATURE_LEVEL_RECORD.highestSupportedVersion()));
}
out.accept(batch);
}
@ -83,9 +79,11 @@ public final class FeaturesImage {
return finalizedVersions.equals(other.finalizedVersions);
}
@Override
public String toString() {
return finalizedVersions.entrySet().stream().
map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", "));
return "FeaturesImage{" +
"finalizedVersions=" + finalizedVersions +
'}';
}
}


@ -104,7 +104,7 @@ public class BrokerRegistration {
}
Map<String, VersionRange> supportedFeatures = new HashMap<>();
for (BrokerFeature feature : record.features()) {
supportedFeatures.put(feature.name(), new VersionRange(
supportedFeatures.put(feature.name(), VersionRange.of(
feature.minSupportedVersion(), feature.maxSupportedVersion()));
}
return new BrokerRegistration(record.brokerId(),


@ -1,67 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* A map of feature names to their supported versions.
*/
public class FeatureMap {
private final Map<String, VersionRange> features;
public FeatureMap(Map<String, VersionRange> features) {
this.features = Collections.unmodifiableMap(new HashMap<>(features));
}
public Optional<VersionRange> get(String name) {
return Optional.ofNullable(features.get(name));
}
public Map<String, VersionRange> features() {
return features;
}
@Override
public int hashCode() {
return features.hashCode();
}
@Override
public boolean equals(Object o) {
if (!(o instanceof FeatureMap)) return false;
FeatureMap other = (FeatureMap) o;
return features.equals(other.features);
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("{");
bld.append(features.keySet().stream().sorted().
map(k -> k + ": " + features.get(k)).
collect(Collectors.joining(", ")));
bld.append("}");
return bld.toString();
}
}


@ -17,23 +17,31 @@
package org.apache.kafka.metadata;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
/**
* A map of feature names to their supported versions.
*/
public class FeatureMapAndEpoch {
private final FeatureMap map;
public class FinalizedControllerFeatures {
private final Map<String, Short> featureMap;
private final long epoch;
public FeatureMapAndEpoch(FeatureMap map, long epoch) {
this.map = map;
public FinalizedControllerFeatures(Map<String, Short> featureMap, long epoch) {
this.featureMap = Collections.unmodifiableMap(featureMap);
this.epoch = epoch;
}
public FeatureMap map() {
return map;
public Optional<Short> get(String name) {
return Optional.ofNullable(featureMap.get(name));
}
public Set<String> featureNames() {
return featureMap.keySet();
}
public long epoch() {
@ -42,21 +50,21 @@ public class FeatureMapAndEpoch {
@Override
public int hashCode() {
return Objects.hash(map, epoch);
return Objects.hash(featureMap, epoch);
}
@Override
public boolean equals(Object o) {
if (!(o instanceof FeatureMapAndEpoch)) return false;
FeatureMapAndEpoch other = (FeatureMapAndEpoch) o;
return map.equals(other.map) && epoch == other.epoch;
if (!(o instanceof FinalizedControllerFeatures)) return false;
FinalizedControllerFeatures other = (FinalizedControllerFeatures) o;
return featureMap.equals(other.featureMap) && epoch == other.epoch;
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("{");
bld.append("map=").append(map.toString());
bld.append("featureMap=").append(featureMap.toString());
bld.append(", epoch=").append(epoch);
bld.append("}");
return bld.toString();

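For orientation, a minimal usage sketch (not part of the diff) of the replacement class; the feature name below is illustrative:

import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.metadata.FinalizedControllerFeatures;

class FinalizedFeaturesExample {
    static void demo() {
        // One finalized level per feature, plus the metadata epoch it was read at.
        FinalizedControllerFeatures finalized = new FinalizedControllerFeatures(
            Collections.singletonMap("metadata.version", (short) 1), 456L);
        Optional<Short> level = finalized.get("metadata.version"); // Optional.of((short) 1)
        long epoch = finalized.epoch();                            // 456L
        finalized.featureNames();                                  // {"metadata.version"}
    }
}
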
View File

@ -23,16 +23,20 @@ import java.util.Objects;
* An immutable class which represents version ranges.
*/
public class VersionRange {
public final static VersionRange ALL = new VersionRange((short) 0, Short.MAX_VALUE);
public final static VersionRange ALL = of((short) 0, Short.MAX_VALUE);
private final short min;
private final short max;
public VersionRange(short min, short max) {
private VersionRange(short min, short max) {
this.min = min;
this.max = max;
}
public static VersionRange of(short min, short max) {
return new VersionRange(min, max);
}
public short min() {
return min;
}
@ -41,8 +45,18 @@ public class VersionRange {
return max;
}
public boolean contains(VersionRange other) {
return other.min >= min && other.max <= max;
/**
* Check whether a given version falls within this range.
*/
public boolean contains(short version) {
return version >= min && version <= max;
}
/**
* Check whether a given version range overlaps this one.
*/
public boolean intersects(VersionRange other) {
return other.min <= max && other.max >= min;
}
@Override

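A quick sketch of the reworked surface as defined in the hunk above: construction now goes through the of() factory, contains() takes a single version, and intersects() is the new range-vs-range check (overlap, not containment):

import org.apache.kafka.metadata.VersionRange;

class VersionRangeExample {
    static void demo() {
        VersionRange range = VersionRange.of((short) 1, (short) 3);
        range.contains((short) 2);                                // true: 1 <= 2 <= 3
        range.contains((short) 4);                                // false: above max
        range.intersects(VersionRange.of((short) 3, (short) 5));  // true: they share version 3
        range.intersects(VersionRange.of((short) 4, (short) 5));  // false: disjoint
    }
}
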
View File

@ -22,9 +22,7 @@
"fields": [
{ "name": "Name", "type": "string", "versions": "0+",
"about": "The feature name." },
{ "name": "MinFeatureLevel", "type": "int16", "versions": "0+",
"about": "The current finalized minimum feature level of this feature for the cluster." },
{ "name": "MaxFeatureLevel", "type": "int16", "versions": "0+",
"about": "The current finalized maximum feature level of this feature for the cluster." }
{ "name": "FeatureLevel", "type": "int16", "versions": "0+",
"about": "The current finalized feature level of this feature for the cluster." }
]
}

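With the schema collapsed to a single FeatureLevel field, each finalized feature is written as exactly one level. A sketch of the record wiring, matching the usage in the tests below; the feature name is illustrative:

import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.server.common.ApiMessageAndVersion;

class FeatureLevelRecordExample {
    static ApiMessageAndVersion demo() {
        FeatureLevelRecord record = new FeatureLevelRecord().
            setName("metadata.version").
            setFeatureLevel((short) 1);
        // Wrapped at record schema version 0, as elsewhere in this PR.
        return new ApiMessageAndVersion(record, (short) 0);
    }
}
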
View File

@ -37,8 +37,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.FeatureMap;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
@ -103,7 +102,7 @@ public class ClusterControlManagerTest {
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
new FeatureMapAndEpoch(new FeatureMap(Collections.emptyMap()), 456L)));
new FinalizedControllerFeatures(Collections.emptyMap(), 456L)));
}
@Test

View File

@ -23,12 +23,13 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.FeatureMap;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@ -48,59 +49,87 @@ public class FeatureControlManagerTest {
String feature = (String) args[i];
Integer low = (Integer) args[i + 1];
Integer high = (Integer) args[i + 2];
result.put(feature, new VersionRange(low.shortValue(), high.shortValue()));
result.put(feature, VersionRange.of(low.shortValue(), high.shortValue()));
}
return result;
}
private static Map<String, Short> versionMap(Object... args) {
Map<String, Short> result = new HashMap<>();
for (int i = 0; i < args.length; i += 2) {
String feature = (String) args[i];
Integer ver = (Integer) args[i + 1];
result.put(feature, ver.shortValue());
}
return result;
}
public static QuorumFeatures features(Object... args) {
return QuorumFeatures.create(0, rangeMap(args));
}
private static Map<String, Short> updateMap(Object... args) {
Map<String, Short> result = new HashMap<>();
for (int i = 0; i < args.length; i += 2) {
String feature = (String) args[i];
Integer ver = (Integer) args[i + 1];
result.put(feature, ver.shortValue());
}
return result;
}
@Test
public void testUpdateFeatures() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
snapshotRegistry.getOrCreateSnapshot(-1);
FeatureControlManager manager = new FeatureControlManager(
rangeMap("foo", 1, 2), snapshotRegistry);
assertEquals(new FeatureMapAndEpoch(new FeatureMap(Collections.emptyMap()), -1),
FeatureControlManager manager = new FeatureControlManager(logContext,
features("foo", 1, 2), snapshotRegistry);
assertEquals(new FinalizedControllerFeatures(Collections.emptyMap(), -1),
manager.finalizedFeatures(-1));
assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
"The controller does not support the given feature range."))),
manager.updateFeatures(rangeMap("foo", 1, 3),
Collections.singleton("foo"),
Collections.emptyMap()));
manager.updateFeatures(updateMap("foo", 3),
Collections.singletonMap("foo", FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
Collections.emptyMap(), false));
ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
rangeMap("foo", 1, 2, "bar", 1, 1), Collections.emptySet(),
Collections.emptyMap());
updateMap("foo", 2, "bar", 1), Collections.emptyMap(),
Collections.emptyMap(), false);
Map<String, ApiError> expectedMap = new HashMap<>();
expectedMap.put("foo", ApiError.NONE);
expectedMap.put("bar", new ApiError(Errors.INVALID_UPDATE_VERSION,
"The controller does not support the given feature range."));
"The controller does not support the given feature."));
assertEquals(expectedMap, result.response());
List<ApiMessageAndVersion> expectedMessages = new ArrayList<>();
expectedMessages.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName("foo").setMinFeatureLevel((short) 1).setMaxFeatureLevel((short) 2),
setName("foo").setFeatureLevel((short) 2),
(short) 0));
assertEquals(expectedMessages, result.records());
}
@Test
public void testReplay() {
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
FeatureLevelRecord record = new FeatureLevelRecord().
setName("foo").setMinFeatureLevel((short) 1).setMaxFeatureLevel((short) 2);
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
setName("foo").setFeatureLevel((short) 2);
snapshotRegistry.getOrCreateSnapshot(-1);
FeatureControlManager manager = new FeatureControlManager(
rangeMap("foo", 1, 2), snapshotRegistry);
FeatureControlManager manager = new FeatureControlManager(logContext,
features("foo", 1, 2), snapshotRegistry);
manager.replay(record);
snapshotRegistry.getOrCreateSnapshot(123);
assertEquals(new FeatureMapAndEpoch(new FeatureMap(rangeMap("foo", 1, 2)), 123),
assertEquals(new FinalizedControllerFeatures(versionMap("foo", 2), 123),
manager.finalizedFeatures(123));
}
@Test
public void testUpdateFeaturesErrorCases() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager manager = new FeatureControlManager(
rangeMap("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
FeatureControlManager manager = new FeatureControlManager(logContext,
features("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
assertEquals(
ControllerResult.atomicOf(
@ -114,24 +143,24 @@ public class FeatureControlManagerTest {
)
),
manager.updateFeatures(
rangeMap("foo", 1, 3),
Collections.singleton("foo"),
Collections.singletonMap(5, rangeMap())
)
updateMap("foo", 3),
Collections.singletonMap("foo", FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
Collections.singletonMap(5, rangeMap()),
false)
);
ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
rangeMap("foo", 1, 3), Collections.emptySet(), Collections.emptyMap());
updateMap("foo", 3), Collections.emptyMap(), Collections.emptyMap(), false);
assertEquals(Collections.singletonMap("foo", ApiError.NONE), result.response());
manager.replay((FeatureLevelRecord) result.records().get(0).message());
snapshotRegistry.getOrCreateSnapshot(3);
assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
"Can't downgrade the maximum version of this feature without " +
"setting downgradable to true."))),
manager.updateFeatures(rangeMap("foo", 1, 2),
Collections.emptySet(), Collections.emptyMap()));
"Can't downgrade the maximum version of this feature without setting the upgrade type to " +
"safe or unsafe downgrade."))),
manager.updateFeatures(updateMap("foo", 2),
Collections.emptyMap(), Collections.emptyMap(), false));
assertEquals(
ControllerResult.atomicOf(
@ -139,39 +168,37 @@ public class FeatureControlManagerTest {
new ApiMessageAndVersion(
new FeatureLevelRecord()
.setName("foo")
.setMinFeatureLevel((short) 1)
.setMaxFeatureLevel((short) 2),
.setFeatureLevel((short) 2),
(short) 0
)
),
Collections.singletonMap("foo", ApiError.NONE)
),
manager.updateFeatures(
rangeMap("foo", 1, 2),
Collections.singleton("foo"),
Collections.emptyMap()
)
updateMap("foo", 2),
Collections.singletonMap("foo", FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
Collections.emptyMap(),
false)
);
}
@Test
public void testFeatureControlIterator() throws Exception {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager manager = new FeatureControlManager(
rangeMap("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
FeatureControlManager manager = new FeatureControlManager(logContext,
features("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
ControllerResult<Map<String, ApiError>> result = manager.
updateFeatures(rangeMap("foo", 1, 5, "bar", 1, 1),
Collections.emptySet(), Collections.emptyMap());
updateFeatures(updateMap("foo", 5, "bar", 1),
Collections.emptyMap(), Collections.emptyMap(), false);
RecordTestUtils.replayAll(manager, result.records());
RecordTestUtils.assertBatchIteratorContains(Arrays.asList(
Arrays.asList(new ApiMessageAndVersion(new FeatureLevelRecord().
setName("foo").
setMinFeatureLevel((short) 1).
setMaxFeatureLevel((short) 5), (short) 0)),
setFeatureLevel((short) 5), (short) 0)),
Arrays.asList(new ApiMessageAndVersion(new FeatureLevelRecord().
setName("bar").
setMinFeatureLevel((short) 1).
setMaxFeatureLevel((short) 1), (short) 0))),
setFeatureLevel((short) 1), (short) 0))),
manager.iterator(Long.MAX_VALUE));
}
}

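Reading the tests above, the new updateFeatures call shape is: a map of feature name to target level, a per-feature map of FeatureUpdate.UpgradeType (replacing the old set of downgradable names), a map of broker-supported ranges keyed by broker id, and a trailing boolean for the validate-only (dry-run) mode this PR adds. A hedged sketch of one call, assuming it runs in the controller package where FeatureControlManager and ControllerResult live:

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.requests.ApiError;

class UpdateFeaturesExample {
    static void demo(FeatureControlManager manager) {
        // Safe-downgrade "foo" to level 2; last argument false = apply, true = dry run.
        ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
            Collections.singletonMap("foo", (short) 2),
            Collections.singletonMap("foo", FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
            Collections.emptyMap(),
            false);
        result.response(); // per-feature ApiError map
    }
}
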
View File

@ -178,7 +178,7 @@ public class QuorumControllerTest {
private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
QuorumController controller)
throws Throwable {
logEnv.logManagers().forEach(m -> m.setMaxReadOffset(0L));
logEnv.logManagers().forEach(m -> m.setMaxReadOffset(2L));
CompletableFuture<Map<ConfigResource, ApiError>> future1 =
controller.incrementalAlterConfigs(Collections.singletonMap(
BROKER0, Collections.singletonMap("baz", entry(SET, "123"))), false);
@ -187,7 +187,7 @@ public class QuorumControllerTest {
new ResultOrError<>(Collections.emptyMap())),
controller.describeConfigs(Collections.singletonMap(
BROKER0, Collections.emptyList())).get());
logEnv.logManagers().forEach(m -> m.setMaxReadOffset(2L));
logEnv.logManagers().forEach(m -> m.setMaxReadOffset(3L));
assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get());
}
@ -426,6 +426,7 @@ public class QuorumControllerTest {
setBrokerId(0).
setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
setFeatures(brokerFeatures()).
setListeners(listeners));
assertEquals(0L, reply.get().epoch());
CreateTopicsRequestData createTopicsRequestData =
@ -466,6 +467,11 @@ public class QuorumControllerTest {
}
}
private BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
return features;
}
@Test
public void testSnapshotSaveAndLoad() throws Throwable {
final int numBrokers = 4;

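The registration request now carries a feature collection; the helper above registers with an empty one. A broker advertising a supported range would populate it along these lines (a sketch, assuming the accessors generated from the BrokerRegistrationRequest schema; the feature name is illustrative):

import org.apache.kafka.common.message.BrokerRegistrationRequestData;

class BrokerFeaturesExample {
    static BrokerRegistrationRequestData.FeatureCollection demo() {
        BrokerRegistrationRequestData.FeatureCollection features =
            new BrokerRegistrationRequestData.FeatureCollection();
        // Advertise the supported min/max range for one feature.
        features.add(new BrokerRegistrationRequestData.Feature().
            setName("metadata.version").
            setMinSupportedVersion((short) 1).
            setMaxSupportedVersion((short) 1));
        return features;
    }
}
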
View File

@ -61,6 +61,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
for (int i = 0; i < numControllers; i++) {
QuorumController.Builder builder = new QuorumController.Builder(i, logEnv.clusterId());
builder.setRaftClient(logEnv.logManagers().get(i));
builder.setQuorumFeatures(new QuorumFeatures(i, QuorumFeatures.defaultFeatureMap()));
sessionTimeoutMillis.ifPresent(timeout -> {
builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
});

View File

@ -60,14 +60,14 @@ public class ClusterImageTest {
1000,
Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q"),
Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092)),
Collections.singletonMap("foo", new VersionRange((short) 1, (short) 3)),
Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3)),
Optional.empty(),
true));
map1.put(1, new BrokerRegistration(1,
1001,
Uuid.fromString("U52uRe20RsGI0RvpcTx33Q"),
Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093)),
Collections.singletonMap("foo", new VersionRange((short) 1, (short) 3)),
Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3)),
Optional.empty(),
false));
map1.put(2, new BrokerRegistration(2,
@ -96,14 +96,14 @@ public class ClusterImageTest {
1000,
Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q"),
Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092)),
Collections.singletonMap("foo", new VersionRange((short) 1, (short) 3)),
Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3)),
Optional.empty(),
false));
map2.put(1, new BrokerRegistration(1,
1001,
Uuid.fromString("U52uRe20RsGI0RvpcTx33Q"),
Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093)),
Collections.singletonMap("foo", new VersionRange((short) 1, (short) 3)),
Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3)),
Optional.empty(),
true));
IMAGE2 = new ClusterImage(map2);

View File

@ -20,7 +20,6 @@ package org.apache.kafka.image;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@ -43,15 +42,15 @@ public class FeaturesImageTest {
final static FeaturesImage IMAGE2;
static {
Map<String, VersionRange> map1 = new HashMap<>();
map1.put("foo", new VersionRange((short) 1, (short) 2));
map1.put("bar", new VersionRange((short) 1, (short) 1));
map1.put("baz", new VersionRange((short) 1, (short) 8));
Map<String, Short> map1 = new HashMap<>();
map1.put("foo", (short) 2);
map1.put("bar", (short) 1);
map1.put("baz", (short) 8);
IMAGE1 = new FeaturesImage(map1);
DELTA1_RECORDS = new ArrayList<>();
DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName("foo").setMinFeatureLevel((short) 1).setMaxFeatureLevel((short) 3),
setName("foo").setFeatureLevel((short) 3),
FEATURE_LEVEL_RECORD.highestSupportedVersion()));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveFeatureLevelRecord().
setName("bar"), REMOVE_FEATURE_LEVEL_RECORD.highestSupportedVersion()));
@ -61,8 +60,8 @@ public class FeaturesImageTest {
DELTA1 = new FeaturesDelta(IMAGE1);
RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
Map<String, VersionRange> map2 = new HashMap<>();
map2.put("foo", new VersionRange((short) 1, (short) 3));
Map<String, Short> map2 = new HashMap<>();
map2.put("foo", (short) 3);
IMAGE2 = new FeaturesImage(map2);
}

View File

@ -40,15 +40,15 @@ public class BrokerRegistrationTest {
private static final List<BrokerRegistration> REGISTRATIONS = Arrays.asList(
new BrokerRegistration(0, 0, Uuid.fromString("pc1GhUlBS92cGGaKXl6ipw"),
Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9090)),
Collections.singletonMap("foo", new VersionRange((short) 1, (short) 2)),
Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 2)),
Optional.empty(), false),
new BrokerRegistration(1, 0, Uuid.fromString("3MfdxWlNSn2UDYsmDP1pYg"),
Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9091)),
Collections.singletonMap("foo", new VersionRange((short) 1, (short) 2)),
Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 2)),
Optional.empty(), false),
new BrokerRegistration(2, 0, Uuid.fromString("eY7oaG1RREie5Kk9uy1l6g"),
Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9092)),
Collections.singletonMap("foo", new VersionRange((short) 2, (short) 3)),
Collections.singletonMap("foo", VersionRange.of((short) 2, (short) 3)),
Optional.of("myrack"), false));
@Test

View File

@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
@ -31,24 +32,24 @@ public class VersionRangeTest {
assertTrue(a >= Short.MIN_VALUE);
assertTrue(b <= Short.MAX_VALUE);
assertTrue(b >= Short.MIN_VALUE);
return new VersionRange((short) a, (short) b);
return VersionRange.of((short) a, (short) b);
}
@Test
public void testEquality() {
assertEquals(v(1, 1), v(1, 1));
assertFalse(v(1, 1).equals(v(1, 2)));
assertFalse(v(2, 1).equals(v(1, 2)));
assertFalse(v(2, 1).equals(v(2, 2)));
assertNotEquals(v(1, 2), v(1, 1));
assertNotEquals(v(1, 2), v(2, 1));
assertNotEquals(v(2, 2), v(2, 1));
}
@Test
public void testContains() {
assertTrue(v(1, 1).contains(v(1, 1)));
assertFalse(v(1, 1).contains(v(1, 2)));
assertTrue(v(1, 2).contains(v(1, 1)));
assertFalse(v(4, 10).contains(v(3, 8)));
assertTrue(v(2, 12).contains(v(3, 11)));
assertTrue(v(1, 1).contains((short) 1));
assertFalse(v(1, 1).contains((short) 2));
assertTrue(v(1, 2).contains((short) 1));
assertFalse(v(4, 10).contains((short) 3));
assertTrue(v(2, 12).contains((short) 11));
}
@Test

View File

@ -240,6 +240,7 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
throw new IllegalArgumentException();
}
// Read the metadata record body from the file input reader
T record = serde.read(input, valueSize);
int numHeaders = input.readVarint();