KAFKA-17564 Move BrokerFeatures to server module (#17228)

Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Ken Huang 2024-10-07 15:16:48 +08:00 committed by GitHub
parent 7c34cf6e2c
commit 10a0905628
20 changed files with 314 additions and 298 deletions
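
The net effect for callers: BrokerFeatures is now a Java class in the server module, so Scala call sites import org.apache.kafka.server.BrokerFeatures and convert collections at the Java boundary. A minimal before/after sketch of a call site, with hypothetical variable names, based on the KafkaController hunk below:

    // before: Scala class returning a Scala Map
    // val finalized: Map[String, Short] = brokerFeatures.defaultFinalizedFeatures

    // after: Java class returning java.util.Map; convert and unbox at the call site
    import org.apache.kafka.server.BrokerFeatures
    import scala.jdk.CollectionConverters._

    val brokerFeatures = BrokerFeatures.createDefault(true)
    val finalized: Map[String, Short] =
      brokerFeatures.defaultFinalizedFeatures.asScala.map { case (k, v) => (k, v.shortValue()) }.toMap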

View File

@@ -43,6 +43,7 @@ import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, Leade
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
 import org.apache.kafka.metadata.migration.ZkMigrationState
+import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.common.{AdminOperationException, ProducerIdsBlock}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.KafkaScheduler
@@ -431,7 +432,7 @@ class KafkaController(val config: KafkaConfig,
     val newVersion = createFeatureZNode(
       FeatureZNode(config.interBrokerProtocolVersion,
         FeatureZNodeStatus.Enabled,
-        brokerFeatures.defaultFinalizedFeatures
+        brokerFeatures.defaultFinalizedFeatures.asScala.map { case (k, v) => (k, v.shortValue()) }
       ))
     featureCache.waitUntilFeatureEpochOrThrow(newVersion, config.zkConnectionTimeoutMs)
   } else {
@@ -1601,7 +1602,7 @@ class KafkaController(val config: KafkaConfig,
     latestFinalizedFeatures =>
       BrokerFeatures.hasIncompatibleFeatures(broker.features,
         latestFinalizedFeatures.finalizedFeatures().asScala.
-          map(kv => (kv._1, kv._2.toShort)).toMap))
+          map(kv => (kv._1, kv._2.toShort: java.lang.Short)).toMap.asJava))
   }
 }
@@ -1983,7 +1984,7 @@ class KafkaController(val config: KafkaConfig,
       s" versionLevel:${update.versionLevel} is lower than the" +
       s" supported minVersion:${supportedVersionRange.min}."))
   } else {
-    val newFinalizedFeature = Utils.mkMap(Utils.mkEntry(update.feature, newVersion)).asScala.toMap
+    val newFinalizedFeature = Utils.mkMap(Utils.mkEntry(update.feature, newVersion: java.lang.Short))
     val numIncompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.count(broker => {
      BrokerFeatures.hasIncompatibleFeatures(broker.features, newFinalizedFeature)
    })
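
The explicit java.lang.Short ascriptions above are needed because hasIncompatibleFeatures now takes a java.util.Map<String, Short>, and a Scala Map of primitive Short values does not convert to boxed java.lang.Short values implicitly. A small standalone sketch of the boundary conversion, with hypothetical feature values:

    import scala.jdk.CollectionConverters._

    val scalaFeatures: Map[String, Short] = Map("metadata.version" -> 17.toShort)
    // ascribe java.lang.Short so asJava yields a java.util.Map[String, java.lang.Short]
    val javaFeatures: java.util.Map[String, java.lang.Short] =
      scalaFeatures.map { case (k, v) => (k, v: java.lang.Short) }.asJava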

View File

@@ -21,7 +21,7 @@ import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.ApiVersionsResponse
 import org.apache.kafka.network.metrics.RequestChannelMetrics
-import org.apache.kafka.server.ClientMetricsManager
+import org.apache.kafka.server.{BrokerFeatures, ClientMetricsManager}
 import org.apache.kafka.server.common.FinalizedFeatures
 import scala.collection.mutable

View File

@@ -1,155 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.utils.Logging
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.metadata.VersionRange
import org.apache.kafka.server.common.Features.PRODUCTION_FEATURES
import org.apache.kafka.server.common.MetadataVersion
import java.util
import scala.jdk.CollectionConverters._
/**
* A class that encapsulates the latest features supported by the Broker and also provides APIs to
* check for incompatibilities between the features supported by the Broker and finalized features.
* This class is immutable in production. It provides few APIs to mutate state only for the purpose
* of testing.
*/
class BrokerFeatures private (@volatile var supportedFeatures: Features[SupportedVersionRange]) {
// For testing only.
def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
val combined = new util.HashMap[String, SupportedVersionRange](supportedFeatures.features())
combined.putAll(newFeatures.features())
supportedFeatures = Features.supportedFeatures(combined)
}
/**
* Returns the default finalized features that a new Kafka cluster with IBP config >= IBP_2_7_IV0
* needs to be bootstrapped with.
*/
def defaultFinalizedFeatures: Map[String, Short] = {
supportedFeatures.features.asScala.map {
case(name, versionRange) =>
if (name.equals("kraft.version")) {
(name, 0.toShort)
} else {
(name, versionRange.max)
}
}.toMap
}
/**
* Returns the set of feature names found to be incompatible.
* A feature incompatibility is a version mismatch between the latest feature supported by the
* Broker, and a provided finalized feature. This can happen because a provided finalized
* feature:
* 1) Does not exist in the Broker (i.e. it is unknown to the Broker).
* [OR]
* 2) Exists but the FinalizedVersionRange does not match with the SupportedVersionRange
* of the supported feature.
*
* @param finalized The finalized features against which incompatibilities need to be checked for.
*
* @return The subset of input features which are incompatible. If the returned object
* is empty, it means there were no feature incompatibilities found.
*/
def incompatibleFeatures(finalized: Map[String, Short]): Map[String, Short] = {
BrokerFeatures.incompatibleFeatures(supportedFeatures, finalized, logIncompatibilities = true)
}
}
object BrokerFeatures extends Logging {
def createDefault(unstableFeatureVersionsEnabled: Boolean): BrokerFeatures = {
new BrokerFeatures(defaultSupportedFeatures(unstableFeatureVersionsEnabled))
}
def createDefaultFeatureMap(features: BrokerFeatures): Map[String, VersionRange] = {
features.supportedFeatures.features.asScala.map {
case (name, versionRange) =>
(name, VersionRange.of(versionRange.min, versionRange.max))
}.toMap
}
def defaultSupportedFeatures(unstableFeatureVersionsEnabled: Boolean): Features[SupportedVersionRange] = {
val features = new util.HashMap[String, SupportedVersionRange]()
features.put(MetadataVersion.FEATURE_NAME,
new SupportedVersionRange(
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
if (unstableFeatureVersionsEnabled) {
MetadataVersion.latestTesting.featureLevel
} else {
MetadataVersion.latestProduction.featureLevel
}))
PRODUCTION_FEATURES.forEach {
feature =>
val maxVersion = if (unstableFeatureVersionsEnabled)
feature.latestTesting
else
feature.latestProduction
if (maxVersion > 0) {
features.put(feature.featureName, new SupportedVersionRange(feature.minimumProduction(), maxVersion))
}
}
Features.supportedFeatures(features)
}
def createEmpty(): BrokerFeatures = {
new BrokerFeatures(Features.emptySupportedFeatures())
}
/**
* Returns true if any of the provided finalized features are incompatible with the provided
* supported features.
*
* @param supportedFeatures The supported features to be compared
* @param finalizedFeatures The finalized features to be compared
*
* @return - True if there are any feature incompatibilities found.
* - False otherwise.
*/
def hasIncompatibleFeatures(supportedFeatures: Features[SupportedVersionRange],
finalizedFeatures: Map[String, Short]): Boolean = {
incompatibleFeatures(supportedFeatures, finalizedFeatures, logIncompatibilities = false).nonEmpty
}
private def incompatibleFeatures(supportedFeatures: Features[SupportedVersionRange],
finalizedFeatures: Map[String, Short],
logIncompatibilities: Boolean): Map[String, Short] = {
val incompatibleFeaturesInfo = finalizedFeatures.map {
case (feature, versionLevels) =>
val supportedVersions = supportedFeatures.get(feature)
if (supportedVersions == null) {
(feature, versionLevels, "{feature=%s, reason='Unsupported feature'}".format(feature))
} else if (supportedVersions.isIncompatibleWith(versionLevels)) {
(feature, versionLevels, "{feature=%s, reason='%s is incompatible with %s'}".format(
feature, versionLevels, supportedVersions))
} else {
(feature, versionLevels, null)
}
}.filter{ case(_, _, errorReason) => errorReason != null}.toList
if (logIncompatibilities && incompatibleFeaturesInfo.nonEmpty) {
warn("Feature incompatibilities seen: " +
incompatibleFeaturesInfo.map { case(_, _, errorReason) => errorReason }.mkString(", "))
}
incompatibleFeaturesInfo.map { case(feature, versionLevels, _) => (feature, versionLevels) }.toMap
}
}

View File

@@ -43,7 +43,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec
 import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
 import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
 import org.apache.kafka.security.CredentialProvider
-import org.apache.kafka.server.{AssignmentsManager, ClientMetricsManager, NodeToControllerChannelManager}
+import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, NodeToControllerChannelManager}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, TopicIdPartition}
 import org.apache.kafka.server.config.ConfigType
@@ -378,7 +378,7 @@ class BrokerServer(
   ConfigType.CLIENT_METRICS -> new ClientMetricsConfigHandler(clientMetricsManager),
   ConfigType.GROUP -> new GroupConfigHandler(groupCoordinator))
-val featuresRemapped = BrokerFeatures.createDefaultFeatureMap(brokerFeatures).asJava
+val featuresRemapped = BrokerFeatures.createDefaultFeatureMap(brokerFeatures)
 val brokerLifecycleChannelManager = new NodeToControllerChannelManagerImpl(
   controllerNodeProvider,

View File

@@ -54,7 +54,7 @@ import org.apache.kafka.metadata.{BrokerState, MetadataRecordSerde, VersionRange
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.raft.Endpoints
 import org.apache.kafka.security.CredentialProvider
-import org.apache.kafka.server.NodeToControllerChannelManager
+import org.apache.kafka.server.{BrokerFeatures, NodeToControllerChannelManager}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.MetadataVersion._
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
@@ -472,7 +472,7 @@ class KafkaServer(
   setSecurityProtocol(ep.securityProtocol.id))
 }
-val features = BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(config.unstableFeatureVersionsEnabled))
+val features = BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(config.unstableFeatureVersionsEnabled)).asScala
 // Even though ZK brokers don't use "metadata.version" feature, we need to overwrite it with our IBP as part of registration
 // so the KRaft controller can verify that all brokers are on the same IBP before starting the migration.
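
createDefaultFeatureMap now returns a java.util.Map of String to VersionRange directly, which is why the Java-side BrokerServer call above drops its .asJava while the Scala-side KafkaServer call adds .asScala. A sketch of the Scala usage, assuming unstable feature versions are enabled:

    import org.apache.kafka.server.BrokerFeatures
    import scala.jdk.CollectionConverters._

    val featureMap = BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(true)).asScala
    featureMap.foreach { case (name, range) => println(s"$name -> $range") }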

View File

@@ -22,6 +22,7 @@ import org.apache.kafka.admin.BrokerMetadata
 import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
+import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion}
 import java.util

View File

@@ -24,7 +24,7 @@ import scala.collection.{Seq, Set, mutable}
 import scala.jdk.CollectionConverters._
 import kafka.cluster.{Broker, EndPoint}
 import kafka.controller.StateChangeLogger
-import kafka.server.{BrokerFeatures, CachedControllerId, KRaftCachedControllerId, MetadataCache, ZkCachedControllerId}
+import kafka.server.{CachedControllerId, KRaftCachedControllerId, MetadataCache, ZkCachedControllerId}
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.admin.BrokerMetadata
@@ -39,6 +39,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AbstractControlRequest, ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.metadata.LeaderAndIsr
+import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
 import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
@@ -649,8 +650,8 @@ class ZkMetadataCache(
   throw new FeatureCacheUpdateException(errorMsg)
 } else {
   val incompatibleFeatures = brokerFeatures.incompatibleFeatures(
-    latest.finalizedFeatures().asScala.map(kv => (kv._1, kv._2.toShort)).toMap)
-  if (incompatibleFeatures.nonEmpty) {
+    latest.finalizedFeatures().asScala.map(kv => (kv._1, kv._2.toShort: java.lang.Short)).toMap.asJava)
+  if (!incompatibleFeatures.isEmpty) {
     val errorMsg = "FinalizedFeatureCache update failed since feature compatibility" +
       s" checks failed! Supported ${brokerFeatures.supportedFeatures} has incompatibilities" +
       s" with the latest $latest."
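
Because incompatibleFeatures now returns a java.util.Map rather than a Scala Map, the emptiness check above switches from nonEmpty to !isEmpty. A sketch of the pattern, with a hypothetical finalized map:

    import org.apache.kafka.server.BrokerFeatures
    import scala.jdk.CollectionConverters._

    val brokerFeatures = BrokerFeatures.createDefault(true)
    val finalized = Map[String, java.lang.Short]("metadata.version" -> 1.toShort).asJava
    val incompatible = brokerFeatures.incompatibleFeatures(finalized)
    // java.util.Map has no nonEmpty; check the Java way
    if (!incompatible.isEmpty)
      println(s"Feature incompatibilities: $incompatible")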

View File

@@ -20,6 +20,7 @@ import kafka.server.metadata.ZkMetadataCache
 import org.apache.kafka.clients.NodeApiVersions
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.{Disabled, Test}
 import org.junit.jupiter.api.Assertions._

View File

@@ -1,117 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.server.common.{MetadataVersion, Features => ServerFeatures}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
class BrokerFeaturesTest {
@Test
def testEmpty(): Unit = {
assertTrue(BrokerFeatures.createEmpty().supportedFeatures.empty)
}
@Test
def testIncompatibilitiesDueToAbsentFeature(): Unit = {
val brokerFeatures = BrokerFeatures.createDefault(true)
val supportedFeatures = Features.supportedFeatures(Map[String, SupportedVersionRange](
"test_feature_1" -> new SupportedVersionRange(1, 4),
"test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
brokerFeatures.setSupportedFeatures(supportedFeatures)
val compatibleFeatures = Map[String, Short]("test_feature_1" -> 4)
val inCompatibleFeatures = Map[String, Short]("test_feature_3" -> 4)
val features = compatibleFeatures++inCompatibleFeatures
val finalizedFeatures = features
assertEquals(inCompatibleFeatures,
brokerFeatures.incompatibleFeatures(finalizedFeatures))
assertTrue(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, finalizedFeatures))
}
@Test
def testIncompatibilitiesDueToIncompatibleFeature(): Unit = {
val brokerFeatures = BrokerFeatures.createDefault(true)
val supportedFeatures = Features.supportedFeatures(Map[String, SupportedVersionRange](
"test_feature_1" -> new SupportedVersionRange(1, 4),
"test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
brokerFeatures.setSupportedFeatures(supportedFeatures)
val compatibleFeatures = Map[String, Short]("test_feature_1" -> 3)
val inCompatibleFeatures = Map[String, Short]("test_feature_2" -> 4)
val features = compatibleFeatures++inCompatibleFeatures
val finalizedFeatures = features
assertEquals(
inCompatibleFeatures,
brokerFeatures.incompatibleFeatures(finalizedFeatures))
assertTrue(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, finalizedFeatures))
}
@Test
def testCompatibleFeatures(): Unit = {
val brokerFeatures = BrokerFeatures.createDefault(true)
val supportedFeatures = Features.supportedFeatures(Map[String, SupportedVersionRange](
"test_feature_1" -> new SupportedVersionRange(1, 4),
"test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
brokerFeatures.setSupportedFeatures(supportedFeatures)
val compatibleFeatures = Map[String, Short](
"test_feature_1" -> 3,
"test_feature_2" -> 3)
val finalizedFeatures = compatibleFeatures
assertTrue(brokerFeatures.incompatibleFeatures(finalizedFeatures).isEmpty)
assertFalse(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, finalizedFeatures))
}
@Test
def testDefaultFinalizedFeatures(): Unit = {
val brokerFeatures = BrokerFeatures.createDefault(true)
val supportedFeatures = Features.supportedFeatures(Map[String, SupportedVersionRange](
"test_feature_1" -> new SupportedVersionRange(1, 4),
"test_feature_2" -> new SupportedVersionRange(1, 3),
"test_feature_3" -> new SupportedVersionRange(3, 7)).asJava)
brokerFeatures.setSupportedFeatures(supportedFeatures)
val expectedFeatures = Map[String, Short](
MetadataVersion.FEATURE_NAME -> MetadataVersion.latestTesting().featureLevel(),
ServerFeatures.TRANSACTION_VERSION.featureName() -> ServerFeatures.TRANSACTION_VERSION.latestTesting(),
ServerFeatures.GROUP_VERSION.featureName() -> ServerFeatures.GROUP_VERSION.latestTesting(),
"kraft.version" -> 0,
"test_feature_1" -> 4,
"test_feature_2" -> 3,
"test_feature_3" -> 7)
assertEquals(expectedFeatures, brokerFeatures.defaultFinalizedFeatures)
}
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def ensureDefaultSupportedFeaturesRangeMaxNotZero(unstableVersionsEnabled: Boolean): Unit = {
val brokerFeatures = BrokerFeatures.createDefault(unstableVersionsEnabled)
brokerFeatures.supportedFeatures.features().values().forEach { supportedVersionRange =>
assertNotEquals(0, supportedVersionRange.max())
}
}
}

View File

@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse}
 import org.apache.kafka.metadata.{BrokerState, VersionRange}
 import org.apache.kafka.raft.QuorumConfig
+import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.common.{Features, KRaftVersion, MetadataVersion}
 import org.apache.kafka.server.common.MetadataVersion.{IBP_3_8_IV0, IBP_3_9_IV0}
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs, ZkConfigs}
@@ -120,7 +121,7 @@ class BrokerLifecycleManagerTest {
 manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
 val controllerNode = new Node(3000, "localhost", 8021)
 context.controllerNodeProvider.node.set(controllerNode)
-val features = BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(true))
+val features = BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(true)).asScala
 // Even though ZK brokers don't use "metadata.version" feature, we need to overwrite it with our IBP as part of registration
 // so the KRaft controller can verify that all brokers are on the same IBP before starting the migration.

View File

@@ -19,6 +19,7 @@ package kafka.server
 import kafka.server.metadata.{FeatureCacheUpdateException, ZkMetadataCache}
 import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
 import org.junit.jupiter.api.Test
@@ -40,8 +41,7 @@ class FinalizedFeatureCacheTest {
 def testUpdateOrThrowFailedDueToInvalidEpoch(): Unit = {
   val supportedFeatures = Map[String, SupportedVersionRange](
     "feature_1" -> new SupportedVersionRange(1, 4))
-  val brokerFeatures = BrokerFeatures.createDefault(true)
-  brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
+  val brokerFeatures = BrokerFeatures.createDefault(true, Features.supportedFeatures(supportedFeatures.asJava))
   val finalizedFeatures = Map[String, Short]("feature_1" -> 4)
@@ -63,8 +63,7 @@ class FinalizedFeatureCacheTest {
 def testUpdateOrThrowFailedDueToInvalidFeatures(): Unit = {
   val supportedFeatures =
     Map[String, SupportedVersionRange]("feature_1" -> new SupportedVersionRange(1, 1))
-  val brokerFeatures = BrokerFeatures.createDefault(true)
-  brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
+  val brokerFeatures = BrokerFeatures.createDefault(true, Features.supportedFeatures(supportedFeatures.asJava))
   val finalizedFeatures = Map[String, Short]("feature_1" -> 2)
@@ -79,8 +78,7 @@ class FinalizedFeatureCacheTest {
 def testUpdateOrThrowSuccess(): Unit = {
   val supportedFeatures =
     Map[String, SupportedVersionRange]("feature_1" -> new SupportedVersionRange(1, 4))
-  val brokerFeatures = BrokerFeatures.createDefault(true)
-  brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
+  val brokerFeatures = BrokerFeatures.createDefault(true, Features.supportedFeatures(supportedFeatures.asJava))
   val finalizedFeatures = Map[String, Short]("feature_1" -> 3)
@@ -95,8 +93,7 @@ class FinalizedFeatureCacheTest {
 def testClear(): Unit = {
   val supportedFeatures =
     Map[String, SupportedVersionRange]("feature_1" -> new SupportedVersionRange(1, 4))
-  val brokerFeatures = BrokerFeatures.createDefault(true)
-  brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
+  val brokerFeatures = BrokerFeatures.createDefault(true, Features.supportedFeatures(supportedFeatures.asJava))
   val finalizedFeatures = Map[String, Short]("feature_1" -> 3)
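
The test-only setSupportedFeatures mutator is gone; tests instead pass extra features through the new two-argument createDefault overload, which merges them into the defaults. A sketch of the new setup, reusing the feature name from the tests above:

    import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
    import org.apache.kafka.server.BrokerFeatures
    import scala.jdk.CollectionConverters._

    val supported = Map("feature_1" -> new SupportedVersionRange(1, 4))
    val brokerFeatures =
      BrokerFeatures.createDefault(true, Features.supportedFeatures(supported.asJava))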

View File

@@ -82,7 +82,7 @@ import org.apache.kafka.metadata.LeaderAndIsr
 import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.authorizer.AclEntry
-import org.apache.kafka.server.ClientMetricsManager
+import org.apache.kafka.server.{BrokerFeatures, ClientMetricsManager}
 import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
 import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, TransactionVersion}

View File

@@ -31,7 +31,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.{FetchRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.server.common
+import org.apache.kafka.server.{BrokerFeatures, common}
 import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion, OffsetAndEpoch}
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats

View File

@@ -35,6 +35,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBat
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0

View File

@@ -24,7 +24,6 @@ import kafka.cluster.Partition;
 import kafka.log.LogManager;
 import kafka.server.AlterPartitionManager;
 import kafka.server.BrokerBlockingSender;
-import kafka.server.BrokerFeatures;
 import kafka.server.FailedPartitions;
 import kafka.server.InitialFetchState;
 import kafka.server.KafkaConfig;
@@ -63,6 +62,7 @@ import org.apache.kafka.common.requests.UpdateMetadataRequest;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.BrokerFeatures;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.util.KafkaScheduler;

View File

@@ -21,7 +21,6 @@ import kafka.controller.KafkaController;
 import kafka.coordinator.transaction.TransactionCoordinator;
 import kafka.network.RequestChannel;
 import kafka.server.AutoTopicCreationManager;
-import kafka.server.BrokerFeatures;
 import kafka.server.ClientQuotaManager;
 import kafka.server.ClientRequestQuotaManager;
 import kafka.server.ControllerMutationQuotaManager;
@@ -60,6 +59,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.GroupCoordinator;
 import org.apache.kafka.network.RequestConvertToJson;
 import org.apache.kafka.network.metrics.RequestChannelMetrics;
+import org.apache.kafka.server.BrokerFeatures;
 import org.apache.kafka.server.common.FinalizedFeatures;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.config.ServerConfigs;

View File

@@ -19,7 +19,6 @@ package org.apache.kafka.jmh.server;
 import kafka.cluster.Partition;
 import kafka.log.LogManager;
 import kafka.server.AlterPartitionManager;
-import kafka.server.BrokerFeatures;
 import kafka.server.KafkaConfig;
 import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory;
@@ -32,6 +31,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.BrokerFeatures;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.config.ServerLogConfigs;
 import org.apache.kafka.server.util.KafkaScheduler;

View File

@@ -19,7 +19,6 @@ package org.apache.kafka.jmh.server;
 import kafka.cluster.Partition;
 import kafka.log.LogManager;
 import kafka.server.AlterPartitionManager;
-import kafka.server.BrokerFeatures;
 import kafka.server.KafkaConfig;
 import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory;
@@ -37,6 +36,7 @@ import org.apache.kafka.common.message.LeaderAndIsrRequestData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.BrokerFeatures;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.server.util.Scheduler;

View File

@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server;
import org.apache.kafka.common.feature.Features;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.kafka.server.common.Features.PRODUCTION_FEATURES;
/**
* A class that encapsulates the latest features supported by the Broker and also provides APIs to
* check for incompatibilities between the features supported by the Broker and finalized features.
* This class is immutable in production. It provides few APIs to mutate state only for the purpose
* of testing.
*/
public class BrokerFeatures {
private final Features<SupportedVersionRange> supportedFeatures;
private static final Logger log = LoggerFactory.getLogger(BrokerFeatures.class);
private BrokerFeatures(Features<SupportedVersionRange> supportedFeatures) {
this.supportedFeatures = supportedFeatures;
}
public static BrokerFeatures createDefault(boolean unstableFeatureVersionsEnabled) {
return new BrokerFeatures(defaultSupportedFeatures(unstableFeatureVersionsEnabled));
}
// only for testing
public static BrokerFeatures createDefault(boolean unstableFeatureVersionsEnabled, Features<SupportedVersionRange> newFeatures) {
Map<String, SupportedVersionRange> combined = new HashMap<>(defaultSupportedFeatures(unstableFeatureVersionsEnabled).features());
combined.putAll(newFeatures.features());
return new BrokerFeatures(Features.supportedFeatures(combined));
}
public static Map<String, VersionRange> createDefaultFeatureMap(BrokerFeatures features) {
return features.supportedFeatures.features()
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> VersionRange.of(e.getValue().min(), e.getValue().max())));
}
public static Features<SupportedVersionRange> defaultSupportedFeatures(boolean unstableFeatureVersionsEnabled) {
Map<String, SupportedVersionRange> features = new HashMap<>();
features.put(MetadataVersion.FEATURE_NAME,
new SupportedVersionRange(
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
unstableFeatureVersionsEnabled ? MetadataVersion.latestTesting().featureLevel()
: MetadataVersion.latestProduction().featureLevel()));
PRODUCTION_FEATURES.forEach(feature -> {
int maxVersion = unstableFeatureVersionsEnabled ? feature.latestTesting() : feature.latestProduction();
if (maxVersion > 0) {
features.put(feature.featureName(), new SupportedVersionRange(feature.minimumProduction(), (short) maxVersion));
}
});
return Features.supportedFeatures(features);
}
public static BrokerFeatures createEmpty() {
return new BrokerFeatures(Features.emptySupportedFeatures());
}
/**
* Returns true if any of the provided finalized features are incompatible with the provided
* supported features.
*
* @param supportedFeatures The supported features to be compared
* @param finalizedFeatures The finalized features to be compared
* @return - True if there are any feature incompatibilities found.
* - False otherwise.
*/
public static boolean hasIncompatibleFeatures(Features<SupportedVersionRange> supportedFeatures,
Map<String, Short> finalizedFeatures) {
return !incompatibleFeatures(supportedFeatures, finalizedFeatures, false).isEmpty();
}
/**
* Returns the default finalized features that a new Kafka cluster with IBP config >= IBP_2_7_IV0
* needs to be bootstrapped with.
*/
public Map<String, Short> defaultFinalizedFeatures() {
return supportedFeatures.features().entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey,
e -> e.getKey().equals(KRaftVersion.FEATURE_NAME) ? 0 : e.getValue().max()));
}
/**
* Returns the set of feature names found to be incompatible.
* A feature incompatibility is a version mismatch between the latest feature supported by the
* Broker, and a provided finalized feature. This can happen because a provided finalized
* feature:
* 1) Does not exist in the Broker (i.e. it is unknown to the Broker).
* [OR]
* 2) Exists but the FinalizedVersionRange does not match with the SupportedVersionRange
* of the supported feature.
*
* @param finalized The finalized features against which incompatibilities need to be checked for.
* @return The subset of input features which are incompatible. If the returned object
* is empty, it means there were no feature incompatibilities found.
*/
public Map<String, Short> incompatibleFeatures(Map<String, Short> finalized) {
return BrokerFeatures.incompatibleFeatures(supportedFeatures, finalized, true);
}
public Features<SupportedVersionRange> supportedFeatures() {
return supportedFeatures;
}
private static Map<String, Short> incompatibleFeatures(Features<SupportedVersionRange> supportedFeatures,
Map<String, Short> finalizedFeatures,
boolean logIncompatibilities) {
Map<String, Short> incompatibleFeaturesInfo = new HashMap<>();
finalizedFeatures.forEach((feature, versionLevels) -> {
SupportedVersionRange supportedVersions = supportedFeatures.get(feature);
if (supportedVersions == null) {
incompatibleFeaturesInfo.put(feature, versionLevels);
if (logIncompatibilities) {
log.warn("Feature incompatibilities seen: {feature={}, reason='Unknown feature'}", feature);
}
} else if (supportedVersions.isIncompatibleWith(versionLevels)) {
incompatibleFeaturesInfo.put(feature, versionLevels);
if (logIncompatibilities) {
log.warn("Feature incompatibilities seen: {feature={}, reason='{} is incompatible with {}'}",
feature, versionLevels, supportedVersions);
}
}
});
return incompatibleFeaturesInfo;
}
}
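
A hedged end-to-end sketch of the relocated API as used from Scala, with an illustrative feature name and level:

    import org.apache.kafka.server.BrokerFeatures
    import scala.jdk.CollectionConverters._

    val features = BrokerFeatures.createDefault(true) // unstable feature versions enabled
    // bootstrap defaults: the max supported level per feature, with kraft.version pinned to 0
    val defaults = features.defaultFinalizedFeatures().asScala
    // an unknown finalized feature is always reported incompatible
    val finalized = new java.util.HashMap[String, java.lang.Short]()
    finalized.put("unknown_feature", 1.toShort)
    val incompatible = features.incompatibleFeatures(finalized)
    assert(!incompatible.isEmpty)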

View File

@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server;
import org.apache.kafka.common.feature.Features;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.server.common.Features.GROUP_VERSION;
import static org.apache.kafka.server.common.Features.TRANSACTION_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class BrokerFeaturesTest {
@Test
public void testEmpty() {
assertTrue(BrokerFeatures.createEmpty().supportedFeatures().empty());
}
@Test
public void testIncompatibilitiesDueToAbsentFeature() {
Map<String, SupportedVersionRange> newFeatures = new HashMap<>();
newFeatures.put("test_feature_1", new SupportedVersionRange((short) 1, (short) 4));
newFeatures.put("test_feature_2", new SupportedVersionRange((short) 1, (short) 3));
Features<SupportedVersionRange> supportedFeatures = Features.supportedFeatures(newFeatures);
BrokerFeatures brokerFeatures = BrokerFeatures.createDefault(true, supportedFeatures);
Map<String, Short> compatibleFeatures = new HashMap<>();
compatibleFeatures.put("test_feature_1", (short) 4);
Map<String, Short> inCompatibleFeatures = new HashMap<>();
inCompatibleFeatures.put("test_feature_2", (short) 4);
Map<String, Short> finalizedFeatures = new HashMap<>(compatibleFeatures);
finalizedFeatures.putAll(inCompatibleFeatures);
assertEquals(inCompatibleFeatures,
brokerFeatures.incompatibleFeatures(finalizedFeatures));
assertTrue(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, finalizedFeatures));
}
@Test
public void testIncompatibilitiesDueToIncompatibleFeature() {
Map<String, SupportedVersionRange> newFeatures = new HashMap<>();
newFeatures.put("test_feature_1", new SupportedVersionRange((short) 1, (short) 4));
newFeatures.put("test_feature_2", new SupportedVersionRange((short) 1, (short) 3));
Features<SupportedVersionRange> supportedFeatures = Features.supportedFeatures(newFeatures);
BrokerFeatures brokerFeatures = BrokerFeatures.createDefault(true, supportedFeatures);
Map<String, Short> compatibleFeatures = new HashMap<>();
compatibleFeatures.put("test_feature_1", (short) 3);
Map<String, Short> inCompatibleFeatures = new HashMap<>();
inCompatibleFeatures.put("test_feature_2", (short) 4);
Map<String, Short> finalizedFeatures = new HashMap<>(compatibleFeatures);
finalizedFeatures.putAll(inCompatibleFeatures);
assertEquals(inCompatibleFeatures, brokerFeatures.incompatibleFeatures(finalizedFeatures));
assertTrue(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, finalizedFeatures));
}
@Test
public void testCompatibleFeatures() {
Map<String, SupportedVersionRange> newFeatures = new HashMap<>();
newFeatures.put("test_feature_1", new SupportedVersionRange((short) 1, (short) 4));
newFeatures.put("test_feature_2", new SupportedVersionRange((short) 1, (short) 3));
Features<SupportedVersionRange> supportedFeatures = Features.supportedFeatures(newFeatures);
BrokerFeatures brokerFeatures = BrokerFeatures.createDefault(true, supportedFeatures);
Map<String, Short> compatibleFeatures = new HashMap<>();
compatibleFeatures.put("test_feature_1", (short) 3);
compatibleFeatures.put("test_feature_2", (short) 3);
Map<String, Short> finalizedFeatures = new HashMap<>(compatibleFeatures);
assertTrue(brokerFeatures.incompatibleFeatures(finalizedFeatures).isEmpty());
assertFalse(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, finalizedFeatures));
}
@Test
public void testDefaultFinalizedFeatures() {
Map<String, SupportedVersionRange> newFeatures = new HashMap<>();
newFeatures.put("test_feature_1", new SupportedVersionRange((short) 1, (short) 4));
newFeatures.put("test_feature_2", new SupportedVersionRange((short) 1, (short) 3));
newFeatures.put("test_feature_3", new SupportedVersionRange((short) 3, (short) 7));
Features<SupportedVersionRange> supportedFeatures = Features.supportedFeatures(newFeatures);
BrokerFeatures brokerFeatures = BrokerFeatures.createDefault(true, supportedFeatures);
Map<String, Short> expectedFeatures = new HashMap<>();
expectedFeatures.put(MetadataVersion.FEATURE_NAME, MetadataVersion.latestTesting().featureLevel());
expectedFeatures.put(TRANSACTION_VERSION.featureName(), TRANSACTION_VERSION.latestTesting());
expectedFeatures.put(GROUP_VERSION.featureName(), GROUP_VERSION.latestTesting());
expectedFeatures.put("kraft.version", (short) 0);
expectedFeatures.put("test_feature_1", (short) 4);
expectedFeatures.put("test_feature_2", (short) 3);
expectedFeatures.put("test_feature_3", (short) 7);
assertEquals(expectedFeatures, brokerFeatures.defaultFinalizedFeatures());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void ensureDefaultSupportedFeaturesRangeMaxNotZero(boolean unstableVersionsEnabled) {
BrokerFeatures brokerFeatures = BrokerFeatures.createDefault(unstableVersionsEnabled);
brokerFeatures.supportedFeatures().features()
.values()
.forEach(supportedVersionRange -> assertNotEquals(0, supportedVersionRange.max()));
}
}