KAFKA-13830 MetadataVersion integration for KRaft controller (#12050)

This patch builds on #12072 and adds controller support for metadata.version. The kafka-storage tool now allows a
user to specify a specific metadata.version to bootstrap into the cluster, otherwise the latest version is used.

Upon the first leader election of the KRaft quroum, this initial metadata.version is written into the metadata log. When
writing snapshots, a FeatureLevelRecord for metadata.version will be written out ahead of other records so we can
decode things at the correct version level.

This also includes additional validation in the controller when setting feature levels. It will now check that a given
metadata.version is supportable by the quroum, not just the brokers.

Reviewers: José Armando García Sancio <jsancio@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>, Alyssa Huang <ahuang@confluent.io>
This commit is contained in:
David Arthur 2022-05-18 15:08:36 -04:00 committed by GitHub
parent cf34a2e4b0
commit 1135f22eaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
72 changed files with 1551 additions and 239 deletions

View File

@ -447,7 +447,8 @@ subprojects {
maxParallelForks = maxTestForks
ignoreFailures = userIgnoreFailures
maxHeapSize = defaultMaxHeapSize
// Increase heap size for integration tests
maxHeapSize = "2560m"
jvmArgs = defaultJvmArgs

View File

@ -82,6 +82,7 @@
<allow pkg="org.apache.kafka.controller"/>
<allow pkg="org.apache.kafka.metadata"/>
<allow pkg="org.apache.kafka.server.authorizer"/>
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="kafka.test.annotation"/>
<allow pkg="kafka.test.junit"/>
<allow pkg="kafka.network"/>

View File

@ -259,10 +259,12 @@
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.authorizer" />
<allow pkg="org.apache.kafka.server.common" />
@ -348,6 +350,7 @@
<allow pkg="net.sourceforge.argparse4j" />
<allow pkg="org.apache.kafka.common"/>
<allow pkg="org.apache.kafka.metadata"/>
<allow pkg="org.apache.kafka.controller.util"/>
<allow pkg="org.apache.kafka.queue"/>
<allow pkg="org.apache.kafka.raft"/>
<allow pkg="org.apache.kafka.server.common" />

View File

@ -290,7 +290,7 @@
<suppress checks="ClassDataAbstractionCoupling"
files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/>
<suppress checks="ClassFanOutComplexity"
files="(QuorumController|ReplicationControlManager|ReplicationControlManagerTest).java"/>
files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/>
<suppress checks="(ParameterNumber|ClassDataAbstractionCoupling)"
files="(QuorumController).java"/>
<suppress checks="CyclomaticComplexity"
@ -303,6 +303,8 @@
files="(MetadataImage).java"/>
<suppress checks="ImportControl"
files="ApiVersionsResponse.java"/>
<suppress checks="AvoidStarImport"
files="MetadataVersionTest.java"/>
<!-- Storage -->
<suppress checks="(CyclomaticComplexity|ParameterNumber)"

View File

@ -915,7 +915,9 @@ public class NetworkClient implements KafkaClient {
}
return;
}
NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.data().apiKeys());
NodeApiVersions nodeVersionInfo = new NodeApiVersions(
apiVersionsResponse.data().apiKeys(),
apiVersionsResponse.data().supportedFeatures());
apiVersions.update(node, nodeVersionInfo);
this.connectionStates.ready(node);
log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, API versions: {}.",

View File

@ -17,8 +17,9 @@
package org.apache.kafka.clients;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
import org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureKey;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.utils.Utils;
@ -27,6 +28,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -44,6 +46,8 @@ public class NodeApiVersions {
// List of APIs which the broker supports, but which are unknown to the client
private final List<ApiVersion> unknownApis = new ArrayList<>();
private final Map<String, SupportedVersionRange> supportedFeatures;
/**
* Create a NodeApiVersions object with the current ApiVersions.
*
@ -72,7 +76,7 @@ public class NodeApiVersions {
}
if (!exists) apiVersions.add(ApiVersionsResponse.toApiVersion(apiKey));
}
return new NodeApiVersions(apiVersions);
return new NodeApiVersions(apiVersions, Collections.emptyList());
}
@ -91,7 +95,7 @@ public class NodeApiVersions {
.setMaxVersion(maxVersion)));
}
public NodeApiVersions(ApiVersionCollection nodeApiVersions) {
public NodeApiVersions(Collection<ApiVersion> nodeApiVersions, Collection<SupportedFeatureKey> nodeSupportedFeatures) {
for (ApiVersion nodeApiVersion : nodeApiVersions) {
if (ApiKeys.hasId(nodeApiVersion.apiKey())) {
ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey());
@ -101,18 +105,13 @@ public class NodeApiVersions {
unknownApis.add(nodeApiVersion);
}
}
}
public NodeApiVersions(Collection<ApiVersion> nodeApiVersions) {
for (ApiVersion nodeApiVersion : nodeApiVersions) {
if (ApiKeys.hasId(nodeApiVersion.apiKey())) {
ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey());
supportedVersions.put(nodeApiKey, nodeApiVersion);
} else {
// Newer brokers may support ApiKeys we don't know about
unknownApis.add(nodeApiVersion);
}
Map<String, SupportedVersionRange> supportedFeaturesBuilder = new HashMap<>();
for (SupportedFeatureKey supportedFeature : nodeSupportedFeatures) {
supportedFeaturesBuilder.put(supportedFeature.name(),
new SupportedVersionRange(supportedFeature.minVersion(), supportedFeature.maxVersion()));
}
this.supportedFeatures = Collections.unmodifiableMap(supportedFeaturesBuilder);
}
/**
@ -233,4 +232,8 @@ public class NodeApiVersions {
public Map<ApiKeys, ApiVersion> allSupportedApiVersions() {
return supportedVersions;
}
public Map<String, SupportedVersionRange> supportedFeatures() {
return supportedFeatures;
}
}

View File

@ -48,7 +48,7 @@ public class ApiVersionsTest {
assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
// something that doesn't support PRODUCE, which is the case with Raft-based controllers
apiVersions.update("2", new NodeApiVersions(Collections.singleton(
apiVersions.update("2", NodeApiVersions.create(Collections.singleton(
new ApiVersionsResponseData.ApiVersion()
.setApiKey(ApiKeys.FETCH.id)
.setMinVersion((short) 0)

View File

@ -27,6 +27,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@ -38,7 +39,7 @@ public class NodeApiVersionsTest {
@Test
public void testUnsupportedVersionsToString() {
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection());
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
@ -67,7 +68,7 @@ public class NodeApiVersionsTest {
.setMaxVersion((short) 10001));
} else versionList.add(ApiVersionsResponse.toApiVersion(apiKey));
}
NodeApiVersions versions = new NodeApiVersions(versionList);
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.values()) {
@ -124,7 +125,7 @@ public class NodeApiVersionsTest {
@Test
public void testUsableVersionCalculationNoKnownVersions() {
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection());
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
assertThrows(UnsupportedVersionException.class,
() -> versions.latestUsableVersion(ApiKeys.FETCH));
}
@ -146,7 +147,7 @@ public class NodeApiVersionsTest {
.setApiKey((short) 100)
.setMinVersion((short) 0)
.setMaxVersion((short) 1));
NodeApiVersions versions = new NodeApiVersions(versionList);
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
for (ApiKeys apiKey: ApiKeys.apisForListener(scope)) {
assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
}
@ -156,7 +157,7 @@ public class NodeApiVersionsTest {
@EnumSource(ApiMessageType.ListenerType.class)
public void testConstructionFromApiVersionsResponse(ApiMessageType.ListenerType scope) {
ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys());
NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList());
for (ApiVersion apiVersionKey : apiVersionsResponse.data().apiKeys()) {
ApiVersion apiVersion = versions.apiVersion(ApiKeys.forId(apiVersionKey.apiKey()));

View File

@ -153,7 +153,7 @@ public class TransactionManagerTest {
private void initializeTransactionManager(Optional<String> transactionalId) {
Metrics metrics = new Metrics(time);
apiVersions.update("0", new NodeApiVersions(Arrays.asList(
apiVersions.update("0", NodeApiVersions.create(Arrays.asList(
new ApiVersion()
.setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
.setMinVersion((short) 0)
@ -2615,7 +2615,7 @@ public class TransactionManagerTest {
@Test
public void testTransitionToFatalErrorWhenRetriedBatchIsExpired() throws InterruptedException {
apiVersions.update("0", new NodeApiVersions(Arrays.asList(
apiVersions.update("0", NodeApiVersions.create(Arrays.asList(
new ApiVersion()
.setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
.setMinVersion((short) 0)
@ -2814,7 +2814,7 @@ public class TransactionManagerTest {
@Test
public void testAbortTransactionAndReuseSequenceNumberOnError() throws InterruptedException {
apiVersions.update("0", new NodeApiVersions(Arrays.asList(
apiVersions.update("0", NodeApiVersions.create(Arrays.asList(
new ApiVersion()
.setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
.setMinVersion((short) 0)
@ -2866,7 +2866,7 @@ public class TransactionManagerTest {
// Set the InitProducerId version such that bumping the epoch number is not supported. This will test the case
// where the sequence number is reset on an UnknownProducerId error, allowing subsequent transactions to
// append to the log successfully
apiVersions.update("0", new NodeApiVersions(Arrays.asList(
apiVersions.update("0", NodeApiVersions.create(Arrays.asList(
new ApiVersion()
.setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
.setMinVersion((short) 0)

View File

@ -40,7 +40,6 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.common.utils.{KafkaThread, Time}
import org.apache.kafka.common.Node
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, MetadataRequest, MetadataResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
@ -157,10 +156,10 @@ object BrokerApiVersionsCommand {
throw new RuntimeException(s"Request ${request.apiKey()} failed on brokers $bootstrapBrokers")
}
private def getApiVersions(node: Node): ApiVersionCollection = {
private def getNodeApiVersions(node: Node): NodeApiVersions = {
val response = send(node, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
Errors.forCode(response.data.errorCode).maybeThrow()
response.data.apiKeys
new NodeApiVersions(response.data.apiKeys, response.data.supportedFeatures)
}
/**
@ -186,7 +185,7 @@ object BrokerApiVersionsCommand {
def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
findAllBrokers().map { broker =>
broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker)))
broker -> Try[NodeApiVersions](getNodeApiVersions(broker))
}.toMap
def close(): Unit = {

View File

@ -111,6 +111,7 @@ class KafkaRaftManager[T](
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
) extends RaftManager[T] with Logging {
val apiVersions = new ApiVersions()
private val raftConfig = new RaftConfig(config)
private val threadNamePrefix = threadNamePrefixOpt.getOrElse("kafka-raft")
private val logContext = new LogContext(s"[RaftManager nodeId=${config.nodeId}] ")
@ -274,7 +275,7 @@ class KafkaRaftManager[T](
config.connectionSetupTimeoutMaxMs,
time,
discoverBrokerVersions,
new ApiVersions,
apiVersions,
logContext
)
}

View File

@ -19,6 +19,7 @@ package kafka.server
import kafka.utils.Logging
import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
import org.apache.kafka.server.common.MetadataVersion
import java.util
import scala.jdk.CollectionConverters._
@ -72,6 +73,12 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte
object BrokerFeatures extends Logging {
def createDefault(): BrokerFeatures = {
new BrokerFeatures(Features.supportedFeatures(
java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
new SupportedVersionRange(MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel()))))
}
def createEmpty(): BrokerFeatures = {
new BrokerFeatures(Features.emptySupportedFeatures())
}

View File

@ -30,13 +30,14 @@ import kafka.security.CredentialProvider
import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPolicyClassNameProp}
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.clients.ApiVersions
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumControllerMetrics, QuorumFeatures}
import org.apache.kafka.controller.{BootstrapMetadata, Controller, QuorumController, QuorumControllerMetrics, QuorumFeatures}
import org.apache.kafka.metadata.KafkaConfigSchema
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.RaftConfig.AddressSpec
@ -63,6 +64,8 @@ class ControllerServer(
val threadNamePrefix: Option[String],
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
val configSchema: KafkaConfigSchema,
val raftApiVersions: ApiVersions,
val bootstrapMetadata: BootstrapMetadata
) extends Logging with KafkaMetricsGroup {
import kafka.server.Server._
@ -162,7 +165,8 @@ class ControllerServer(
alterConfigPolicy = Option(config.
getConfiguredInstance(AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))
val quorumFeatures = QuorumFeatures.create(config.nodeId, QuorumFeatures.defaultFeatureMap())
val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get())
val quorumFeatures = QuorumFeatures.create(config.nodeId, raftApiVersions, QuorumFeatures.defaultFeatureMap(), controllerNodes)
val controllerBuilder = {
val leaderImbalanceCheckIntervalNs = if (config.autoLeaderRebalanceEnable) {
@ -179,7 +183,7 @@ class ControllerServer(
setQuorumFeatures(quorumFeatures).
setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
setDefaultNumPartitions(config.numPartitions.intValue()).
setIsLeaderRecoverySupported(config.interBrokerProtocolVersion.isAtLeast(IBP_3_2_IV0)).
setIsLeaderRecoverySupported(bootstrapMetadata.metadataVersion().isAtLeast(IBP_3_2_IV0)).
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
TimeUnit.MILLISECONDS)).
setSnapshotMaxNewRecordBytes(config.metadataSnapshotMaxNewRecordBytes).
@ -188,7 +192,8 @@ class ControllerServer(
setCreateTopicPolicy(createTopicPolicy.asJava).
setAlterConfigPolicy(alterConfigPolicy.asJava).
setConfigurationValidator(new ControllerConfigurationValidator()).
setStaticConfig(config.originals)
setStaticConfig(config.originals).
setBootstrapMetadata(bootstrapMetadata)
}
authorizer match {
case Some(a: ClusterMetadataAuthorizer) => controllerBuilder.setAuthorizer(a)
@ -197,7 +202,6 @@ class ControllerServer(
controller = controllerBuilder.build()
quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
authorizer,
quotaManagers,
@ -206,7 +210,7 @@ class ControllerServer(
raftManager,
config,
metaProperties,
controllerNodes.toSeq,
controllerNodes.asScala.toSeq,
apiVersionManager)
controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
socketServer.dataPlaneRequestChannel,

View File

@ -22,10 +22,10 @@ import java.util.Collections
import kafka.utils.Logging
import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
import org.apache.kafka.image.FeaturesDelta
import org.apache.kafka.server.common.MetadataVersion
import scala.concurrent.TimeoutException
import scala.math.max
import scala.compat.java8.OptionConverters._
// Raised whenever there was an error in updating the FinalizedFeatureCache with features.
@ -144,6 +144,9 @@ class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends
new FinalizedVersionRange(version, version))
}
}
featuresDelta.metadataVersionChange().ifPresent { metadataVersion =>
newFeatures.put(MetadataVersion.FEATURE_NAME, new FinalizedVersionRange(metadataVersion.featureLevel(), metadataVersion.featureLevel()))
}
featuresAndEpoch = Some(FinalizedFeaturesAndEpoch(Features.finalizedFeatures(
Collections.unmodifiableMap(newFeatures)), highestMetadataOffset))
}

View File

@ -18,7 +18,6 @@ package kafka.server
import java.io.File
import java.util.concurrent.CompletableFuture
import kafka.common.InconsistentNodeIdException
import kafka.log.{LogConfig, UnifiedLog}
import kafka.metrics.KafkaMetricsReporter
@ -28,11 +27,13 @@ import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
import org.apache.kafka.common.utils.{AppInfoParser, Time}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.controller.BootstrapMetadata
import org.apache.kafka.metadata.{KafkaConfigSchema, MetadataRecordSerde}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import java.nio.file.Paths
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@ -54,7 +55,7 @@ class KafkaRaftServer(
KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
KafkaYammerMetrics.INSTANCE.configure(config.originals)
private val (metaProps, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
private val (metaProps, bootstrapMetadata, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
private val metrics = Server.initializeMetrics(
config,
@ -102,6 +103,8 @@ class KafkaRaftServer(
threadNamePrefix,
controllerQuorumVotersFuture,
KafkaRaftServer.configSchema,
raftManager.apiVersions,
bootstrapMetadata
))
} else {
None
@ -149,7 +152,7 @@ object KafkaRaftServer {
* @return A tuple containing the loaded meta properties (which are guaranteed to
* be consistent across all log dirs) and the offline directories
*/
def initializeLogDirs(config: KafkaConfig): (MetaProperties, Seq[String]) = {
def initializeLogDirs(config: KafkaConfig): (MetaProperties, BootstrapMetadata, Seq[String]) = {
val logDirs = (config.logDirs.toSet + config.metadataLogDir).toSeq
val (rawMetaProperties, offlineDirs) = BrokerMetadataCheckpoint.
getBrokerMetadataAndOfflineDirs(logDirs, ignoreMissing = false)
@ -177,7 +180,15 @@ object KafkaRaftServer {
"If you intend to create a new broker, you should remove all data in your data directories (log.dirs).")
}
(metaProperties, offlineDirs.toSeq)
// Load the bootstrap metadata file or, in the case of an upgrade from KRaft preview, bootstrap the
// metadata.version corresponding to a user-configured IBP.
val bootstrapMetadata = if (config.originals.containsKey(KafkaConfig.InterBrokerProtocolVersionProp)) {
BootstrapMetadata.load(Paths.get(config.metadataLogDir), config.interBrokerProtocolVersion)
} else {
BootstrapMetadata.load(Paths.get(config.metadataLogDir), MetadataVersion.IBP_3_0_IV0)
}
(metaProperties, bootstrapMetadata, offlineDirs.toSeq)
}
val configSchema = new KafkaConfigSchema(Map(

View File

@ -162,7 +162,7 @@ class KafkaServer(
private var _featureChangeListener: FinalizedFeatureChangeListener = null
val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
val brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty()
val featureCache: FinalizedFeatureCache = new FinalizedFeatureCache(brokerFeatures)
override def brokerState: BrokerState = _brokerState

View File

@ -118,9 +118,14 @@ class BrokerMetadataListener(
}
_publisher.foreach(publish)
// If we detected a change in metadata.version, generate a local snapshot
val metadataVersionChanged = Option(_delta.featuresDelta()).exists { featuresDelta =>
featuresDelta.metadataVersionChange().isPresent
}
snapshotter.foreach { snapshotter =>
_bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
if (shouldSnapshot()) {
if (shouldSnapshot() || metadataVersionChanged) {
if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
_bytesSinceLastSnapshot = 0L
}

View File

@ -32,6 +32,7 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsImage}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion
import scala.collection.mutable
@ -133,19 +134,27 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
// Publish the new metadata image to the metadata cache.
metadataCache.setImage(newImage)
val metadataVersionLogMsg = newImage.features().metadataVersion() match {
case MetadataVersion.UNINITIALIZED => "un-initialized metadata.version"
case mv: MetadataVersion => s"metadata.version ${mv.featureLevel()}"
}
if (_firstPublish) {
info(s"Publishing initial metadata at offset $highestOffsetAndEpoch.")
info(s"Publishing initial metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.")
// If this is the first metadata update we are applying, initialize the managers
// first (but after setting up the metadata cache).
initializeManagers()
} else if (isDebugEnabled) {
debug(s"Publishing metadata at offset $highestOffsetAndEpoch.")
debug(s"Publishing metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.")
}
// Apply feature deltas.
Option(delta.featuresDelta()).foreach { featuresDelta =>
featureCache.update(featuresDelta, highestOffsetAndEpoch.offset)
featuresDelta.metadataVersionChange().ifPresent{ metadataVersion =>
info(s"Updating metadata.version to ${metadataVersion.featureLevel()} at offset $highestOffsetAndEpoch.")
}
}
// Apply topic deltas.

View File

@ -19,19 +19,61 @@ package kafka.tools
import java.io.PrintStream
import java.nio.file.{Files, Paths}
import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties, RawMetaProperties}
import kafka.utils.{Exit, Logging}
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue}
import net.sourceforge.argparse4j.inf.Namespace
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.controller.BootstrapMetadata
import org.apache.kafka.server.common.MetadataVersion
import scala.collection.mutable
object StorageTool extends Logging {
def main(args: Array[String]): Unit = {
try {
val namespace = parseArguments(args)
val command = namespace.getString("command")
val config = Option(namespace.getString("config")).flatMap(
p => Some(new KafkaConfig(Utils.loadProps(p))))
command match {
case "info" =>
val directories = configToLogDirectories(config.get)
val selfManagedMode = configToSelfManagedMode(config.get)
Exit.exit(infoCommand(System.out, selfManagedMode, directories))
case "format" =>
val directories = configToLogDirectories(config.get)
val clusterId = namespace.getString("cluster_id")
val metadataVersion = getMetadataVersion(namespace)
if (!metadataVersion.isKRaftSupported) {
throw new TerseFailure(s"Must specify a metadata version of at least 1.")
}
val metaProperties = buildMetadataProperties(clusterId, config.get)
val ignoreFormatted = namespace.getBoolean("ignore_formatted")
if (!configToSelfManagedMode(config.get)) {
throw new TerseFailure("The kafka configuration file appears to be for " +
"a legacy cluster. Formatting is only supported for clusters in KRaft mode.")
}
Exit.exit(formatCommand(System.out, directories, metaProperties, metadataVersion, ignoreFormatted))
case "random-uuid" =>
System.out.println(Uuid.randomUuid)
Exit.exit(0)
case _ =>
throw new RuntimeException(s"Unknown command $command")
}
} catch {
case e: TerseFailure =>
System.err.println(e.getMessage)
System.exit(1)
}
}
def parseArguments(args: Array[String]): Namespace = {
val parser = ArgumentParsers.
newArgumentParser("kafka-storage").
defaultHelp(true).
@ -55,41 +97,11 @@ object StorageTool extends Logging {
help("The cluster ID to use.")
formatParser.addArgument("--ignore-formatted", "-g").
action(storeTrue())
formatParser.addArgument("--metadata-version", "-v").
action(store()).
help(s"The initial metadata.version to use. Default is (${MetadataVersion.latest().featureLevel()}).")
val namespace = parser.parseArgsOrFail(args)
val command = namespace.getString("command")
val config = Option(namespace.getString("config")).flatMap(
p => Some(new KafkaConfig(Utils.loadProps(p))))
command match {
case "info" =>
val directories = configToLogDirectories(config.get)
val selfManagedMode = configToSelfManagedMode(config.get)
Exit.exit(infoCommand(System.out, selfManagedMode, directories))
case "format" =>
val directories = configToLogDirectories(config.get)
val clusterId = namespace.getString("cluster_id")
val metaProperties = buildMetadataProperties(clusterId, config.get)
val ignoreFormatted = namespace.getBoolean("ignore_formatted")
if (!configToSelfManagedMode(config.get)) {
throw new TerseFailure("The kafka configuration file appears to be for " +
"a legacy cluster. Formatting is only supported for clusters in KRaft mode.")
}
Exit.exit(formatCommand(System.out, directories, metaProperties, ignoreFormatted ))
case "random-uuid" =>
System.out.println(Uuid.randomUuid)
Exit.exit(0)
case _ =>
throw new RuntimeException(s"Unknown command $command")
}
} catch {
case e: TerseFailure =>
System.err.println(e.getMessage)
System.exit(1)
}
parser.parseArgsOrFail(args)
}
def configToLogDirectories(config: KafkaConfig): Seq[String] = {
@ -101,6 +113,12 @@ object StorageTool extends Logging {
def configToSelfManagedMode(config: KafkaConfig): Boolean = config.processRoles.nonEmpty
def getMetadataVersion(namespace: Namespace): MetadataVersion = {
Option(namespace.getString("metadata_version")).
map(mv => MetadataVersion.fromFeatureLevel(mv.toShort)).
getOrElse(MetadataVersion.latest())
}
def infoCommand(stream: PrintStream, selfManagedMode: Boolean, directories: Seq[String]): Int = {
val problems = new mutable.ArrayBuffer[String]
val foundDirectories = new mutable.ArrayBuffer[String]
@ -197,13 +215,16 @@ object StorageTool extends Logging {
case e: Throwable => throw new TerseFailure(s"Cluster ID string $clusterIdStr " +
s"does not appear to be a valid UUID: ${e.getMessage}")
}
require(config.nodeId >= 0, s"The node.id must be set to a non-negative integer.")
if (config.nodeId < 0) {
throw new TerseFailure(s"The node.id must be set to a non-negative integer. We saw ${config.nodeId}")
}
new MetaProperties(effectiveClusterId.toString, config.nodeId)
}
def formatCommand(stream: PrintStream,
directories: Seq[String],
metaProperties: MetaProperties,
metadataVersion: MetadataVersion,
ignoreFormatted: Boolean): Int = {
if (directories.isEmpty) {
throw new TerseFailure("No log directories found in the configuration.")
@ -231,6 +252,10 @@ object StorageTool extends Logging {
val metaPropertiesPath = Paths.get(directory, "meta.properties")
val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile)
checkpoint.write(metaProperties.toProperties)
val bootstrapMetadata = BootstrapMetadata.create(metadataVersion)
BootstrapMetadata.write(bootstrapMetadata, Paths.get(directory))
stream.println(s"Formatting ${directory}")
})
0

View File

@ -90,7 +90,7 @@ class TestRaftServer(
time,
metrics,
Some(threadNamePrefix),
CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters))
CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters)),
)
workloadGenerator = new RaftWorkloadGenerator(

View File

@ -19,6 +19,7 @@ package kafka.test;
import kafka.test.annotation.Type;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.common.MetadataVersion;
import java.io.File;
import java.util.HashMap;
@ -41,7 +42,7 @@ public class ClusterConfig {
private final SecurityProtocol securityProtocol;
private final String listenerName;
private final File trustStoreFile;
private final String ibp;
private final MetadataVersion metadataVersion;
private final Properties serverProperties = new Properties();
private final Properties producerProperties = new Properties();
@ -53,7 +54,7 @@ public class ClusterConfig {
ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart,
SecurityProtocol securityProtocol, String listenerName, File trustStoreFile,
String ibp) {
MetadataVersion metadataVersion) {
this.type = type;
this.brokers = brokers;
this.controllers = controllers;
@ -62,7 +63,7 @@ public class ClusterConfig {
this.securityProtocol = securityProtocol;
this.listenerName = listenerName;
this.trustStoreFile = trustStoreFile;
this.ibp = ibp;
this.metadataVersion = metadataVersion;
}
public Type clusterType() {
@ -121,8 +122,8 @@ public class ClusterConfig {
return Optional.ofNullable(trustStoreFile);
}
public Optional<String> ibp() {
return Optional.ofNullable(ibp);
public Optional<MetadataVersion> metadataVersion() {
return Optional.ofNullable(metadataVersion);
}
public Properties brokerServerProperties(int brokerId) {
@ -130,16 +131,16 @@ public class ClusterConfig {
}
public Map<String, String> nameTags() {
Map<String, String> tags = new LinkedHashMap<>(3);
Map<String, String> tags = new LinkedHashMap<>(4);
name().ifPresent(name -> tags.put("Name", name));
ibp().ifPresent(ibp -> tags.put("IBP", ibp));
metadataVersion().ifPresent(mv -> tags.put("MetadataVersion", mv.toString()));
tags.put("Security", securityProtocol.name());
listenerName().ifPresent(listener -> tags.put("Listener", listener));
return tags;
}
public ClusterConfig copyOf() {
ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, ibp);
ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion);
copy.serverProperties.putAll(serverProperties);
copy.producerProperties.putAll(producerProperties);
copy.consumerProperties.putAll(consumerProperties);
@ -165,7 +166,7 @@ public class ClusterConfig {
private SecurityProtocol securityProtocol;
private String listenerName;
private File trustStoreFile;
private String ibp;
private MetadataVersion metadataVersion;
Builder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol) {
this.type = type;
@ -215,13 +216,13 @@ public class ClusterConfig {
return this;
}
public Builder ibp(String ibp) {
this.ibp = ibp;
public Builder metadataVersion(MetadataVersion metadataVersion) {
this.metadataVersion = metadataVersion;
return this;
}
public ClusterConfig build() {
return new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, ibp);
return new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion);
}
}
}

View File

@ -18,11 +18,13 @@
package kafka.test;
import kafka.network.SocketServer;
import kafka.server.BrokerFeatures;
import kafka.test.annotation.ClusterTest;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@ -95,6 +97,11 @@ public interface ClusterInstance {
*/
SocketServer anyControllerSocketServer();
/**
* Return a mapping of the underlying broker IDs to their supported features
*/
Map<Integer, BrokerFeatures> brokerFeatures();
/**
* The underlying object which is responsible for setting up and tearing down the cluster.
*/

View File

@ -18,6 +18,7 @@
package kafka.test.annotation;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.TestTemplate;
import java.lang.annotation.Documented;
@ -40,6 +41,6 @@ public @interface ClusterTest {
String name() default "";
SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
String listener() default "";
String ibp() default "";
MetadataVersion metadataVersion() default MetadataVersion.UNINITIALIZED;
ClusterConfigProperty[] serverProperties() default {};
}

View File

@ -25,6 +25,7 @@ import kafka.test.annotation.ClusterTemplate;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
@ -194,8 +195,8 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
properties.put(property.key(), property.value());
}
if (!annot.ibp().isEmpty()) {
builder.ibp(annot.ibp());
if (!annot.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
builder.metadataVersion(annot.metadataVersion());
}
ClusterConfig config = builder.build();

View File

@ -18,6 +18,7 @@
package kafka.test.junit;
import kafka.network.SocketServer;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.test.ClusterConfig;
@ -29,6 +30,7 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerState;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
import org.junit.jupiter.api.extension.Extension;
@ -38,6 +40,7 @@ import scala.compat.java8.OptionConverters;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -83,6 +86,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
return Arrays.asList(
(BeforeTestExecutionCallback) context -> {
TestKitNodes nodes = new TestKitNodes.Builder().
setBootstrapMetadataVersion(clusterConfig.metadataVersion().orElse(MetadataVersion.latest())).
setNumBrokerNodes(clusterConfig.numBrokers()).
setNumControllerNodes(clusterConfig.numControllers()).build();
nodes.brokerNodes().forEach((brokerId, brokerNode) -> {
@ -168,6 +172,14 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
.orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
}
@Override
public Map<Integer, BrokerFeatures> brokerFeatures() {
return brokers().collect(Collectors.toMap(
brokerServer -> brokerServer.config().nodeId(),
BrokerServer::brokerFeatures
));
}
@Override
public ClusterType clusterType() {
return ClusterType.RAFT;

View File

@ -19,6 +19,7 @@ package kafka.test.junit;
import kafka.api.IntegrationTestHarness;
import kafka.network.SocketServer;
import kafka.server.BrokerFeatures;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.test.ClusterConfig;
@ -41,6 +42,7 @@ import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
@ -106,7 +108,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
@Override
public Properties serverConfig() {
Properties props = clusterConfig.serverProperties();
clusterConfig.ibp().ifPresent(ibp -> props.put(KafkaConfig.InterBrokerProtocolVersionProp(), ibp));
clusterConfig.metadataVersion().ifPresent(mv -> props.put(KafkaConfig.InterBrokerProtocolVersionProp(), mv.version()));
return props;
}
@ -237,6 +239,14 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
}
@Override
public Map<Integer, BrokerFeatures> brokerFeatures() {
return servers().collect(Collectors.toMap(
brokerServer -> brokerServer.config().nodeId(),
KafkaServer::brokerFeatures
));
}
@Override
public ClusterType clusterType() {
return ClusterType.ZK;

View File

@ -34,10 +34,12 @@ import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.BootstrapMetadata;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -173,6 +175,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
String threadNamePrefix = String.format("controller%d_", node.id());
MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId().toString(), node.id());
TopicPartition metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0);
BootstrapMetadata bootstrapMetadata = BootstrapMetadata.create(nodes.bootstrapMetadataVersion());
KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(),
Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
@ -184,7 +187,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
new Metrics(),
Option.apply(threadNamePrefix),
connectFutureManager.future,
KafkaRaftServer.configSchema()
KafkaRaftServer.configSchema(),
raftManager.apiVersions(),
bootstrapMetadata
);
controllers.put(node.id(), controller);
controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> {
@ -335,6 +340,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
StorageTool.formatCommand(out,
JavaConverters.asScalaBuffer(Collections.singletonList(metadataLogDir)).toSeq(),
properties,
MetadataVersion.IBP_3_0_IV0,
false);
} finally {
for (String line : stream.toString().split(String.format("%n"))) {

View File

@ -20,6 +20,7 @@ package kafka.testkit;
import kafka.server.MetaProperties;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.server.common.MetadataVersion;
import java.nio.file.Paths;
import java.util.ArrayList;
@ -33,6 +34,7 @@ import java.util.TreeMap;
public class TestKitNodes {
public static class Builder {
private Uuid clusterId = null;
private MetadataVersion bootstrapMetadataVersion = null;
private final NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
private final NavigableMap<Integer, BrokerNode> brokerNodes = new TreeMap<>();
@ -41,6 +43,11 @@ public class TestKitNodes {
return this;
}
public Builder setBootstrapMetadataVersion(MetadataVersion metadataVersion) {
this.bootstrapMetadataVersion = metadataVersion;
return this;
}
public Builder addNodes(TestKitNode[] nodes) {
for (TestKitNode node : nodes) {
addNode(node);
@ -103,18 +110,24 @@ public class TestKitNodes {
if (clusterId == null) {
clusterId = Uuid.randomUuid();
}
return new TestKitNodes(clusterId, controllerNodes, brokerNodes);
if (bootstrapMetadataVersion == null) {
bootstrapMetadataVersion = MetadataVersion.latest();
}
return new TestKitNodes(clusterId, bootstrapMetadataVersion, controllerNodes, brokerNodes);
}
}
private final Uuid clusterId;
private final MetadataVersion bootstrapMetadataVersion;
private final NavigableMap<Integer, ControllerNode> controllerNodes;
private final NavigableMap<Integer, BrokerNode> brokerNodes;
private TestKitNodes(Uuid clusterId,
MetadataVersion bootstrapMetadataVersion,
NavigableMap<Integer, ControllerNode> controllerNodes,
NavigableMap<Integer, BrokerNode> brokerNodes) {
this.clusterId = clusterId;
this.bootstrapMetadataVersion = bootstrapMetadataVersion;
this.controllerNodes = controllerNodes;
this.brokerNodes = brokerNodes;
}
@ -123,6 +136,10 @@ public class TestKitNodes {
return clusterId;
}
public MetadataVersion bootstrapMetadataVersion() {
return bootstrapMetadataVersion;
}
public Map<Integer, ControllerNode> controllerNodes() {
return controllerNodes;
}
@ -161,7 +178,7 @@ public class TestKitNodes {
node.incarnationId(), absolutize(baseDirectory, node.metadataDirectory()),
absolutize(baseDirectory, node.logDataDirectories()), node.propertyOverrides()));
}
return new TestKitNodes(clusterId, newControllerNodes, newBrokerNodes);
return new TestKitNodes(clusterId, bootstrapMetadataVersion, newControllerNodes, newBrokerNodes);
}
private static List<String> absolutize(String base, Collection<String> directories) {

View File

@ -26,6 +26,7 @@ import org.apache.kafka.common.message.InitProducerIdRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{InitProducerIdRequest, InitProducerIdResponse}
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.extension.ExtendWith
@ -43,9 +44,9 @@ class ProducerIdsIntegrationTest {
}
@ClusterTests(Array(
new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "2.8"),
new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "3.0-IV0"),
new ClusterTest(clusterType = Type.KRAFT, brokers = 3, ibp = "3.0-IV0")
new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1),
new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0),
new ClusterTest(clusterType = Type.KRAFT, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0)
))
def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
verifyUniqueIds(clusterInstance)

View File

@ -30,18 +30,19 @@ import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, Descr
import org.apache.kafka.metadata.BrokerState
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Tag, Test, Timeout}
import java.util
import java.util.concurrent.ExecutionException
import java.util.{Arrays, Collections, Optional}
import java.util
import java.util.{Arrays, Collections, Optional}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.server.common.MetadataVersion
import org.slf4j.LoggerFactory
import scala.annotation.nowarn
import scala.collection.mutable
import scala.concurrent.ExecutionException
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
import scala.jdk.CollectionConverters._
@ -69,8 +70,8 @@ class KRaftClusterTest {
def testCreateClusterAndWaitForBrokerInRunningState(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(3).
setNumControllerNodes(3).build()).build()
setNumBrokerNodes(1).
setNumControllerNodes(1).build()).build()
try {
cluster.format()
cluster.startup()
@ -292,6 +293,17 @@ class KRaftClusterTest {
}
}
@Test
def testCreateClusterInvalidMetadataVersion(): Unit = {
assertThrows(classOf[IllegalArgumentException], () => {
new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_2_7_IV0).
setNumBrokerNodes(1).
setNumControllerNodes(1).build()).build()
})
}
private def doOnStartedKafkaCluster(numControllerNodes: Int = 1,
numBrokerNodes: Int,
brokerPropertyOverrides: (TestKitNodes, BrokerNode) => Map[String, String])

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package integration.kafka.server
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterTest, ClusterTests, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType
import org.apache.kafka.clients.admin.{FeatureUpdate, UpdateFeaturesOptions}
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class MetadataVersionIntegrationTest {
@ClusterTests(value = Array(
new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_0_IV0),
new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_1_IV0),
new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_2_IV0)
))
def testBasicMetadataVersionUpgrade(clusterInstance: ClusterInstance): Unit = {
val admin = clusterInstance.createAdminClient()
val describeResult = admin.describeFeatures()
val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
assertEquals(ff.minVersionLevel(), clusterInstance.config().metadataVersion().get().featureLevel())
assertEquals(ff.maxVersionLevel(), clusterInstance.config().metadataVersion().get().featureLevel())
// Update to new version
val updateVersion = MetadataVersion.IBP_3_3_IV0.featureLevel.shortValue
val updateResult = admin.updateFeatures(
Map("metadata.version" -> new FeatureUpdate(updateVersion, UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
updateResult.all().get()
// Verify that new version is visible on broker
TestUtils.waitUntilTrue(() => {
val describeResult2 = admin.describeFeatures()
val ff2 = describeResult2.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
ff2.minVersionLevel() == updateVersion && ff2.maxVersionLevel() == updateVersion
}, "Never saw metadata.version increase on broker")
}
@ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV0)
def testUpgradeSameVersion(clusterInstance: ClusterInstance): Unit = {
val admin = clusterInstance.createAdminClient()
val updateVersion = MetadataVersion.IBP_3_3_IV0.featureLevel.shortValue
val updateResult = admin.updateFeatures(
Map("metadata.version" -> new FeatureUpdate(updateVersion, UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
updateResult.all().get()
}
@ClusterTest(clusterType = Type.KRAFT)
def testDefaultIsLatestVersion(clusterInstance: ClusterInstance): Unit = {
val admin = clusterInstance.createAdminClient()
val describeResult = admin.describeFeatures()
val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
assertEquals(ff.minVersionLevel(), MetadataVersion.latest().featureLevel())
assertEquals(ff.maxVersionLevel(), MetadataVersion.latest().featureLevel())
}
}

View File

@ -22,7 +22,6 @@ import java.net.InetSocketAddress
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent.CompletableFuture
import javax.security.auth.login.Configuration
import kafka.raft.KafkaRaftManager
import kafka.tools.StorageTool
@ -33,15 +32,18 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{Exit, Time}
import org.apache.kafka.controller.BootstrapMetadata
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, TestInfo}
import scala.collection.mutable.ListBuffer
import scala.collection.{Seq, immutable}
import scala.jdk.CollectionConverters._
trait QuorumImplementation {
def createBroker(config: KafkaConfig,
@ -114,6 +116,10 @@ abstract class QuorumTestHarness extends Logging {
Seq(new Properties())
}
protected def metadataVersion: MetadataVersion = MetadataVersion.latest()
val bootstrapRecords: ListBuffer[ApiMessageAndVersion] = ListBuffer()
private var implementation: QuorumImplementation = null
def isKRaftTest(): Boolean = implementation.isInstanceOf[KRaftQuorumImplementation]
@ -227,7 +233,7 @@ abstract class QuorumTestHarness extends Logging {
var out: PrintStream = null
try {
out = new PrintStream(stream)
if (StorageTool.formatCommand(out, directories, metaProperties, false) != 0) {
if (StorageTool.formatCommand(out, directories, metaProperties, metadataVersion, ignoreFormatted = false) != 0) {
throw new RuntimeException(stream.toString())
}
debug(s"Formatted storage directory(ies) ${directories}")
@ -282,6 +288,8 @@ abstract class QuorumTestHarness extends Logging {
threadNamePrefix = Option(threadNamePrefix),
controllerQuorumVotersFuture = controllerQuorumVotersFuture,
configSchema = KafkaRaftServer.configSchema,
raftApiVersions = raftManager.apiVersions,
bootstrapMetadata = BootstrapMetadata.create(metadataVersion, bootstrapRecords.asJava),
)
controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e) => {
if (e != null) {

View File

@ -163,7 +163,7 @@ class ControllerChannelManagerTest {
def testLeaderAndIsrInterBrokerProtocolVersion(): Unit = {
testLeaderAndIsrRequestFollowsInterBrokerProtocolVersion(MetadataVersion.latest, ApiKeys.LEADER_AND_ISR.latestVersion)
for (metadataVersion <- MetadataVersion.VALUES) {
for (metadataVersion <- MetadataVersion.VERSIONS) {
val leaderAndIsrRequestVersion: Short =
if (metadataVersion.isAtLeast(IBP_3_2_IV0)) 6
else if (metadataVersion.isAtLeast(IBP_2_8_IV1)) 5
@ -380,7 +380,7 @@ class ControllerChannelManagerTest {
def testUpdateMetadataInterBrokerProtocolVersion(): Unit = {
testUpdateMetadataFollowsInterBrokerProtocolVersion(MetadataVersion.latest, ApiKeys.UPDATE_METADATA.latestVersion)
for (metadataVersion <- MetadataVersion.VALUES) {
for (metadataVersion <- MetadataVersion.VERSIONS) {
val updateMetadataRequestVersion: Short =
if (metadataVersion.isAtLeast(IBP_2_8_IV1)) 7
else if (metadataVersion.isAtLeast(IBP_2_4_IV1)) 6
@ -474,7 +474,7 @@ class ControllerChannelManagerTest {
@Test
def testStopReplicaRequestsWhileTopicQueuedForDeletion(): Unit = {
for (metadataVersion <- MetadataVersion.VALUES) {
for (metadataVersion <- MetadataVersion.VERSIONS) {
testStopReplicaRequestsWhileTopicQueuedForDeletion(metadataVersion)
}
}
@ -521,7 +521,7 @@ class ControllerChannelManagerTest {
@Test
def testStopReplicaRequestsWhileTopicDeletionStarted(): Unit = {
for (metadataVersion <- MetadataVersion.VALUES) {
for (metadataVersion <- MetadataVersion.VERSIONS) {
testStopReplicaRequestsWhileTopicDeletionStarted(metadataVersion)
}
}
@ -576,7 +576,7 @@ class ControllerChannelManagerTest {
@Test
def testStopReplicaRequestWithoutDeletePartitionWhileTopicDeletionStarted(): Unit = {
for (metadataVersion <- MetadataVersion.VALUES) {
for (metadataVersion <- MetadataVersion.VERSIONS) {
testStopReplicaRequestWithoutDeletePartitionWhileTopicDeletionStarted(metadataVersion)
}
}
@ -626,7 +626,7 @@ class ControllerChannelManagerTest {
testMixedDeleteAndNotDeleteStopReplicaRequests(MetadataVersion.latest,
ApiKeys.STOP_REPLICA.latestVersion)
for (metadataVersion <- MetadataVersion.VALUES) {
for (metadataVersion <- MetadataVersion.VERSIONS) {
if (metadataVersion.isLessThan(IBP_2_2_IV0))
testMixedDeleteAndNotDeleteStopReplicaRequests(metadataVersion, 0.toShort)
else if (metadataVersion.isLessThan(IBP_2_4_IV1))
@ -775,7 +775,7 @@ class ControllerChannelManagerTest {
def testStopReplicaInterBrokerProtocolVersion(): Unit = {
testStopReplicaFollowsInterBrokerProtocolVersion(MetadataVersion.latest, ApiKeys.STOP_REPLICA.latestVersion)
for (metadataVersion <- MetadataVersion.VALUES) {
for (metadataVersion <- MetadataVersion.VERSIONS) {
if (metadataVersion.isLessThan(IBP_2_2_IV0))
testStopReplicaFollowsInterBrokerProtocolVersion(metadataVersion, 0.toShort)
else if (metadataVersion.isLessThan(IBP_2_4_IV1))

View File

@ -1055,7 +1055,7 @@ class GroupMetadataManagerTest {
val protocol = "range"
val memberId = "memberId"
for (metadataVersion <- MetadataVersion.VALUES) {
for (metadataVersion <- MetadataVersion.VERSIONS) {
val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, metadataVersion = metadataVersion)
val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
@ -2276,7 +2276,7 @@ class GroupMetadataManagerTest {
assertEquals(expectedLeaderEpoch, deserializedOffsetAndMetadata.leaderEpoch)
}
for (version <- MetadataVersion.VALUES) {
for (version <- MetadataVersion.VERSIONS) {
val expectedSchemaVersion = version match {
case v if v.isLessThan(IBP_2_1_IV0) => 1
case v if v.isLessThan(IBP_2_1_IV1) => 2
@ -2307,7 +2307,7 @@ class GroupMetadataManagerTest {
assertEquals(offsetAndMetadata, deserializedOffsetAndMetadata)
}
for (version <- MetadataVersion.VALUES)
for (version <- MetadataVersion.VERSIONS)
verifySerde(version)
}
@ -2335,7 +2335,7 @@ class GroupMetadataManagerTest {
assertEquals(offsetAndMetadata, deserializedOffsetAndMetadata)
}
for (version <- MetadataVersion.VALUES)
for (version <- MetadataVersion.VERSIONS)
verifySerde(version)
}

View File

@ -18,7 +18,6 @@ package kafka.raft
import java.util.concurrent.CompletableFuture
import java.util.Properties
import kafka.raft.KafkaRaftManager.RaftIoThread
import kafka.server.{KafkaConfig, MetaProperties}
import kafka.tools.TestRaftServer.ByteArraySerde

View File

@ -77,7 +77,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
ApiVersionsResponse.intersectForwardableApis(
ApiMessageType.ListenerType.BROKER,
RecordVersion.current,
new NodeApiVersions(ApiKeys.controllerApis().asScala.map(ApiVersionsResponse.toApiVersion).asJava).allSupportedApiVersions()
NodeApiVersions.create(ApiKeys.controllerApis().asScala.map(ApiVersionsResponse.toApiVersion).asJava).allSupportedApiVersions()
)
}

View File

@ -27,7 +27,7 @@ class BrokerFeaturesTest {
@Test
def testEmpty(): Unit = {
assertTrue(BrokerFeatures.createDefault().supportedFeatures.empty)
assertTrue(BrokerFeatures.createEmpty().supportedFeatures.empty)
}
@Test

View File

@ -73,7 +73,7 @@ class BrokerLifecycleManagerTest {
val metadata = new Metadata(1000, 1000, new LogContext(), new ClusterResourceListeners())
val mockClient = new MockClient(time, metadata)
val controllerNodeProvider = new SimpleControllerNodeProvider()
val nodeApiVersions = new NodeApiVersions(Seq(BROKER_REGISTRATION, BROKER_HEARTBEAT).map {
val nodeApiVersions = NodeApiVersions.create(Seq(BROKER_REGISTRATION, BROKER_HEARTBEAT).map {
apiKey => new ApiVersion().setApiKey(apiKey.id).
setMinVersion(apiKey.oldestVersion()).setMaxVersion(apiKey.latestVersion())
}.toList.asJava)

View File

@ -698,8 +698,8 @@ class KafkaConfigTest {
KafkaConfig.fromProps(props)
}
MetadataVersion.VALUES.foreach { interBrokerVersion =>
MetadataVersion.VALUES.foreach { messageFormatVersion =>
MetadataVersion.VERSIONS.foreach { interBrokerVersion =>
MetadataVersion.VERSIONS.foreach { messageFormatVersion =>
if (interBrokerVersion.highestSupportedRecordVersion.value >= messageFormatVersion.highestSupportedRecordVersion.value) {
val config = buildConfig(interBrokerVersion, messageFormatVersion)
assertEquals(interBrokerVersion, config.interBrokerProtocolVersion)

View File

@ -19,11 +19,11 @@ package kafka.server
import java.io.File
import java.nio.file.Files
import java.util.Properties
import kafka.common.{InconsistentBrokerMetadataException, InconsistentNodeIdException}
import kafka.log.UnifiedLog
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.controller.BootstrapMetadata
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@ -44,7 +44,7 @@ class KafkaRaftServerTest {
configProperties.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9093")
configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
val (loadedMetaProperties, offlineDirs) =
val (loadedMetaProperties, _, offlineDirs) =
invokeLoadMetaProperties(metaProperties, configProperties)
assertEquals(metaProperties, loadedMetaProperties)
@ -72,7 +72,7 @@ class KafkaRaftServerTest {
private def invokeLoadMetaProperties(
metaProperties: MetaProperties,
configProperties: Properties
): (MetaProperties, collection.Seq[String]) = {
): (MetaProperties, BootstrapMetadata, collection.Seq[String]) = {
val tempLogDir = TestUtils.tempDirectory()
try {
writeMetaProperties(tempLogDir, metaProperties)
@ -159,7 +159,7 @@ class KafkaRaftServerTest {
configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
val config = KafkaConfig.fromProps(configProperties)
val (loadedProperties, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
val (loadedProperties, _, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
assertEquals(nodeId, loadedProperties.nodeId)
assertEquals(Seq(invalidDir.getAbsolutePath), offlineDirs)
}

View File

@ -50,7 +50,7 @@ import scala.collection.Seq
class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness with Logging {
// Set this to IBP_0_11_0_IV1 to demonstrate the tests failing in the pre-KIP-101 case
val metadataVersion = MetadataVersion.latest
override def metadataVersion = MetadataVersion.latest
val topic = "topic1"
val msg = new Array[Byte](1000)
val msgBigger = new Array[Byte](10000)

View File

@ -26,6 +26,7 @@ import java.util.Properties
import kafka.server.{KafkaConfig, MetaProperties}
import kafka.utils.TestUtils
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.{Test, Timeout}
@ -160,11 +161,11 @@ Found problem:
clusterId = "XcZZOzUqS4yHOjhMQB6JLQ", nodeId = 2)
val stream = new ByteArrayOutputStream()
assertEquals(0, StorageTool.
formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, false))
formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, MetadataVersion.latest(), ignoreFormatted = false))
assertEquals("Formatting %s%n".format(tempDir), stream.toString())
try assertEquals(1, StorageTool.
formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, false)) catch {
formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, MetadataVersion.latest(), ignoreFormatted = false)) catch {
case e: TerseFailure => assertEquals(s"Log directory ${tempDir} is already " +
"formatted. Use --ignore-formatted to ignore this directory and format the " +
"others.", e.getMessage)
@ -172,7 +173,7 @@ Found problem:
val stream2 = new ByteArrayOutputStream()
assertEquals(0, StorageTool.
formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, true))
formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, MetadataVersion.latest(), ignoreFormatted = true))
assertEquals("All of the log directories are already formatted.%n".format(), stream2.toString())
} finally Utils.delete(tempDir)
}
@ -185,4 +186,19 @@ Found problem:
"16 bytes of a base64-encoded UUID", assertThrows(classOf[TerseFailure],
() => StorageTool.buildMetadataProperties("invalid", config)).getMessage)
}
@Test
def testDefaultMetadataVersion(): Unit = {
var namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ"))
var mv = StorageTool.getMetadataVersion(namespace)
assertEquals(MetadataVersion.latest().featureLevel(), mv.featureLevel(),
"Expected the default metadata.version to be the latest version")
namespace = StorageTool.parseArguments(Array("format", "-c", "config.props",
"--metadata-version", MetadataVersion.latest().featureLevel().toString, "-t", "XcZZOzUqS4yHOjhMQB6JLQ"))
mv = StorageTool.getMetadataVersion(namespace)
assertEquals(MetadataVersion.latest().featureLevel(), mv.featureLevel(),
"Expected the default metadata.version to be the latest version")
}
}

View File

@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.metadata.util.SnapshotFileReader;
import org.apache.kafka.metadata.util.SnapshotFileWriter;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.snapshot.SnapshotReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
/**
* A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.snapshot" is used and the
* format is the same as a KRaft snapshot.
*/
public class BootstrapMetadata {
private static final Logger log = LoggerFactory.getLogger(BootstrapMetadata.class);
public static final String BOOTSTRAP_FILE = "bootstrap.checkpoint";
private final MetadataVersion metadataVersion;
private final List<ApiMessageAndVersion> records;
BootstrapMetadata(MetadataVersion metadataVersion, List<ApiMessageAndVersion> records) {
this.metadataVersion = metadataVersion;
this.records = Collections.unmodifiableList(records);
}
public MetadataVersion metadataVersion() {
return this.metadataVersion;
}
public List<ApiMessageAndVersion> records() {
return records;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BootstrapMetadata metadata = (BootstrapMetadata) o;
return metadataVersion == metadata.metadataVersion;
}
@Override
public int hashCode() {
return Objects.hash(metadataVersion);
}
@Override
public String toString() {
return "BootstrapMetadata{" +
"metadataVersion=" + metadataVersion +
'}';
}
/**
* A raft client listener that simply collects all of the commits and snapshots into a mapping of
* metadata record type to list of records.
*/
private static class BootstrapListener implements RaftClient.Listener<ApiMessageAndVersion> {
private final List<ApiMessageAndVersion> records = new ArrayList<>();
@Override
public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
try {
while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();
records.addAll(batch.records());
}
} finally {
reader.close();
}
}
@Override
public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
try {
while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();
for (ApiMessageAndVersion messageAndVersion : batch) {
records.add(messageAndVersion);
}
}
} finally {
reader.close();
}
}
}
public static BootstrapMetadata create(MetadataVersion metadataVersion) {
return create(metadataVersion, new ArrayList<>());
}
public static BootstrapMetadata create(MetadataVersion metadataVersion, List<ApiMessageAndVersion> records) {
if (!metadataVersion.isKRaftSupported()) {
throw new IllegalArgumentException("Cannot create BootstrapMetadata with a non-KRaft metadata version.");
}
records.add(new ApiMessageAndVersion(
new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(metadataVersion.featureLevel()),
FeatureLevelRecord.LOWEST_SUPPORTED_VERSION));
return new BootstrapMetadata(metadataVersion, records);
}
/**
* Load a bootstrap snapshot into a read-only bootstrap metadata object and return it.
*
* @param bootstrapDir The directory from which to read the snapshot file.
* @param fallbackPreviewVersion The metadata.version to boostrap if upgrading from KRaft preview
* @return The read-only bootstrap metadata
* @throws Exception
*/
public static BootstrapMetadata load(Path bootstrapDir, MetadataVersion fallbackPreviewVersion) throws Exception {
final Path bootstrapPath = bootstrapDir.resolve(BOOTSTRAP_FILE);
if (!Files.exists(bootstrapPath)) {
log.debug("Missing bootstrap file, this appears to be a KRaft preview cluster. Setting metadata.version to {}.",
fallbackPreviewVersion.featureLevel());
return BootstrapMetadata.create(fallbackPreviewVersion);
}
BootstrapListener listener = new BootstrapListener();
try (SnapshotFileReader reader = new SnapshotFileReader(bootstrapPath.toString(), listener)) {
reader.startup();
reader.caughtUpFuture().get();
} catch (ExecutionException e) {
throw new Exception("Failed to load snapshot", e.getCause());
}
Optional<FeatureLevelRecord> metadataVersionRecord = listener.records.stream()
.flatMap(message -> {
MetadataRecordType type = MetadataRecordType.fromId(message.message().apiKey());
if (!type.equals(MetadataRecordType.FEATURE_LEVEL_RECORD)) {
return Stream.empty();
}
FeatureLevelRecord record = (FeatureLevelRecord) message.message();
if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
return Stream.of(record);
} else {
return Stream.empty();
}
})
.findFirst();
if (metadataVersionRecord.isPresent()) {
return new BootstrapMetadata(MetadataVersion.fromFeatureLevel(metadataVersionRecord.get().featureLevel()), listener.records);
} else {
throw new RuntimeException("Expected a metadata.version to exist in the snapshot " + bootstrapPath + ", but none was found");
}
}
/**
* Write a set of bootstrap metadata to the bootstrap snapshot in a given directory
*
* @param metadata The metadata to persist
* @param bootstrapDir The directory in which to create the bootstrap snapshot
* @throws IOException
*/
public static void write(BootstrapMetadata metadata, Path bootstrapDir) throws IOException {
final Path bootstrapPath = bootstrapDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE);
if (Files.exists(bootstrapPath)) {
throw new IOException("Cannot write metadata bootstrap file " + bootstrapPath +
". File already already exists.");
}
try (SnapshotFileWriter bootstrapWriter = SnapshotFileWriter.open(bootstrapPath)) {
bootstrapWriter.append(metadata.records());
}
}
}

View File

@ -336,8 +336,7 @@ public class ClusterControlManager {
heartbeatManager.register(brokerId, record.fenced());
List<ApiMessageAndVersion> records = new ArrayList<>();
records.add(new ApiMessageAndVersion(record,
REGISTER_BROKER_RECORD.highestSupportedVersion()));
records.add(new ApiMessageAndVersion(record, REGISTER_BROKER_RECORD.highestSupportedVersion()));
return ControllerResult.atomicOf(records, new BrokerRegistrationReply(brokerEpoch));
}
@ -535,8 +534,7 @@ public class ClusterControlManager {
setEndPoints(endpoints).
setFeatures(features).
setRack(registration.rack().orElse(null)).
setFenced(registration.fenced()),
REGISTER_BROKER_RECORD.highestSupportedVersion()));
setFenced(registration.fenced()), REGISTER_BROKER_RECORD.highestSupportedVersion()));
return batch;
}
}

View File

@ -25,7 +25,9 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.Consumer;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
@ -33,10 +35,12 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineObject;
import org.slf4j.Logger;
import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_RECORD;
@ -55,6 +59,11 @@ public class FeatureControlManager {
*/
private final TimelineHashMap<String, Short> finalizedVersions;
/**
* The current metadata version
*/
private final TimelineObject<MetadataVersion> metadataVersion;
FeatureControlManager(LogContext logContext,
QuorumFeatures quorumFeatures,
@ -62,13 +71,15 @@ public class FeatureControlManager {
this.log = logContext.logger(FeatureControlManager.class);
this.quorumFeatures = quorumFeatures;
this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
this.metadataVersion = new TimelineObject<>(snapshotRegistry, MetadataVersion.UNINITIALIZED);
}
ControllerResult<Map<String, ApiError>> updateFeatures(
Map<String, Short> updates,
Map<String, FeatureUpdate.UpgradeType> upgradeTypes,
Map<Integer, Map<String, VersionRange>> brokerFeatures,
boolean validateOnly) {
boolean validateOnly
) {
TreeMap<String, ApiError> results = new TreeMap<>();
List<ApiMessageAndVersion> records = new ArrayList<>();
for (Entry<String, Short> entry : updates.entrySet()) {
@ -83,9 +94,28 @@ public class FeatureControlManager {
}
}
boolean canSupportVersion(String featureName, short versionRange) {
return quorumFeatures.localSupportedFeature(featureName)
.filter(localRange -> localRange.contains(versionRange))
ControllerResult<Map<String, ApiError>> initializeMetadataVersion(short initVersion) {
if (!metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
return ControllerResult.atomicOf(
Collections.emptyList(),
Collections.singletonMap(
MetadataVersion.FEATURE_NAME,
new ApiError(Errors.INVALID_UPDATE_VERSION,
"Cannot initialize metadata.version to " + initVersion + " since it has already been " +
"initialized to " + metadataVersion().featureLevel() + ".")
));
}
List<ApiMessageAndVersion> records = new ArrayList<>();
ApiError result = updateMetadataVersion(initVersion, false, records::add);
return ControllerResult.atomicOf(records, Collections.singletonMap(MetadataVersion.FEATURE_NAME, result));
}
/**
* Test if the quorum can support this feature and version
*/
boolean canSupportVersion(String featureName, short version) {
return quorumFeatures.quorumSupportedFeature(featureName)
.filter(versionRange -> versionRange.contains(version))
.isPresent();
}
@ -93,49 +123,68 @@ public class FeatureControlManager {
return quorumFeatures.localSupportedFeature(featureName).isPresent();
}
private ApiError updateFeature(String featureName,
MetadataVersion metadataVersion() {
return metadataVersion.get();
}
private ApiError updateFeature(
String featureName,
short newVersion,
FeatureUpdate.UpgradeType upgradeType,
Map<Integer, Map<String, VersionRange>> brokersAndFeatures,
List<ApiMessageAndVersion> records) {
List<ApiMessageAndVersion> records
) {
if (!featureExists(featureName)) {
return new ApiError(Errors.INVALID_UPDATE_VERSION,
return invalidUpdateVersion(featureName, newVersion,
"The controller does not support the given feature.");
}
if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) {
return new ApiError(Errors.INVALID_UPDATE_VERSION,
return invalidUpdateVersion(featureName, newVersion,
"The controller does not support the given upgrade type.");
}
final Short currentVersion = finalizedVersions.get(featureName);
final Short currentVersion;
if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
currentVersion = metadataVersion.get().featureLevel();
} else {
currentVersion = finalizedVersions.get(featureName);
}
if (newVersion <= 0) {
return new ApiError(Errors.INVALID_UPDATE_VERSION,
"The upper value for the new range cannot be less than 1.");
return invalidUpdateVersion(featureName, newVersion,
"A feature version cannot be less than 1.");
}
if (!canSupportVersion(featureName, newVersion)) {
return new ApiError(Errors.INVALID_UPDATE_VERSION,
"The controller does not support the given feature range.");
return invalidUpdateVersion(featureName, newVersion,
"The quorum does not support the given feature version.");
}
for (Entry<Integer, Map<String, VersionRange>> brokerEntry : brokersAndFeatures.entrySet()) {
VersionRange brokerRange = brokerEntry.getValue().get(featureName);
if (brokerRange == null || !brokerRange.contains(newVersion)) {
return new ApiError(Errors.INVALID_UPDATE_VERSION,
if (brokerRange == null) {
return invalidUpdateVersion(featureName, newVersion,
"Broker " + brokerEntry.getKey() + " does not support this feature.");
} else if (!brokerRange.contains(newVersion)) {
return invalidUpdateVersion(featureName, newVersion,
"Broker " + brokerEntry.getKey() + " does not support the given " +
"feature range.");
"version. It supports " + brokerRange.min() + " to " + brokerRange.max() + ".");
}
}
if (currentVersion != null && newVersion < currentVersion) {
if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) {
return new ApiError(Errors.INVALID_UPDATE_VERSION,
"Can't downgrade the maximum version of this feature without setting the upgrade type to safe or unsafe downgrade.");
return invalidUpdateVersion(featureName, newVersion,
"Can't downgrade the version of this feature without setting the " +
"upgrade type to either safe or unsafe downgrade.");
}
}
if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
// Perform additional checks if we're updating metadata.version
return updateMetadataVersion(newVersion, upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add);
} else {
records.add(new ApiMessageAndVersion(
new FeatureLevelRecord()
.setName(featureName)
@ -143,34 +192,120 @@ public class FeatureControlManager {
FEATURE_LEVEL_RECORD.highestSupportedVersion()));
return ApiError.NONE;
}
}
FinalizedControllerFeatures finalizedFeatures(long lastCommittedOffset) {
private ApiError invalidUpdateVersion(String feature, short version, String message) {
String errorMessage = String.format("Invalid update version %d for feature %s. %s", version, feature, message);
log.debug(errorMessage);
return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage);
}
/**
* Perform some additional validation for metadata.version updates.
*/
private ApiError updateMetadataVersion(
short newVersionLevel,
boolean allowUnsafeDowngrade,
Consumer<ApiMessageAndVersion> recordConsumer
) {
Optional<VersionRange> quorumSupported = quorumFeatures.quorumSupportedFeature(MetadataVersion.FEATURE_NAME);
if (!quorumSupported.isPresent()) {
return invalidMetadataVersion(newVersionLevel, "The quorum does not support metadata.version.");
}
if (newVersionLevel <= 0) {
return invalidMetadataVersion(newVersionLevel, "KRaft mode/the quorum does not support metadata.version values less than 1.");
}
if (!quorumSupported.get().contains(newVersionLevel)) {
return invalidMetadataVersion(newVersionLevel, "The controller quorum does support this version.");
}
MetadataVersion currentVersion = metadataVersion();
final MetadataVersion newVersion;
try {
newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel);
} catch (IllegalArgumentException e) {
return invalidMetadataVersion(newVersionLevel, "Unknown metadata.version.");
}
if (!currentVersion.equals(MetadataVersion.UNINITIALIZED) && newVersion.isLessThan(currentVersion)) {
// This is a downgrade
boolean metadataChanged = MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion);
if (!metadataChanged) {
log.info("Downgrading metadata.version from {} to {}.", currentVersion, newVersion);
} else {
return invalidMetadataVersion(newVersionLevel, "Unsafe metadata.version downgrades are not supported.");
}
}
recordConsumer.accept(new ApiMessageAndVersion(
new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(newVersionLevel), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
return ApiError.NONE;
}
private ApiError invalidMetadataVersion(short version, String message) {
String errorMessage = String.format("Invalid metadata.version %d. %s", version, message);
log.error(errorMessage);
return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage);
}
FinalizedControllerFeatures finalizedFeatures(long epoch) {
Map<String, Short> features = new HashMap<>();
for (Entry<String, Short> entry : finalizedVersions.entrySet(lastCommittedOffset)) {
if (!metadataVersion.get(epoch).equals(MetadataVersion.UNINITIALIZED)) {
features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get(epoch).featureLevel());
}
for (Entry<String, Short> entry : finalizedVersions.entrySet(epoch)) {
features.put(entry.getKey(), entry.getValue());
}
return new FinalizedControllerFeatures(features, lastCommittedOffset);
return new FinalizedControllerFeatures(features, epoch);
}
public void replay(FeatureLevelRecord record) {
if (!canSupportVersion(record.name(), record.featureLevel())) {
throw new RuntimeException("Controller cannot support feature " + record.name() +
" at version " + record.featureLevel());
}
if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
log.info("Setting metadata.version to {}", record.featureLevel());
metadataVersion.set(MetadataVersion.fromFeatureLevel(record.featureLevel()));
} else {
log.info("Setting feature {} to {}", record.name(), record.featureLevel());
finalizedVersions.put(record.name(), record.featureLevel());
}
}
class FeatureControlIterator implements Iterator<List<ApiMessageAndVersion>> {
private final Iterator<Entry<String, Short>> iterator;
private final MetadataVersion metadataVersion;
private boolean wroteVersion = false;
FeatureControlIterator(long epoch) {
this.iterator = finalizedVersions.entrySet(epoch).iterator();
this.metadataVersion = FeatureControlManager.this.metadataVersion.get(epoch);
if (this.metadataVersion.equals(MetadataVersion.UNINITIALIZED)) {
this.wroteVersion = true;
}
}
@Override
public boolean hasNext() {
return iterator.hasNext();
return !wroteVersion || iterator.hasNext();
}
@Override
public List<ApiMessageAndVersion> next() {
// Write the metadata.version first
if (!wroteVersion) {
wroteVersion = true;
return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
}
// Then write the rest of the features
if (!hasNext()) throw new NoSuchElementException();
Entry<String, Short> entry = iterator.next();
return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()

View File

@ -88,6 +88,7 @@ import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.snapshot.SnapshotReader;
@ -134,6 +135,10 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
* the fact that the controller may have several operations in progress at any given
* point. The future associated with each operation will not be completed until the
* results of the operation have been made durable to the metadata log.
*
* The QuorumController uses the "metadata.version" feature flag as a mechanism to control
* the usage of new log record schemas. Starting with 3.3, this version must be set before
* the controller can fully initialize.
*/
public final class QuorumController implements Controller {
/**
@ -161,6 +166,7 @@ public final class QuorumController implements Controller {
private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP;
private Optional<ClusterMetadataAuthorizer> authorizer = Optional.empty();
private Map<String, Object> staticConfig = Collections.emptyMap();
private BootstrapMetadata bootstrapMetadata = null;
public Builder(int nodeId, String clusterId) {
this.nodeId = nodeId;
@ -241,6 +247,11 @@ public final class QuorumController implements Controller {
return this;
}
public Builder setBootstrapMetadata(BootstrapMetadata bootstrapMetadata) {
this.bootstrapMetadata = bootstrapMetadata;
return this;
}
public Builder setCreateTopicPolicy(Optional<CreateTopicPolicy> createTopicPolicy) {
this.createTopicPolicy = createTopicPolicy;
return this;
@ -271,6 +282,9 @@ public final class QuorumController implements Controller {
if (raftClient == null) {
throw new RuntimeException("You must set a raft client.");
}
if (bootstrapMetadata == null || bootstrapMetadata.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
throw new RuntimeException("You must specify an initial metadata.version using the kafka-storage tool.");
}
if (quorumFeatures == null) {
throw new RuntimeException("You must specify the quorum features");
}
@ -293,7 +307,7 @@ public final class QuorumController implements Controller {
defaultNumPartitions, isLeaderRecoverySupported, replicaPlacer, snapshotMaxNewRecordBytes,
leaderImbalanceCheckIntervalNs, sessionTimeoutNs, controllerMetrics,
createTopicPolicy, alterConfigPolicy, configurationValidator, authorizer,
staticConfig);
staticConfig, bootstrapMetadata);
} catch (Exception e) {
Utils.closeQuietly(queue, "event queue");
throw e;
@ -886,16 +900,58 @@ public final class QuorumController implements Controller {
newEpoch + ", but we never renounced controller epoch " +
curEpoch);
}
log.info(
"Becoming the active controller at epoch {}, committed offset {} and committed epoch {}.",
newEpoch, lastCommittedOffset, lastCommittedEpoch
);
curClaimEpoch = newEpoch;
controllerMetrics.setActive(true);
writeOffset = lastCommittedOffset;
clusterControl.activate();
// Check if we need to bootstrap metadata into the log. This must happen before we can
// write any other records to the log since we need the metadata.version to determine the correct
// record version
final MetadataVersion metadataVersion;
if (featureControl.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
final CompletableFuture<Map<String, ApiError>> future;
if (!bootstrapMetadata.metadataVersion().isKRaftSupported()) {
metadataVersion = MetadataVersion.UNINITIALIZED;
future = new CompletableFuture<>();
future.completeExceptionally(
new IllegalStateException("Cannot become leader without an initial metadata.version of " +
"at least 1. Got " + bootstrapMetadata.metadataVersion().featureLevel()));
} else {
metadataVersion = bootstrapMetadata.metadataVersion();
future = appendWriteEvent("bootstrapMetadata", OptionalLong.empty(), () -> {
if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_3_IV0)) {
log.info("Initializing metadata.version to {}", metadataVersion.featureLevel());
} else {
log.info("Upgrading from KRaft preview. Initializing metadata.version to {}",
metadataVersion.featureLevel());
}
return ControllerResult.atomicOf(bootstrapMetadata.records(), null);
});
}
future.whenComplete((result, exception) -> {
if (exception != null) {
log.error("Failed to bootstrap metadata.", exception);
appendRaftEvent("bootstrapMetadata[" + curClaimEpoch + "]", () -> {
log.warn("Renouncing the leadership at oldEpoch {} since we could not bootstrap " +
"metadata. Reverting to last committed offset {}.",
curClaimEpoch, lastCommittedOffset);
renounce();
});
}
});
} else {
metadataVersion = featureControl.metadataVersion();
}
log.info(
"Becoming the active controller at epoch {}, committed offset {}, committed epoch {}, and metadata.version {}",
newEpoch, lastCommittedOffset, lastCommittedEpoch, metadataVersion.featureLevel()
);
// Before switching to active, create an in-memory snapshot at the last committed offset. This is
// required because the active controller assumes that there is always an in-memory snapshot at the
// last committed offset.
@ -1343,6 +1399,8 @@ public final class QuorumController implements Controller {
*/
private ImbalanceSchedule imbalancedScheduled = ImbalanceSchedule.DEFERRED;
private final BootstrapMetadata bootstrapMetadata;
private QuorumController(LogContext logContext,
int nodeId,
String clusterId,
@ -1363,7 +1421,8 @@ public final class QuorumController implements Controller {
Optional<AlterConfigPolicy> alterConfigPolicy,
ConfigurationValidator configurationValidator,
Optional<ClusterMetadataAuthorizer> authorizer,
Map<String, Object> staticConfig) {
Map<String, Object> staticConfig,
BootstrapMetadata bootstrapMetadata) {
this.logContext = logContext;
this.log = logContext.logger(QuorumController.class);
this.nodeId = nodeId;
@ -1414,6 +1473,7 @@ public final class QuorumController implements Controller {
authorizer.ifPresent(a -> a.setAclMutator(this));
this.aclControlManager = new AclControlManager(snapshotRegistry, authorizer);
this.raftClient = raftClient;
this.bootstrapMetadata = bootstrapMetadata;
this.metaLogListener = new QuorumMetaLogListener();
this.curClaimEpoch = -1;
this.writeOffset = -1L;

View File

@ -17,32 +17,102 @@
package org.apache.kafka.controller;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.MetadataVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.Collectors;
/**
* A holder class of the local node's supported feature flags.
* A holder class of the local node's supported feature flags as well as the ApiVersions of other nodes.
*/
public class QuorumFeatures {
private final int nodeId;
private final Map<String, VersionRange> supportedFeatures;
private static final Logger log = LoggerFactory.getLogger(QuorumFeatures.class);
QuorumFeatures(int nodeId,
Map<String, VersionRange> supportedFeatures) {
private final int nodeId;
private final ApiVersions apiVersions;
private final Map<String, VersionRange> supportedFeatures;
private final List<Integer> quorumNodeIds;
QuorumFeatures(
int nodeId,
ApiVersions apiVersions,
Map<String, VersionRange> supportedFeatures,
List<Integer> quorumNodeIds
) {
this.nodeId = nodeId;
this.apiVersions = apiVersions;
this.supportedFeatures = Collections.unmodifiableMap(supportedFeatures);
this.quorumNodeIds = Collections.unmodifiableList(quorumNodeIds);
}
public static QuorumFeatures create(int nodeId,
Map<String, VersionRange> supportedFeatures) {
return new QuorumFeatures(nodeId, supportedFeatures);
public static QuorumFeatures create(
int nodeId,
ApiVersions apiVersions,
Map<String, VersionRange> supportedFeatures,
Collection<Node> quorumNodes
) {
List<Integer> nodeIds = quorumNodes.stream().map(Node::id).collect(Collectors.toList());
return new QuorumFeatures(nodeId, apiVersions, supportedFeatures, nodeIds);
}
public static Map<String, VersionRange> defaultFeatureMap() {
return Collections.emptyMap();
Map<String, VersionRange> features = new HashMap<>(1);
features.put(MetadataVersion.FEATURE_NAME, VersionRange.of(MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel()));
return features;
}
Optional<VersionRange> quorumSupportedFeature(String featureName) {
List<VersionRange> supportedVersions = new ArrayList<>(quorumNodeIds.size());
for (int nodeId : quorumNodeIds) {
if (nodeId == this.nodeId) {
// We get this node's features from "supportedFeatures"
continue;
}
NodeApiVersions nodeVersions = apiVersions.get(Integer.toString(nodeId));
if (nodeVersions == null) {
continue;
}
SupportedVersionRange supportedRange = nodeVersions.supportedFeatures().get(featureName);
if (supportedRange == null) {
supportedVersions.add(VersionRange.of(0, 0));
} else {
supportedVersions.add(VersionRange.of(supportedRange.min(), supportedRange.max()));
}
}
localSupportedFeature(featureName).ifPresent(supportedVersions::add);
if (supportedVersions.isEmpty()) {
return Optional.empty();
} else {
OptionalInt highestMinVersion = supportedVersions.stream().mapToInt(VersionRange::min).max();
OptionalInt lowestMaxVersion = supportedVersions.stream().mapToInt(VersionRange::max).min();
if (highestMinVersion.isPresent() && lowestMaxVersion.isPresent()) {
if (highestMinVersion.getAsInt() <= lowestMaxVersion.getAsInt()) {
if (supportedVersions.size() < quorumNodeIds.size()) {
log.info("Using incomplete set of quorum supported features.");
}
return Optional.of(VersionRange.of((short) highestMinVersion.getAsInt(), (short) lowestMaxVersion.getAsInt()));
} else {
return Optional.empty();
}
} else {
return Optional.empty();
}
}
}
Optional<VersionRange> localSupportedFeature(String featureName) {

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.metadata.authorizer.StandardAcl;
import org.apache.kafka.metadata.authorizer.StandardAclWithId;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.HashMap;
import java.util.LinkedHashMap;
@ -51,6 +52,10 @@ public final class AclsDelta {
this.isSnapshotDelta = true;
}
public void handleMetadataVersionChange(MetadataVersion newVersion) {
// no-op
}
public boolean isSnapshotDelta() {
return isSnapshotDelta;
}

View File

@ -19,6 +19,7 @@ package org.apache.kafka.image;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.HashMap;
import java.util.Map;
@ -47,6 +48,10 @@ public final class ClientQuotasDelta {
}
}
public void handleMetadataVersionChange(MetadataVersion newVersion) {
// no-op
}
public void replay(ClientQuotaRecord record) {
ClientQuotaEntity entity = ClientQuotaImage.dataToEntity(record.entity());
ClientQuotaDelta change = changes.computeIfAbsent(entity, __ ->

View File

@ -23,6 +23,7 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.HashMap;
import java.util.Map;
@ -61,6 +62,10 @@ public final class ClusterDelta {
}
}
public void handleMetadataVersionChange(MetadataVersion newVersion) {
// no-op
}
public void replay(RegisterBrokerRecord record) {
BrokerRegistration broker = BrokerRegistration.fromRecord(record);
changedBrokers.put(broker.id(), Optional.of(broker));

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.HashMap;
import java.util.Map;
@ -52,6 +53,10 @@ public final class ConfigurationsDelta {
}
}
public void handleMetadataVersionChange(MetadataVersion newVersion) {
// no-op
}
public void replay(ConfigRecord record) {
ConfigResource resource =
new ConfigResource(Type.forId(record.resourceType()), record.resourceName());

View File

@ -19,6 +19,7 @@ package org.apache.kafka.image;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.HashMap;
import java.util.Map;
@ -34,6 +35,8 @@ public final class FeaturesDelta {
private final Map<String, Optional<Short>> changes = new HashMap<>();
private MetadataVersion metadataVersionChange = null;
public FeaturesDelta(FeaturesImage image) {
this.image = image;
}
@ -42,6 +45,10 @@ public final class FeaturesDelta {
return changes;
}
public Optional<MetadataVersion> metadataVersionChange() {
return Optional.ofNullable(metadataVersionChange);
}
public void finishSnapshot() {
for (String featureName : image.finalizedVersions().keySet()) {
if (!changes.containsKey(featureName)) {
@ -51,12 +58,20 @@ public final class FeaturesDelta {
}
public void replay(FeatureLevelRecord record) {
if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
metadataVersionChange = MetadataVersion.fromFeatureLevel(record.featureLevel());
} else {
changes.put(record.name(), Optional.of(record.featureLevel()));
}
}
public void replay(RemoveFeatureLevelRecord record) {
if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
metadataVersionChange = null;
} else {
changes.put(record.name(), Optional.empty());
}
}
public FeaturesImage apply() {
Map<String, Short> newFinalizedVersions =
@ -80,13 +95,20 @@ public final class FeaturesDelta {
}
}
return new FeaturesImage(newFinalizedVersions);
final MetadataVersion metadataVersion;
if (metadataVersionChange == null) {
metadataVersion = image.metadataVersion();
} else {
metadataVersion = metadataVersionChange;
}
return new FeaturesImage(newFinalizedVersions, metadataVersion);
}
@Override
public String toString() {
return "FeaturesDelta(" +
"changes=" + changes +
", metadataVersionChange=" + metadataVersionChange +
')';
}
}

View File

@ -19,6 +19,7 @@ package org.apache.kafka.image;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.ArrayList;
import java.util.Collections;
@ -37,18 +38,25 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_
* This class is thread-safe.
*/
public final class FeaturesImage {
public static final FeaturesImage EMPTY = new FeaturesImage(Collections.emptyMap());
public static final FeaturesImage EMPTY = new FeaturesImage(Collections.emptyMap(), MetadataVersion.UNINITIALIZED);
private final Map<String, Short> finalizedVersions;
public FeaturesImage(Map<String, Short> finalizedVersions) {
private final MetadataVersion metadataVersion;
public FeaturesImage(Map<String, Short> finalizedVersions, MetadataVersion metadataVersion) {
this.finalizedVersions = Collections.unmodifiableMap(finalizedVersions);
this.metadataVersion = metadataVersion;
}
public boolean isEmpty() {
return finalizedVersions.isEmpty();
}
public MetadataVersion metadataVersion() {
return metadataVersion;
}
Map<String, Short> finalizedVersions() {
return finalizedVersions;
}
@ -59,7 +67,16 @@ public final class FeaturesImage {
public void write(Consumer<List<ApiMessageAndVersion>> out) {
List<ApiMessageAndVersion> batch = new ArrayList<>();
// Write out the metadata.version record first, and then the rest of the finalized features
if (!metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
}
for (Entry<String, Short> entry : finalizedVersions.entrySet()) {
if (entry.getKey().equals(MetadataVersion.FEATURE_NAME)) {
continue;
}
batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(entry.getKey()).
setFeatureLevel(entry.getValue()), FEATURE_LEVEL_RECORD.highestSupportedVersion()));
@ -84,6 +101,7 @@ public final class FeaturesImage {
public String toString() {
return "FeaturesImage{" +
"finalizedVersions=" + finalizedVersions +
", metadataVersion=" + metadataVersion +
'}';
}
}

View File

@ -37,9 +37,11 @@ import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
/**
@ -143,6 +145,14 @@ public final class MetadataDelta {
return aclsDelta;
}
public Optional<MetadataVersion> metadataVersionChanged() {
if (featuresDelta == null) {
return Optional.empty();
} else {
return featuresDelta.metadataVersionChange();
}
}
public void read(long highestOffset, int highestEpoch, Iterator<List<ApiMessageAndVersion>> reader) {
while (reader.hasNext()) {
List<ApiMessageAndVersion> batch = reader.next();
@ -253,6 +263,15 @@ public final class MetadataDelta {
public void replay(FeatureLevelRecord record) {
getOrCreateFeaturesDelta().replay(record);
featuresDelta.metadataVersionChange().ifPresent(changedMetadataVersion -> {
// If any feature flags change, need to immediately check if any metadata needs to be downgraded.
getOrCreateClusterDelta().handleMetadataVersionChange(changedMetadataVersion);
getOrCreateConfigsDelta().handleMetadataVersionChange(changedMetadataVersion);
getOrCreateTopicsDelta().handleMetadataVersionChange(changedMetadataVersion);
getOrCreateClientQuotasDelta().handleMetadataVersionChange(changedMetadataVersion);
getOrCreateProducerIdsDelta().handleMetadataVersionChange(changedMetadataVersion);
getOrCreateAclsDelta().handleMetadataVersionChange(changedMetadataVersion);
});
}
public void replay(BrokerRegistrationChangeRecord record) {

View File

@ -120,6 +120,8 @@ public final class MetadataImage {
}
public void write(Consumer<List<ApiMessageAndVersion>> out) {
// Features should be written out first so we can include the metadata.version at the beginning of the
// snapshot
features.write(out);
cluster.write(out);
topics.write(out);

View File

@ -18,6 +18,7 @@
package org.apache.kafka.image;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.server.common.MetadataVersion;
public final class ProducerIdsDelta {
@ -39,6 +40,10 @@ public final class ProducerIdsDelta {
// Nothing to do
}
public void handleMetadataVersionChange(MetadataVersion newVersion) {
// no-op
}
public void replay(ProducerIdsRecord record) {
nextProducerId = record.nextProducerId();
}

View File

@ -24,6 +24,7 @@ import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.Collections;
import java.util.HashMap;
@ -117,6 +118,10 @@ public final class TopicsDelta {
}
}
public void handleMetadataVersionChange(MetadataVersion newVersion) {
// no-op
}
public TopicsImage apply() {
Map<Uuid, TopicImage> newTopicsById = new HashMap<>(image.topicsById().size());
Map<String, TopicImage> newTopicsByName = new HashMap<>(image.topicsByName().size());

View File

@ -37,6 +37,10 @@ public class VersionRange {
return new VersionRange(min, max);
}
public static VersionRange of(int min, int max) {
return new VersionRange((short) min, (short) max);
}
public short min() {
return min;
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.kafka.shell;
package org.apache.kafka.metadata.util;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.util;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.BatchMemoryPool;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.List;
import static org.apache.kafka.raft.KafkaRaftClient.MAX_BATCH_SIZE_BYTES;
/**
* Write an arbitrary set of metadata records into a Kafka metadata snapshot format. The resulting snapshot will be use
* epoch of zero and an initial offset of zero. This class should not be used for creating actual metadata snapshots.
*/
public class SnapshotFileWriter implements AutoCloseable {
private final FileChannel channel;
private final BatchAccumulator<ApiMessageAndVersion> batchAccumulator;
SnapshotFileWriter(FileChannel channel, BatchAccumulator<ApiMessageAndVersion> batchAccumulator) {
this.channel = channel;
this.batchAccumulator = batchAccumulator;
}
public void append(ApiMessageAndVersion apiMessageAndVersion) {
batchAccumulator.append(0, Collections.singletonList(apiMessageAndVersion));
}
public void append(List<ApiMessageAndVersion> messageBatch) {
batchAccumulator.append(0, messageBatch);
}
public void close() throws IOException {
for (BatchAccumulator.CompletedBatch<ApiMessageAndVersion> batch : batchAccumulator.drain()) {
Utils.writeFully(channel, batch.data.buffer());
}
channel.close();
}
public static SnapshotFileWriter open(Path snapshotPath) throws IOException {
BatchAccumulator<ApiMessageAndVersion> batchAccumulator = new BatchAccumulator<>(
0,
0,
Integer.MAX_VALUE,
MAX_BATCH_SIZE_BYTES,
new BatchMemoryPool(5, MAX_BATCH_SIZE_BYTES),
Time.SYSTEM,
CompressionType.NONE,
new MetadataRecordSerde());
FileChannel channel = FileChannel.open(snapshotPath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
return new SnapshotFileWriter(channel, batchAccumulator);
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Random;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class BootstrapMetadataTest {
@Test
public void testWriteAndReadBootstrapFile() throws Exception {
Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
BootstrapMetadata metadata = BootstrapMetadata.create(MetadataVersion.IBP_3_0_IV0);
BootstrapMetadata.write(metadata, tmpDir);
assertTrue(Files.exists(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE)));
BootstrapMetadata newMetadata = BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0);
assertEquals(metadata, newMetadata);
}
@Test
public void testNoBootstrapFile() throws Exception {
Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
BootstrapMetadata metadata = BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0);
assertEquals(MetadataVersion.IBP_3_0_IV0, metadata.metadataVersion());
metadata = BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_2_IV0);
assertEquals(MetadataVersion.IBP_3_2_IV0, metadata.metadataVersion());
}
@Test
public void testExistingBootstrapFile() throws Exception {
Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
BootstrapMetadata.write(BootstrapMetadata.create(MetadataVersion.IBP_3_0_IV0), tmpDir);
assertThrows(IOException.class, () -> {
BootstrapMetadata.write(BootstrapMetadata.create(MetadataVersion.IBP_3_1_IV0), tmpDir);
});
}
@Test
public void testEmptyBootstrapFile() throws Exception {
Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
Files.createFile(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE));
assertThrows(RuntimeException.class, () -> BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0),
"Should fail to load if no metadata.version is set");
}
@Test
public void testGarbageBootstrapFile() throws Exception {
Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
Files.createFile(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE));
Random random = new Random(1);
byte[] data = new byte[100];
random.nextBytes(data);
Files.write(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE), data, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
assertThrows(RuntimeException.class, () -> BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0),
"Should fail on invalid data");
}
}

View File

@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.protocol.Errors;
@ -33,6 +34,7 @@ import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@ -42,13 +44,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(value = 40)
public class FeatureControlManagerTest {
@SuppressWarnings("unchecked")
private static Map<String, VersionRange> rangeMap(Object... args) {
Map<String, VersionRange> result = new HashMap<>();
for (int i = 0; i < args.length; i += 3) {
String feature = (String) args[i];
Integer low = (Integer) args[i + 1];
Integer high = (Integer) args[i + 2];
Number low = (Number) args[i + 1];
Number high = (Number) args[i + 2];
result.put(feature, VersionRange.of(low.shortValue(), high.shortValue()));
}
return result;
@ -58,21 +61,23 @@ public class FeatureControlManagerTest {
Map<String, Short> result = new HashMap<>();
for (int i = 0; i < args.length; i += 2) {
String feature = (String) args[i];
Integer ver = (Integer) args[i + 1];
Number ver = (Number) args[i + 1];
result.put(feature, ver.shortValue());
}
return result;
}
public static QuorumFeatures features(Object... args) {
return QuorumFeatures.create(0, rangeMap(args));
Map<String, VersionRange> features = QuorumFeatures.defaultFeatureMap();
features.putAll(rangeMap(args));
return new QuorumFeatures(0, new ApiVersions(), features, Collections.emptyList());
}
private static Map<String, Short> updateMap(Object... args) {
Map<String, Short> result = new HashMap<>();
for (int i = 0; i < args.length; i += 2) {
String feature = (String) args[i];
Integer ver = (Integer) args[i + 1];
Number ver = (Number) args[i + 1];
result.put(feature, ver.shortValue());
}
return result;
@ -89,7 +94,7 @@ public class FeatureControlManagerTest {
manager.finalizedFeatures(-1));
assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
"The controller does not support the given feature range."))),
"Invalid update version 3 for feature foo. The quorum does not support the given feature version."))),
manager.updateFeatures(updateMap("foo", 3),
Collections.singletonMap("foo", FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
Collections.emptyMap(), false));
@ -99,7 +104,7 @@ public class FeatureControlManagerTest {
Map<String, ApiError> expectedMap = new HashMap<>();
expectedMap.put("foo", ApiError.NONE);
expectedMap.put("bar", new ApiError(Errors.INVALID_UPDATE_VERSION,
"The controller does not support the given feature."));
"Invalid update version 1 for feature bar. The controller does not support the given feature."));
assertEquals(expectedMap, result.response());
List<ApiMessageAndVersion> expectedMessages = new ArrayList<>();
expectedMessages.add(new ApiMessageAndVersion(new FeatureLevelRecord().
@ -138,7 +143,7 @@ public class FeatureControlManagerTest {
"foo",
new ApiError(
Errors.INVALID_UPDATE_VERSION,
"Broker 5 does not support the given feature range."
"Invalid update version 3 for feature foo. Broker 5 does not support this feature."
)
)
),
@ -157,8 +162,8 @@ public class FeatureControlManagerTest {
assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
"Can't downgrade the maximum version of this feature without setting the upgrade type to " +
"safe or unsafe downgrade."))),
"Invalid update version 2 for feature foo. Can't downgrade the version of this feature " +
"without setting the upgrade type to either safe or unsafe downgrade."))),
manager.updateFeatures(updateMap("foo", 2),
Collections.emptyMap(), Collections.emptyMap(), false));
@ -201,4 +206,81 @@ public class FeatureControlManagerTest {
setFeatureLevel((short) 1), (short) 0))),
manager.iterator(Long.MAX_VALUE));
}
@Test
public void testInitializeMetadataVersion() {
// Default QuorumFeatures
checkMetadataVersion(features(), MetadataVersion.IBP_3_0_IV0, Errors.NONE);
checkMetadataVersion(features(), MetadataVersion.latest(), Errors.NONE);
checkMetadataVersion(features(), MetadataVersion.UNINITIALIZED, Errors.INVALID_UPDATE_VERSION);
checkMetadataVersion(features(), MetadataVersion.IBP_2_7_IV1, Errors.INVALID_UPDATE_VERSION);
// Increased QuorumFeatures
QuorumFeatures features = features(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel());
checkMetadataVersion(features, MetadataVersion.IBP_3_0_IV0, Errors.INVALID_UPDATE_VERSION);
// Empty QuorumFeatures
features = new QuorumFeatures(0, new ApiVersions(), Collections.emptyMap(), Collections.emptyList());
checkMetadataVersion(features, MetadataVersion.latest(), Errors.INVALID_UPDATE_VERSION);
checkMetadataVersion(features, MetadataVersion.IBP_3_0_IV0, Errors.INVALID_UPDATE_VERSION);
}
@Test
public void reInitializeMetadataVersion() {
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
FeatureControlManager manager = new FeatureControlManager(logContext, features(), snapshotRegistry);
ControllerResult<Map<String, ApiError>> result = manager.initializeMetadataVersion(MetadataVersion.IBP_3_0_IV0.featureLevel());
Errors actual = result.response().get(MetadataVersion.FEATURE_NAME).error();
assertEquals(Errors.NONE, actual);
RecordTestUtils.replayAll(manager, result.records());
result = manager.initializeMetadataVersion(MetadataVersion.latest().featureLevel());
actual = result.response().get(MetadataVersion.FEATURE_NAME).error();
assertEquals(Errors.INVALID_UPDATE_VERSION, actual);
}
public void checkMetadataVersion(QuorumFeatures features, MetadataVersion version, Errors expected) {
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
FeatureControlManager manager = new FeatureControlManager(logContext, features, snapshotRegistry);
ControllerResult<Map<String, ApiError>> result = manager.initializeMetadataVersion(version.featureLevel());
Errors actual = result.response().get(MetadataVersion.FEATURE_NAME).error();
assertEquals(expected, actual);
}
@Test
public void testDowngradeMetadataVersion() {
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
QuorumFeatures features = features(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel());
FeatureControlManager manager = new FeatureControlManager(logContext, features, snapshotRegistry);
ControllerResult<Map<String, ApiError>> result = manager.initializeMetadataVersion(MetadataVersion.IBP_3_3_IV0.featureLevel());
RecordTestUtils.replayAll(manager, result.records());
assertEquals(manager.metadataVersion(), MetadataVersion.latest());
result = manager.updateFeatures(
Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel()),
Collections.singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
Collections.emptyMap(),
true);
assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().get(MetadataVersion.FEATURE_NAME).error());
result = manager.updateFeatures(
Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel()),
Collections.singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
Collections.emptyMap(),
true);
assertEquals(Errors.NONE, result.response().get(MetadataVersion.FEATURE_NAME).error());
result = manager.updateFeatures(
Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_0_IV0.featureLevel()),
Collections.singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
Collections.emptyMap(),
true);
assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().get(MetadataVersion.FEATURE_NAME).error());
assertEquals("Invalid update version 1 for feature metadata.version. The quorum does not support the given feature version.",
result.response().get(MetadataVersion.FEATURE_NAME).message());
}
}

View File

@ -67,6 +67,7 @@ import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
@ -86,6 +87,7 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
@ -210,7 +212,7 @@ public class QuorumControllerTest {
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
}, OptionalLong.of(sessionTimeoutMillis), OptionalLong.empty());
}, OptionalLong.of(sessionTimeoutMillis), OptionalLong.empty(), MetadataVersion.latest());
) {
ListenerCollection listeners = new ListenerCollection();
listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
@ -302,7 +304,7 @@ public class QuorumControllerTest {
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
}, OptionalLong.of(sessionTimeoutMillis), OptionalLong.of(leaderImbalanceCheckIntervalNs));
}, OptionalLong.of(sessionTimeoutMillis), OptionalLong.of(leaderImbalanceCheckIntervalNs), MetadataVersion.latest());
) {
ListenerCollection listeners = new ListenerCollection();
listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
@ -439,7 +441,7 @@ public class QuorumControllerTest {
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
setFeatures(brokerFeatures()).
setListeners(listeners));
assertEquals(0L, reply.get().epoch());
assertEquals(2L, reply.get().epoch());
CreateTopicsRequestData createTopicsRequestData =
new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(Collections.singleton(
@ -455,7 +457,7 @@ public class QuorumControllerTest {
get().topics().find("foo").errorMessage());
assertEquals(new BrokerHeartbeatReply(true, false, false, false),
active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
setWantFence(false).setBrokerEpoch(0L).setBrokerId(0).
setWantFence(false).setBrokerEpoch(2L).setBrokerId(0).
setCurrentMetadataOffset(100000L)).get());
assertEquals(Errors.NONE.code(), active.createTopics(ANONYMOUS_CONTEXT,
createTopicsRequestData, Collections.singleton("foo")).
@ -483,6 +485,10 @@ public class QuorumControllerTest {
private BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
features.add(new BrokerRegistrationRequestData.Feature()
.setName(MetadataVersion.FEATURE_NAME)
.setMinSupportedVersion(MetadataVersion.IBP_3_0_IV0.featureLevel())
.setMaxSupportedVersion(MetadataVersion.latest().featureLevel()));
return features;
}
@ -680,6 +686,9 @@ public class QuorumControllerTest {
private List<ApiMessageAndVersion> expectedSnapshotContent(Uuid fooId, Map<Integer, Long> brokerEpochs) {
return Arrays.asList(
new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(MetadataVersion.latest().featureLevel()), (short) 0),
new ApiMessageAndVersion(new TopicRecord().
setName("foo").setTopicId(fooId), (short) 0),
new ApiMessageAndVersion(new PartitionRecord().setPartitionId(0).

View File

@ -21,9 +21,12 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.controller.QuorumController.Builder;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,6 +36,8 @@ import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class QuorumControllerTestEnv implements AutoCloseable {
private static final Logger log =
@ -45,23 +50,27 @@ public class QuorumControllerTestEnv implements AutoCloseable {
LocalLogManagerTestEnv logEnv,
Consumer<QuorumController.Builder> builderConsumer
) throws Exception {
this(logEnv, builderConsumer, OptionalLong.empty(), OptionalLong.empty());
this(logEnv, builderConsumer, OptionalLong.empty(), OptionalLong.empty(), MetadataVersion.latest());
}
public QuorumControllerTestEnv(
LocalLogManagerTestEnv logEnv,
Consumer<Builder> builderConsumer,
OptionalLong sessionTimeoutMillis,
OptionalLong leaderImbalanceCheckIntervalNs
OptionalLong leaderImbalanceCheckIntervalNs,
MetadataVersion metadataVersion
) throws Exception {
this.logEnv = logEnv;
int numControllers = logEnv.logManagers().size();
this.controllers = new ArrayList<>(numControllers);
try {
ApiVersions apiVersions = new ApiVersions();
List<Integer> nodeIds = IntStream.range(0, numControllers).boxed().collect(Collectors.toList());
for (int i = 0; i < numControllers; i++) {
QuorumController.Builder builder = new QuorumController.Builder(i, logEnv.clusterId());
builder.setRaftClient(logEnv.logManagers().get(i));
builder.setQuorumFeatures(new QuorumFeatures(i, QuorumFeatures.defaultFeatureMap()));
builder.setBootstrapMetadata(BootstrapMetadata.create(metadataVersion));
builder.setQuorumFeatures(new QuorumFeatures(i, apiVersions, QuorumFeatures.defaultFeatureMap(), nodeIds));
sessionTimeoutMillis.ifPresent(timeout -> {
builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
});

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.metadata.VersionRange;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class QuorumFeaturesTest {
@Test
public void testQuorumFeatures() {
ApiVersions apiVersions = new ApiVersions();
Map<String, VersionRange> featureMap = new HashMap<>(2);
featureMap.put("foo", VersionRange.of(1, 2));
featureMap.put("bar", VersionRange.of(3, 5));
List<Integer> nodeIds = new ArrayList<>();
nodeIds.add(0);
QuorumFeatures quorumFeatures = new QuorumFeatures(0, apiVersions, featureMap, nodeIds);
assertLocalFeature(quorumFeatures, "foo", 1, 2);
assertLocalFeature(quorumFeatures, "bar", 3, 5);
assertQuorumFeature(quorumFeatures, "foo", 1, 2);
assertQuorumFeature(quorumFeatures, "bar", 3, 5);
// Add a second node with identical features
nodeIds.add(1);
apiVersions.update("1", nodeApiVersions(featureMap));
assertLocalFeature(quorumFeatures, "foo", 1, 2);
assertLocalFeature(quorumFeatures, "bar", 3, 5);
assertQuorumFeature(quorumFeatures, "foo", 1, 2);
assertQuorumFeature(quorumFeatures, "bar", 3, 5);
// Change the supported features of one node
Map<String, VersionRange> node1Features = new HashMap<>(featureMap);
node1Features.put("bar", VersionRange.of(3, 4));
apiVersions.update("1", nodeApiVersions(node1Features));
assertLocalFeature(quorumFeatures, "foo", 1, 2);
assertLocalFeature(quorumFeatures, "bar", 3, 5);
assertQuorumFeature(quorumFeatures, "foo", 1, 2);
assertQuorumFeature(quorumFeatures, "bar", 3, 4);
// Add a third node with no features
nodeIds.add(2);
apiVersions.update("1", NodeApiVersions.create());
assertFalse(quorumFeatures.quorumSupportedFeature("foo").isPresent());
assertFalse(quorumFeatures.quorumSupportedFeature("bar").isPresent());
}
public static NodeApiVersions nodeApiVersions(Map<String, VersionRange> featureMap) {
List<ApiVersionsResponseData.SupportedFeatureKey> supportedFeatures = new ArrayList<>(featureMap.size());
featureMap.forEach((featureName, versionRange) -> {
supportedFeatures.add(new ApiVersionsResponseData.SupportedFeatureKey()
.setName(featureName)
.setMinVersion(versionRange.min())
.setMaxVersion(versionRange.max()));
});
return new NodeApiVersions(Collections.emptyList(), supportedFeatures);
}
private void assertLocalFeature(QuorumFeatures features, String name, int expectedMin, int expectedMax) {
Optional<VersionRange> featureRange = features.localSupportedFeature(name);
assertTrue(featureRange.isPresent());
assertEquals(expectedMin, featureRange.get().min());
assertEquals(expectedMax, featureRange.get().max());
}
private void assertQuorumFeature(QuorumFeatures features, String name, int expectedMin, int expectedMax) {
Optional<VersionRange> featureRange = features.quorumSupportedFeature(name);
assertTrue(featureRange.isPresent());
assertEquals(expectedMin, featureRange.get().min());
assertEquals(expectedMax, featureRange.get().max());
}
}

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@ -46,7 +47,7 @@ public class FeaturesImageTest {
map1.put("foo", (short) 2);
map1.put("bar", (short) 1);
map1.put("baz", (short) 8);
IMAGE1 = new FeaturesImage(map1);
IMAGE1 = new FeaturesImage(map1, MetadataVersion.latest());
DELTA1_RECORDS = new ArrayList<>();
DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
@ -62,7 +63,7 @@ public class FeaturesImageTest {
Map<String, Short> map2 = new HashMap<>();
map2.put("foo", (short) 3);
IMAGE2 = new FeaturesImage(map2);
IMAGE2 = new FeaturesImage(map2, MetadataVersion.latest());
}
@Test

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
/**
* A callback for changes to feature levels. Currently, this is only used by the controller to receive a callback
* when committed FeatureLevelRecords are being replayed.
*/
public interface FeatureLevelListener {
void handle(String featureName, short finalizedVersion);
}

View File

@ -43,6 +43,8 @@ import org.apache.kafka.common.record.RecordVersion;
* released version, they can use "0.10.0" when upgrading to the 0.10.0 release.
*/
public enum MetadataVersion {
UNINITIALIZED(-1, "0.0", ""),
IBP_0_8_0(-1, "0.8.0", ""),
IBP_0_8_1(-1, "0.8.1", ""),
IBP_0_8_2(-1, "0.8.2", ""),
@ -138,38 +140,46 @@ public enum MetadataVersion {
IBP_2_8_IV1(-1, "2.8", "IV1"),
// Introduce AllocateProducerIds (KIP-730)
IBP_3_0_IV0(1, "3.0", "IV0"),
IBP_3_0_IV0(1, "3.0", "IV0", true),
// Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734)
// Assume message format version is 3.0 (KIP-724)
IBP_3_0_IV1(2, "3.0", "IV1"),
IBP_3_0_IV1(2, "3.0", "IV1", false),
// Adds topic IDs to Fetch requests/responses (KIP-516)
IBP_3_1_IV0(3, "3.1", "IV0"),
IBP_3_1_IV0(3, "3.1", "IV0", false),
// Support for leader recovery for unclean leader election (KIP-704)
IBP_3_2_IV0(4, "3.2", "IV0");
IBP_3_2_IV0(4, "3.2", "IV0", false),
public static final MetadataVersion[] VALUES = MetadataVersion.values();
private final Optional<Short> featureLevel;
// Support for metadata.version feature flag (KIP-778)
IBP_3_3_IV0(5, "3.3", "IV0", false);
public static final String FEATURE_NAME = "metadata.version";
public static final MetadataVersion[] VERSIONS;
private final short featureLevel;
private final String release;
private final String ibpVersion;
private final boolean didMetadataChange;
MetadataVersion(int featureLevel, String release, String subVersion) {
if (featureLevel > 0) {
this.featureLevel = Optional.of((short) featureLevel);
} else {
this.featureLevel = Optional.empty();
this(featureLevel, release, subVersion, true);
}
MetadataVersion(int featureLevel, String release, String subVersion, boolean didMetadataChange) {
this.featureLevel = (short) featureLevel;
this.release = release;
if (subVersion.isEmpty()) {
this.ibpVersion = release;
} else {
this.ibpVersion = String.format("%s-%s", release, subVersion);
}
this.didMetadataChange = didMetadataChange;
}
public Optional<Short> featureLevel() {
public short featureLevel() {
return featureLevel;
}
@ -201,6 +211,9 @@ public enum MetadataVersion {
return this.isAtLeast(IBP_3_0_IV0);
}
public boolean isKRaftSupported() {
return this.featureLevel > 0;
}
public RecordVersion highestSupportedRecordVersion() {
if (this.isLessThan(IBP_0_10_0_IV0)) {
@ -215,9 +228,13 @@ public enum MetadataVersion {
private static final Map<String, MetadataVersion> IBP_VERSIONS;
static {
{
// Make a copy of values() and omit UNINITIALIZED
MetadataVersion[] enumValues = MetadataVersion.values();
VERSIONS = Arrays.copyOfRange(enumValues, 1, enumValues.length);
IBP_VERSIONS = new HashMap<>();
Map<String, MetadataVersion> maxInterVersion = new HashMap<>();
for (MetadataVersion metadataVersion : VALUES) {
for (MetadataVersion metadataVersion : VERSIONS) {
maxInterVersion.put(metadataVersion.release, metadataVersion);
IBP_VERSIONS.put(metadataVersion.ibpVersion, metadataVersion);
}
@ -233,6 +250,19 @@ public enum MetadataVersion {
return ibpVersion;
}
public boolean didMetadataChange() {
return didMetadataChange;
}
Optional<MetadataVersion> previous() {
int idx = this.ordinal();
if (idx > 1) {
return Optional.of(VERSIONS[idx - 2]);
} else {
return Optional.empty();
}
}
/**
* Return an `MetadataVersion` instance for `versionString`, which can be in a variety of formats (e.g. "0.8.0", "0.8.0.x",
* "0.10.0", "0.10.0-IV1"). `IllegalArgumentException` is thrown if `versionString` cannot be mapped to an `MetadataVersion`.
@ -253,6 +283,15 @@ public enum MetadataVersion {
);
}
public static MetadataVersion fromFeatureLevel(short version) {
for (MetadataVersion metadataVersion: MetadataVersion.values()) {
if (metadataVersion.featureLevel() == version) {
return metadataVersion;
}
}
throw new IllegalArgumentException("No MetadataVersion with metadata version " + version);
}
/**
* Return the minimum `MetadataVersion` that supports `RecordVersion`.
*/
@ -270,7 +309,36 @@ public enum MetadataVersion {
}
public static MetadataVersion latest() {
return VALUES[VALUES.length - 1];
return VERSIONS[VERSIONS.length - 1];
}
public static boolean checkIfMetadataChanged(MetadataVersion sourceVersion, MetadataVersion targetVersion) {
if (sourceVersion == targetVersion) {
return false;
}
final MetadataVersion highVersion, lowVersion;
if (sourceVersion.compareTo(targetVersion) < 0) {
highVersion = targetVersion;
lowVersion = sourceVersion;
} else {
highVersion = sourceVersion;
lowVersion = targetVersion;
}
return checkIfMetadataChangedOrdered(highVersion, lowVersion);
}
private static boolean checkIfMetadataChangedOrdered(MetadataVersion highVersion, MetadataVersion lowVersion) {
MetadataVersion version = highVersion;
while (!version.didMetadataChange() && version != lowVersion) {
Optional<MetadataVersion> prev = version.previous();
if (prev.isPresent()) {
version = prev.get();
} else {
break;
}
}
return version != lowVersion;
}
public boolean isAtLeast(MetadataVersion otherVersion) {

View File

@ -34,7 +34,7 @@ public class MetadataVersionValidator implements Validator {
@Override
public String toString() {
return "[" + Arrays.stream(MetadataVersion.VALUES).map(MetadataVersion::version).collect(
return "[" + Arrays.stream(MetadataVersion.VERSIONS).map(MetadataVersion::version).collect(
Collectors.joining(", ")) + "]";
}
}

View File

@ -63,23 +63,22 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
class MetadataVersionTest {
@Test
public void testFeatureLevel() {
int firstFeatureLevelIndex = Arrays.asList(MetadataVersion.VALUES).indexOf(IBP_3_0_IV0);
MetadataVersion[] metadataVersions = MetadataVersion.VERSIONS;
int firstFeatureLevelIndex = Arrays.asList(metadataVersions).indexOf(IBP_3_0_IV0);
for (int i = 0; i < firstFeatureLevelIndex; i++) {
assertFalse(MetadataVersion.VALUES[i].featureLevel().isPresent());
assertTrue(metadataVersions[i].featureLevel() < 0);
}
short expectedFeatureLevel = 1;
for (int i = firstFeatureLevelIndex; i < MetadataVersion.VALUES.length; i++) {
MetadataVersion metadataVersion = MetadataVersion.VALUES[i];
short featureLevel = metadataVersion.featureLevel().orElseThrow(() ->
new IllegalArgumentException(
String.format("Metadata version %s must have a non-null feature level", metadataVersion.version())));
assertEquals(expectedFeatureLevel, featureLevel,
String.format("Metadata version %s should have feature level %s", metadataVersion.version(), expectedFeatureLevel));
for (int i = firstFeatureLevelIndex; i < metadataVersions.length; i++) {
MetadataVersion metadataVersion = metadataVersions[i];
assertEquals(expectedFeatureLevel, metadataVersion.featureLevel(),
String.format("Metadata version %s should have feature level %s", metadataVersion.featureLevel(), expectedFeatureLevel));
expectedFeatureLevel += 1;
}
}
@ -264,4 +263,44 @@ class MetadataVersionTest {
assertEquals("3.2-IV0", IBP_3_2_IV0.version());
}
@Test
public void testPrevious() {
for (int i = 1; i < MetadataVersion.VERSIONS.length - 2; i++) {
MetadataVersion version = MetadataVersion.VERSIONS[i];
assertTrue(version.previous().isPresent());
assertEquals(MetadataVersion.VERSIONS[i - 1], version.previous().get());
}
}
@Test
public void testMetadataChanged() {
assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_3_2_IV0));
assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_3_1_IV0));
assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_3_0_IV1));
assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_3_0_IV0));
assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_2_8_IV1));
// Check that argument order doesn't matter
assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_0_IV0, IBP_3_2_IV0));
assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_2_8_IV1, IBP_3_2_IV0));
}
@Test
public void testKRaftVersions() {
for (MetadataVersion metadataVersion : MetadataVersion.VERSIONS) {
if (metadataVersion.isKRaftSupported()) {
assertTrue(metadataVersion.featureLevel() > 0);
} else {
assertEquals(-1, metadataVersion.featureLevel());
}
}
for (MetadataVersion metadataVersion : MetadataVersion.VERSIONS) {
if (metadataVersion.isAtLeast(IBP_3_0_IV0)) {
assertTrue(metadataVersion.isKRaftSupported());
} else {
assertFalse(metadataVersion.isKRaftSupported());
}
}
}
}

View File

@ -27,7 +27,7 @@ public class MetadataVersionValidatorTest {
public void testMetadataVersionValidator() {
String str = new MetadataVersionValidator().toString();
String[] apiVersions = str.substring(1).split(",");
assertEquals(MetadataVersion.VALUES.length, apiVersions.length);
assertEquals(MetadataVersion.VERSIONS.length, apiVersions.length);
}
}

View File

@ -24,6 +24,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.util.SnapshotFileReader;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;