diff --git a/build.gradle b/build.gradle
index 8fd32e68743..6983a971a79 100644
--- a/build.gradle
+++ b/build.gradle
@@ -447,7 +447,8 @@ subprojects {
maxParallelForks = maxTestForks
ignoreFailures = userIgnoreFailures
- maxHeapSize = defaultMaxHeapSize
+ // Increase heap size for integration tests
+ maxHeapSize = "2560m"
jvmArgs = defaultJvmArgs
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 36e5cc63551..28b325b093d 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -82,6 +82,7 @@
+
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 414e59a614d..7b5f20aea44 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -259,10 +259,12 @@
+
+
@@ -348,6 +350,7 @@
+
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 0cc6c831d94..c5cd99fdaa5 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -290,7 +290,7 @@
+ files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/>
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
    private final List<ApiVersion> unknownApis = new ArrayList<>();
+    private final Map<String, SupportedVersionRange> supportedFeatures;
+
/**
* Create a NodeApiVersions object with the current ApiVersions.
*
@@ -72,7 +76,7 @@ public class NodeApiVersions {
}
if (!exists) apiVersions.add(ApiVersionsResponse.toApiVersion(apiKey));
}
- return new NodeApiVersions(apiVersions);
+ return new NodeApiVersions(apiVersions, Collections.emptyList());
}
@@ -91,7 +95,7 @@ public class NodeApiVersions {
.setMaxVersion(maxVersion)));
}
- public NodeApiVersions(ApiVersionCollection nodeApiVersions) {
+    public NodeApiVersions(Collection<ApiVersion> nodeApiVersions, Collection<SupportedFeatureKey> nodeSupportedFeatures) {
for (ApiVersion nodeApiVersion : nodeApiVersions) {
if (ApiKeys.hasId(nodeApiVersion.apiKey())) {
ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey());
@@ -101,18 +105,13 @@ public class NodeApiVersions {
unknownApis.add(nodeApiVersion);
}
}
- }
-    public NodeApiVersions(Collection<ApiVersion> nodeApiVersions) {
- for (ApiVersion nodeApiVersion : nodeApiVersions) {
- if (ApiKeys.hasId(nodeApiVersion.apiKey())) {
- ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey());
- supportedVersions.put(nodeApiKey, nodeApiVersion);
- } else {
- // Newer brokers may support ApiKeys we don't know about
- unknownApis.add(nodeApiVersion);
- }
+        Map<String, SupportedVersionRange> supportedFeaturesBuilder = new HashMap<>();
+ for (SupportedFeatureKey supportedFeature : nodeSupportedFeatures) {
+ supportedFeaturesBuilder.put(supportedFeature.name(),
+ new SupportedVersionRange(supportedFeature.minVersion(), supportedFeature.maxVersion()));
}
+ this.supportedFeatures = Collections.unmodifiableMap(supportedFeaturesBuilder);
}
/**
@@ -233,4 +232,8 @@ public class NodeApiVersions {
    public Map<ApiKeys, ApiVersion> allSupportedApiVersions() {
return supportedVersions;
}
+
+    public Map<String, SupportedVersionRange> supportedFeatures() {
+ return supportedFeatures;
+ }
}
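`NodeApiVersions` now records the peer's supported feature ranges alongside its API versions, so a client can ask what feature levels a node could finalize. A minimal sketch of consuming the new `supportedFeatures()` accessor — the helper class and its method are hypothetical, not part of this patch:

```java
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.feature.SupportedVersionRange;

// Hypothetical helper built on the accessor added above.
final class FeatureSupport {
    // True if the node advertises the feature and its supported range covers the level.
    static boolean supports(NodeApiVersions versions, String feature, short level) {
        SupportedVersionRange range = versions.supportedFeatures().get(feature);
        return range != null && range.min() <= level && level <= range.max();
    }
}
```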
diff --git a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
index 206e95e4d30..89065536435 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
@@ -48,7 +48,7 @@ public class ApiVersionsTest {
assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
// something that doesn't support PRODUCE, which is the case with Raft-based controllers
- apiVersions.update("2", new NodeApiVersions(Collections.singleton(
+ apiVersions.update("2", NodeApiVersions.create(Collections.singleton(
new ApiVersionsResponseData.ApiVersion()
.setApiKey(ApiKeys.FETCH.id)
.setMinVersion((short) 0)
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index b04d83b47df..f379366ac16 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -38,7 +39,7 @@ public class NodeApiVersionsTest {
@Test
public void testUnsupportedVersionsToString() {
- NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection());
+ NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
@@ -67,7 +68,7 @@ public class NodeApiVersionsTest {
.setMaxVersion((short) 10001));
} else versionList.add(ApiVersionsResponse.toApiVersion(apiKey));
}
- NodeApiVersions versions = new NodeApiVersions(versionList);
+ NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.values()) {
@@ -124,7 +125,7 @@ public class NodeApiVersionsTest {
@Test
public void testUsableVersionCalculationNoKnownVersions() {
- NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection());
+ NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
assertThrows(UnsupportedVersionException.class,
() -> versions.latestUsableVersion(ApiKeys.FETCH));
}
@@ -146,7 +147,7 @@ public class NodeApiVersionsTest {
.setApiKey((short) 100)
.setMinVersion((short) 0)
.setMaxVersion((short) 1));
- NodeApiVersions versions = new NodeApiVersions(versionList);
+ NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
for (ApiKeys apiKey: ApiKeys.apisForListener(scope)) {
assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
}
@@ -156,7 +157,7 @@ public class NodeApiVersionsTest {
@EnumSource(ApiMessageType.ListenerType.class)
public void testConstructionFromApiVersionsResponse(ApiMessageType.ListenerType scope) {
ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
- NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys());
+ NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList());
for (ApiVersion apiVersionKey : apiVersionsResponse.data().apiKeys()) {
ApiVersion apiVersion = versions.apiVersion(ApiKeys.forId(apiVersionKey.apiKey()));
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 377db5ec06c..b6bf9e6f4f1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -153,7 +153,7 @@ public class TransactionManagerTest {
    private void initializeTransactionManager(Optional<String> transactionalId) {
Metrics metrics = new Metrics(time);
- apiVersions.update("0", new NodeApiVersions(Arrays.asList(
+ apiVersions.update("0", NodeApiVersions.create(Arrays.asList(
new ApiVersion()
.setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
.setMinVersion((short) 0)
@@ -2615,7 +2615,7 @@ public class TransactionManagerTest {
@Test
public void testTransitionToFatalErrorWhenRetriedBatchIsExpired() throws InterruptedException {
- apiVersions.update("0", new NodeApiVersions(Arrays.asList(
+ apiVersions.update("0", NodeApiVersions.create(Arrays.asList(
new ApiVersion()
.setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
.setMinVersion((short) 0)
@@ -2814,7 +2814,7 @@ public class TransactionManagerTest {
@Test
public void testAbortTransactionAndReuseSequenceNumberOnError() throws InterruptedException {
- apiVersions.update("0", new NodeApiVersions(Arrays.asList(
+ apiVersions.update("0", NodeApiVersions.create(Arrays.asList(
new ApiVersion()
.setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
.setMinVersion((short) 0)
@@ -2866,7 +2866,7 @@ public class TransactionManagerTest {
// Set the InitProducerId version such that bumping the epoch number is not supported. This will test the case
// where the sequence number is reset on an UnknownProducerId error, allowing subsequent transactions to
// append to the log successfully
- apiVersions.update("0", new NodeApiVersions(Arrays.asList(
+ apiVersions.update("0", NodeApiVersions.create(Arrays.asList(
new ApiVersion()
.setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
.setMinVersion((short) 0)
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index a8b536113b9..957cb2ce8bb 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -40,7 +40,6 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.common.utils.{KafkaThread, Time}
import org.apache.kafka.common.Node
-import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, MetadataRequest, MetadataResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -157,10 +156,10 @@ object BrokerApiVersionsCommand {
throw new RuntimeException(s"Request ${request.apiKey()} failed on brokers $bootstrapBrokers")
}
- private def getApiVersions(node: Node): ApiVersionCollection = {
+ private def getNodeApiVersions(node: Node): NodeApiVersions = {
val response = send(node, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
Errors.forCode(response.data.errorCode).maybeThrow()
- response.data.apiKeys
+ new NodeApiVersions(response.data.apiKeys, response.data.supportedFeatures)
}
/**
@@ -186,7 +185,7 @@ object BrokerApiVersionsCommand {
def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
findAllBrokers().map { broker =>
- broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker)))
+ broker -> Try[NodeApiVersions](getNodeApiVersions(broker))
}.toMap
def close(): Unit = {
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 4c292500736..cbb9f7b89bf 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -111,6 +111,7 @@ class KafkaRaftManager[T](
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
) extends RaftManager[T] with Logging {
+ val apiVersions = new ApiVersions()
private val raftConfig = new RaftConfig(config)
private val threadNamePrefix = threadNamePrefixOpt.getOrElse("kafka-raft")
private val logContext = new LogContext(s"[RaftManager nodeId=${config.nodeId}] ")
@@ -274,7 +275,7 @@ class KafkaRaftManager[T](
config.connectionSetupTimeoutMaxMs,
time,
discoverBrokerVersions,
- new ApiVersions,
+ apiVersions,
logContext
)
}
diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala b/core/src/main/scala/kafka/server/BrokerFeatures.scala
index 9511172c7eb..ff7e60908c1 100644
--- a/core/src/main/scala/kafka/server/BrokerFeatures.scala
+++ b/core/src/main/scala/kafka/server/BrokerFeatures.scala
@@ -19,6 +19,7 @@ package kafka.server
import kafka.utils.Logging
import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.server.common.MetadataVersion
import java.util
import scala.jdk.CollectionConverters._
@@ -72,6 +73,12 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte
object BrokerFeatures extends Logging {
def createDefault(): BrokerFeatures = {
+ new BrokerFeatures(Features.supportedFeatures(
+ java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
+ new SupportedVersionRange(MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel()))))
+ }
+
+ def createEmpty(): BrokerFeatures = {
new BrokerFeatures(Features.emptySupportedFeatures())
}
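With this change, KRaft brokers advertise `metadata.version` as a supported feature spanning the first KRaft-capable release through the latest known level, while ZK-mode brokers switch to `createEmpty()` and advertise nothing. A Java sketch of the same map construction the Scala `createDefault()` performs (the Scala above is authoritative):

```java
import java.util.Collections;
import org.apache.kafka.common.feature.Features;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.server.common.MetadataVersion;

final class DefaultBrokerFeatures {
    // Mirrors BrokerFeatures.createDefault(): one "metadata.version" entry
    // covering IBP_3_0_IV0 through the latest known feature level.
    static Features<SupportedVersionRange> supported() {
        return Features.supportedFeatures(
            Collections.singletonMap(MetadataVersion.FEATURE_NAME,
                new SupportedVersionRange(
                    MetadataVersion.IBP_3_0_IV0.featureLevel(),
                    MetadataVersion.latest().featureLevel())));
    }
}
```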
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index e004996bf74..7a2913e9cf0 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -30,13 +30,14 @@ import kafka.security.CredentialProvider
import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPolicyClassNameProp}
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{CoreUtils, Logging}
+import org.apache.kafka.clients.ApiVersions
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Endpoint}
-import org.apache.kafka.controller.{Controller, QuorumController, QuorumControllerMetrics, QuorumFeatures}
+import org.apache.kafka.controller.{BootstrapMetadata, Controller, QuorumController, QuorumControllerMetrics, QuorumFeatures}
import org.apache.kafka.metadata.KafkaConfigSchema
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.RaftConfig.AddressSpec
@@ -63,6 +64,8 @@ class ControllerServer(
val threadNamePrefix: Option[String],
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
val configSchema: KafkaConfigSchema,
+ val raftApiVersions: ApiVersions,
+ val bootstrapMetadata: BootstrapMetadata
) extends Logging with KafkaMetricsGroup {
import kafka.server.Server._
@@ -162,7 +165,8 @@ class ControllerServer(
alterConfigPolicy = Option(config.
getConfiguredInstance(AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))
- val quorumFeatures = QuorumFeatures.create(config.nodeId, QuorumFeatures.defaultFeatureMap())
+ val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get())
+ val quorumFeatures = QuorumFeatures.create(config.nodeId, raftApiVersions, QuorumFeatures.defaultFeatureMap(), controllerNodes)
val controllerBuilder = {
val leaderImbalanceCheckIntervalNs = if (config.autoLeaderRebalanceEnable) {
@@ -179,7 +183,7 @@ class ControllerServer(
setQuorumFeatures(quorumFeatures).
setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
setDefaultNumPartitions(config.numPartitions.intValue()).
- setIsLeaderRecoverySupported(config.interBrokerProtocolVersion.isAtLeast(IBP_3_2_IV0)).
+ setIsLeaderRecoverySupported(bootstrapMetadata.metadataVersion().isAtLeast(IBP_3_2_IV0)).
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
TimeUnit.MILLISECONDS)).
setSnapshotMaxNewRecordBytes(config.metadataSnapshotMaxNewRecordBytes).
@@ -188,7 +192,8 @@ class ControllerServer(
setCreateTopicPolicy(createTopicPolicy.asJava).
setAlterConfigPolicy(alterConfigPolicy.asJava).
setConfigurationValidator(new ControllerConfigurationValidator()).
- setStaticConfig(config.originals)
+ setStaticConfig(config.originals).
+ setBootstrapMetadata(bootstrapMetadata)
}
authorizer match {
case Some(a: ClusterMetadataAuthorizer) => controllerBuilder.setAuthorizer(a)
@@ -197,7 +202,6 @@ class ControllerServer(
controller = controllerBuilder.build()
quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
- val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
authorizer,
quotaManagers,
@@ -206,7 +210,7 @@ class ControllerServer(
raftManager,
config,
metaProperties,
- controllerNodes.toSeq,
+ controllerNodes.asScala.toSeq,
apiVersionManager)
controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
socketServer.dataPlaneRequestChannel,
diff --git a/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala b/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
index ee7337653c7..390110dba0f 100644
--- a/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
+++ b/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
@@ -22,10 +22,10 @@ import java.util.Collections
import kafka.utils.Logging
import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
import org.apache.kafka.image.FeaturesDelta
+import org.apache.kafka.server.common.MetadataVersion
import scala.concurrent.TimeoutException
import scala.math.max
-
import scala.compat.java8.OptionConverters._
// Raised whenever there was an error in updating the FinalizedFeatureCache with features.
@@ -144,6 +144,9 @@ class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends
new FinalizedVersionRange(version, version))
}
}
+ featuresDelta.metadataVersionChange().ifPresent { metadataVersion =>
+ newFeatures.put(MetadataVersion.FEATURE_NAME, new FinalizedVersionRange(metadataVersion.featureLevel(), metadataVersion.featureLevel()))
+ }
featuresAndEpoch = Some(FinalizedFeaturesAndEpoch(Features.finalizedFeatures(
Collections.unmodifiableMap(newFeatures)), highestMetadataOffset))
}
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index a0dd19559c4..5a1c3087d38 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -18,7 +18,6 @@ package kafka.server
import java.io.File
import java.util.concurrent.CompletableFuture
-
import kafka.common.InconsistentNodeIdException
import kafka.log.{LogConfig, UnifiedLog}
import kafka.metrics.KafkaMetricsReporter
@@ -28,11 +27,13 @@ import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
import org.apache.kafka.common.utils.{AppInfoParser, Time}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
+import org.apache.kafka.controller.BootstrapMetadata
import org.apache.kafka.metadata.{KafkaConfigSchema, MetadataRecordSerde}
import org.apache.kafka.raft.RaftConfig
-import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import java.nio.file.Paths
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@@ -54,7 +55,7 @@ class KafkaRaftServer(
KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
KafkaYammerMetrics.INSTANCE.configure(config.originals)
- private val (metaProps, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
+ private val (metaProps, bootstrapMetadata, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
private val metrics = Server.initializeMetrics(
config,
@@ -102,6 +103,8 @@ class KafkaRaftServer(
threadNamePrefix,
controllerQuorumVotersFuture,
KafkaRaftServer.configSchema,
+ raftManager.apiVersions,
+ bootstrapMetadata
))
} else {
None
@@ -149,7 +152,7 @@ object KafkaRaftServer {
-   * @return A tuple containing the loaded meta properties (which are guaranteed to
-   *         be consistent across all log dirs) and the offline directories
+   * @return A tuple containing the loaded meta properties (which are guaranteed to
+   *         be consistent across all log dirs), the loaded bootstrap metadata, and
+   *         the offline directories
*/
- def initializeLogDirs(config: KafkaConfig): (MetaProperties, Seq[String]) = {
+ def initializeLogDirs(config: KafkaConfig): (MetaProperties, BootstrapMetadata, Seq[String]) = {
val logDirs = (config.logDirs.toSet + config.metadataLogDir).toSeq
val (rawMetaProperties, offlineDirs) = BrokerMetadataCheckpoint.
getBrokerMetadataAndOfflineDirs(logDirs, ignoreMissing = false)
@@ -177,7 +180,15 @@ object KafkaRaftServer {
"If you intend to create a new broker, you should remove all data in your data directories (log.dirs).")
}
- (metaProperties, offlineDirs.toSeq)
+ // Load the bootstrap metadata file or, in the case of an upgrade from KRaft preview, bootstrap the
+ // metadata.version corresponding to a user-configured IBP.
+ val bootstrapMetadata = if (config.originals.containsKey(KafkaConfig.InterBrokerProtocolVersionProp)) {
+ BootstrapMetadata.load(Paths.get(config.metadataLogDir), config.interBrokerProtocolVersion)
+ } else {
+ BootstrapMetadata.load(Paths.get(config.metadataLogDir), MetadataVersion.IBP_3_0_IV0)
+ }
+
+ (metaProperties, bootstrapMetadata, offlineDirs.toSeq)
}
val configSchema = new KafkaConfigSchema(Map(
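Startup now resolves bootstrap metadata from the metadata log directory, preferring an on-disk bootstrap file and otherwise falling back to the configured IBP, or to 3.0 when upgrading from a KRaft preview cluster that has no IBP set. A condensed Java sketch of that decision, using the `BootstrapMetadata.load(Path, MetadataVersion)` call from this patch (error handling elided, `resolve` is a hypothetical name):

```java
import java.nio.file.Paths;
import org.apache.kafka.controller.BootstrapMetadata;
import org.apache.kafka.server.common.MetadataVersion;

final class BootstrapResolver {
    // If a bootstrap file exists under metadataLogDir it wins; otherwise the
    // fallback version is used (explicit IBP if configured, else IBP_3_0_IV0).
    static BootstrapMetadata resolve(String metadataLogDir, MetadataVersion configuredIbp) throws Exception {
        MetadataVersion fallback = (configuredIbp != null) ? configuredIbp : MetadataVersion.IBP_3_0_IV0;
        return BootstrapMetadata.load(Paths.get(metadataLogDir), fallback);
    }
}
```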
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index b1273ed628f..41ff21bb570 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -162,7 +162,7 @@ class KafkaServer(
private var _featureChangeListener: FinalizedFeatureChangeListener = null
- val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
+ val brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty()
val featureCache: FinalizedFeatureCache = new FinalizedFeatureCache(brokerFeatures)
override def brokerState: BrokerState = _brokerState
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 54a777f67f8..e8819e9c5be 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -118,9 +118,14 @@ class BrokerMetadataListener(
}
_publisher.foreach(publish)
+ // If we detected a change in metadata.version, generate a local snapshot
+ val metadataVersionChanged = Option(_delta.featuresDelta()).exists { featuresDelta =>
+ featuresDelta.metadataVersionChange().isPresent
+ }
+
snapshotter.foreach { snapshotter =>
_bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
- if (shouldSnapshot()) {
+ if (shouldSnapshot() || metadataVersionChanged) {
if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
_bytesSinceLastSnapshot = 0L
}
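The listener now forces a snapshot whenever a committed batch changes `metadata.version`, regardless of how many bytes have accumulated since the last snapshot. The same null-safe check, sketched in Java:

```java
import java.util.Optional;
import org.apache.kafka.image.MetadataDelta;

final class SnapshotTrigger {
    // True when the delta carries a features change that includes metadata.version.
    static boolean metadataVersionChanged(MetadataDelta delta) {
        return Optional.ofNullable(delta.featuresDelta())
            .map(fd -> fd.metadataVersionChange().isPresent())
            .orElse(false);
    }
}
```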
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index fb6bb615443..01418d2630f 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsImage}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.common.MetadataVersion
import scala.collection.mutable
@@ -133,19 +134,27 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
// Publish the new metadata image to the metadata cache.
metadataCache.setImage(newImage)
+ val metadataVersionLogMsg = newImage.features().metadataVersion() match {
+ case MetadataVersion.UNINITIALIZED => "un-initialized metadata.version"
+ case mv: MetadataVersion => s"metadata.version ${mv.featureLevel()}"
+ }
+
if (_firstPublish) {
- info(s"Publishing initial metadata at offset $highestOffsetAndEpoch.")
+ info(s"Publishing initial metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.")
// If this is the first metadata update we are applying, initialize the managers
// first (but after setting up the metadata cache).
initializeManagers()
} else if (isDebugEnabled) {
- debug(s"Publishing metadata at offset $highestOffsetAndEpoch.")
+ debug(s"Publishing metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.")
}
// Apply feature deltas.
Option(delta.featuresDelta()).foreach { featuresDelta =>
featureCache.update(featuresDelta, highestOffsetAndEpoch.offset)
+      featuresDelta.metadataVersionChange().ifPresent { metadataVersion =>
+ info(s"Updating metadata.version to ${metadataVersion.featureLevel()} at offset $highestOffsetAndEpoch.")
+ }
}
// Apply topic deltas.
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
index 28377d297cb..9b78faf6eda 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -19,48 +19,25 @@ package kafka.tools
import java.io.PrintStream
import java.nio.file.{Files, Paths}
-
import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties, RawMetaProperties}
import kafka.utils.{Exit, Logging}
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue}
+import net.sourceforge.argparse4j.inf.Namespace
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.controller.BootstrapMetadata
+import org.apache.kafka.server.common.MetadataVersion
import scala.collection.mutable
object StorageTool extends Logging {
def main(args: Array[String]): Unit = {
try {
- val parser = ArgumentParsers.
- newArgumentParser("kafka-storage").
- defaultHelp(true).
- description("The Kafka storage tool.")
- val subparsers = parser.addSubparsers().dest("command")
-
- val infoParser = subparsers.addParser("info").
- help("Get information about the Kafka log directories on this node.")
- val formatParser = subparsers.addParser("format").
- help("Format the Kafka log directories on this node.")
- subparsers.addParser("random-uuid").help("Print a random UUID.")
- List(infoParser, formatParser).foreach(parser => {
- parser.addArgument("--config", "-c").
- action(store()).
- required(true).
- help("The Kafka configuration file to use.")
- })
- formatParser.addArgument("--cluster-id", "-t").
- action(store()).
- required(true).
- help("The cluster ID to use.")
- formatParser.addArgument("--ignore-formatted", "-g").
- action(storeTrue())
-
- val namespace = parser.parseArgsOrFail(args)
+ val namespace = parseArguments(args)
val command = namespace.getString("command")
val config = Option(namespace.getString("config")).flatMap(
p => Some(new KafkaConfig(Utils.loadProps(p))))
-
command match {
case "info" =>
val directories = configToLogDirectories(config.get)
@@ -70,13 +47,17 @@ object StorageTool extends Logging {
case "format" =>
val directories = configToLogDirectories(config.get)
val clusterId = namespace.getString("cluster_id")
+ val metadataVersion = getMetadataVersion(namespace)
+ if (!metadataVersion.isKRaftSupported) {
+        throw new TerseFailure("Must specify a metadata.version of at least feature level 1 (the first KRaft-supported version).")
+ }
val metaProperties = buildMetadataProperties(clusterId, config.get)
val ignoreFormatted = namespace.getBoolean("ignore_formatted")
if (!configToSelfManagedMode(config.get)) {
throw new TerseFailure("The kafka configuration file appears to be for " +
"a legacy cluster. Formatting is only supported for clusters in KRaft mode.")
}
- Exit.exit(formatCommand(System.out, directories, metaProperties, ignoreFormatted ))
+ Exit.exit(formatCommand(System.out, directories, metaProperties, metadataVersion, ignoreFormatted))
case "random-uuid" =>
System.out.println(Uuid.randomUuid)
@@ -92,6 +73,37 @@ object StorageTool extends Logging {
}
}
+ def parseArguments(args: Array[String]): Namespace = {
+ val parser = ArgumentParsers.
+ newArgumentParser("kafka-storage").
+ defaultHelp(true).
+ description("The Kafka storage tool.")
+ val subparsers = parser.addSubparsers().dest("command")
+
+ val infoParser = subparsers.addParser("info").
+ help("Get information about the Kafka log directories on this node.")
+ val formatParser = subparsers.addParser("format").
+ help("Format the Kafka log directories on this node.")
+ subparsers.addParser("random-uuid").help("Print a random UUID.")
+ List(infoParser, formatParser).foreach(parser => {
+ parser.addArgument("--config", "-c").
+ action(store()).
+ required(true).
+ help("The Kafka configuration file to use.")
+ })
+ formatParser.addArgument("--cluster-id", "-t").
+ action(store()).
+ required(true).
+ help("The cluster ID to use.")
+ formatParser.addArgument("--ignore-formatted", "-g").
+ action(storeTrue())
+ formatParser.addArgument("--metadata-version", "-v").
+ action(store()).
+ help(s"The initial metadata.version to use. Default is (${MetadataVersion.latest().featureLevel()}).")
+
+ parser.parseArgsOrFail(args)
+ }
+
def configToLogDirectories(config: KafkaConfig): Seq[String] = {
val directories = new mutable.TreeSet[String]
directories ++= config.logDirs
@@ -101,6 +113,12 @@ object StorageTool extends Logging {
def configToSelfManagedMode(config: KafkaConfig): Boolean = config.processRoles.nonEmpty
+ def getMetadataVersion(namespace: Namespace): MetadataVersion = {
+ Option(namespace.getString("metadata_version")).
+ map(mv => MetadataVersion.fromFeatureLevel(mv.toShort)).
+ getOrElse(MetadataVersion.latest())
+ }
+
def infoCommand(stream: PrintStream, selfManagedMode: Boolean, directories: Seq[String]): Int = {
val problems = new mutable.ArrayBuffer[String]
val foundDirectories = new mutable.ArrayBuffer[String]
@@ -197,13 +215,16 @@ object StorageTool extends Logging {
case e: Throwable => throw new TerseFailure(s"Cluster ID string $clusterIdStr " +
s"does not appear to be a valid UUID: ${e.getMessage}")
}
- require(config.nodeId >= 0, s"The node.id must be set to a non-negative integer.")
+ if (config.nodeId < 0) {
+ throw new TerseFailure(s"The node.id must be set to a non-negative integer. We saw ${config.nodeId}")
+ }
new MetaProperties(effectiveClusterId.toString, config.nodeId)
}
def formatCommand(stream: PrintStream,
directories: Seq[String],
metaProperties: MetaProperties,
+ metadataVersion: MetadataVersion,
ignoreFormatted: Boolean): Int = {
if (directories.isEmpty) {
throw new TerseFailure("No log directories found in the configuration.")
@@ -231,6 +252,10 @@ object StorageTool extends Logging {
val metaPropertiesPath = Paths.get(directory, "meta.properties")
val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile)
checkpoint.write(metaProperties.toProperties)
+
+ val bootstrapMetadata = BootstrapMetadata.create(metadataVersion)
+ BootstrapMetadata.write(bootstrapMetadata, Paths.get(directory))
+
stream.println(s"Formatting ${directory}")
})
0
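`format` now seeds every log directory with a bootstrap metadata file in addition to `meta.properties`, using the level from `--metadata-version` (or the latest if unset); an operator invocation might look like `kafka-storage.sh format -t <cluster-id> -c server.properties --metadata-version 1` (hypothetical example). A sketch of the per-directory step, reusing the `BootstrapMetadata.create` and `BootstrapMetadata.write` calls from this patch:

```java
import java.nio.file.Paths;
import org.apache.kafka.controller.BootstrapMetadata;
import org.apache.kafka.server.common.MetadataVersion;

final class FormatStep {
    // Seed a directory with the chosen bootstrap metadata.version, as the
    // "format" subcommand now does alongside writing meta.properties.
    static void writeBootstrap(String directory, MetadataVersion metadataVersion) throws Exception {
        BootstrapMetadata bootstrapMetadata = BootstrapMetadata.create(metadataVersion);
        BootstrapMetadata.write(bootstrapMetadata, Paths.get(directory));
    }
}
```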
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 56104df8212..a72784c469a 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -90,7 +90,7 @@ class TestRaftServer(
time,
metrics,
Some(threadNamePrefix),
- CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters))
+ CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters)),
)
workloadGenerator = new RaftWorkloadGenerator(
diff --git a/core/src/test/java/kafka/test/ClusterConfig.java b/core/src/test/java/kafka/test/ClusterConfig.java
index 20b74cf4324..5830959283b 100644
--- a/core/src/test/java/kafka/test/ClusterConfig.java
+++ b/core/src/test/java/kafka/test/ClusterConfig.java
@@ -19,6 +19,7 @@ package kafka.test;
import kafka.test.annotation.Type;
import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.common.MetadataVersion;
import java.io.File;
import java.util.HashMap;
@@ -41,7 +42,7 @@ public class ClusterConfig {
private final SecurityProtocol securityProtocol;
private final String listenerName;
private final File trustStoreFile;
- private final String ibp;
+ private final MetadataVersion metadataVersion;
private final Properties serverProperties = new Properties();
private final Properties producerProperties = new Properties();
@@ -53,7 +54,7 @@ public class ClusterConfig {
ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart,
SecurityProtocol securityProtocol, String listenerName, File trustStoreFile,
- String ibp) {
+ MetadataVersion metadataVersion) {
this.type = type;
this.brokers = brokers;
this.controllers = controllers;
@@ -62,7 +63,7 @@ public class ClusterConfig {
this.securityProtocol = securityProtocol;
this.listenerName = listenerName;
this.trustStoreFile = trustStoreFile;
- this.ibp = ibp;
+ this.metadataVersion = metadataVersion;
}
public Type clusterType() {
@@ -121,8 +122,8 @@ public class ClusterConfig {
return Optional.ofNullable(trustStoreFile);
}
-    public Optional<String> ibp() {
-        return Optional.ofNullable(ibp);
+    public Optional<MetadataVersion> metadataVersion() {
+        return Optional.ofNullable(metadataVersion);
}
public Properties brokerServerProperties(int brokerId) {
@@ -130,16 +131,16 @@ public class ClusterConfig {
}
    public Map<String, String> nameTags() {
-        Map<String, String> tags = new LinkedHashMap<>(3);
+        Map<String, String> tags = new LinkedHashMap<>(4);
name().ifPresent(name -> tags.put("Name", name));
- ibp().ifPresent(ibp -> tags.put("IBP", ibp));
+ metadataVersion().ifPresent(mv -> tags.put("MetadataVersion", mv.toString()));
tags.put("Security", securityProtocol.name());
listenerName().ifPresent(listener -> tags.put("Listener", listener));
return tags;
}
public ClusterConfig copyOf() {
- ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, ibp);
+ ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion);
copy.serverProperties.putAll(serverProperties);
copy.producerProperties.putAll(producerProperties);
copy.consumerProperties.putAll(consumerProperties);
@@ -165,7 +166,7 @@ public class ClusterConfig {
private SecurityProtocol securityProtocol;
private String listenerName;
private File trustStoreFile;
- private String ibp;
+ private MetadataVersion metadataVersion;
Builder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol) {
this.type = type;
@@ -215,13 +216,13 @@ public class ClusterConfig {
return this;
}
- public Builder ibp(String ibp) {
- this.ibp = ibp;
+ public Builder metadataVersion(MetadataVersion metadataVersion) {
+ this.metadataVersion = metadataVersion;
return this;
}
public ClusterConfig build() {
- return new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, ibp);
+ return new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion);
}
}
}
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
index 099d93280d7..a7052857c36 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -18,11 +18,13 @@
package kafka.test;
import kafka.network.SocketServer;
+import kafka.server.BrokerFeatures;
import kafka.test.annotation.ClusterTest;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import java.util.Collection;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -95,6 +97,11 @@ public interface ClusterInstance {
*/
SocketServer anyControllerSocketServer();
+ /**
+ * Return a mapping of the underlying broker IDs to their supported features
+ */
+    Map<Integer, BrokerFeatures> brokerFeatures();
+
/**
* The underlying object which is responsible for setting up and tearing down the cluster.
*/
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java
index 11336ab87a1..b83df127f5a 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterTest.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java
@@ -18,6 +18,7 @@
package kafka.test.annotation;
import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.TestTemplate;
import java.lang.annotation.Documented;
@@ -40,6 +41,6 @@ public @interface ClusterTest {
String name() default "";
SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
String listener() default "";
- String ibp() default "";
+ MetadataVersion metadataVersion() default MetadataVersion.UNINITIALIZED;
ClusterConfigProperty[] serverProperties() default {};
}
diff --git a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
index 293f00b035c..f0c1d9bbda3 100644
--- a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
+++ b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
@@ -25,6 +25,7 @@ import kafka.test.annotation.ClusterTemplate;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
+import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
@@ -194,8 +195,8 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
properties.put(property.key(), property.value());
}
- if (!annot.ibp().isEmpty()) {
- builder.ibp(annot.ibp());
+ if (!annot.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
+ builder.metadataVersion(annot.metadataVersion());
}
ClusterConfig config = builder.build();
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index 711143785c5..f0ca98a5f2d 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -18,6 +18,7 @@
package kafka.test.junit;
import kafka.network.SocketServer;
+import kafka.server.BrokerFeatures;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.test.ClusterConfig;
@@ -29,6 +30,7 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
import org.junit.jupiter.api.extension.Extension;
@@ -38,6 +40,7 @@ import scala.compat.java8.OptionConverters;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -83,6 +86,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
return Arrays.asList(
(BeforeTestExecutionCallback) context -> {
TestKitNodes nodes = new TestKitNodes.Builder().
+ setBootstrapMetadataVersion(clusterConfig.metadataVersion().orElse(MetadataVersion.latest())).
setNumBrokerNodes(clusterConfig.numBrokers()).
setNumControllerNodes(clusterConfig.numControllers()).build();
nodes.brokerNodes().forEach((brokerId, brokerNode) -> {
@@ -168,6 +172,14 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
.orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
}
+ @Override
+ public Map brokerFeatures() {
+    public Map<Integer, BrokerFeatures> brokerFeatures() {
+ brokerServer -> brokerServer.config().nodeId(),
+ BrokerServer::brokerFeatures
+ ));
+ }
+
@Override
public ClusterType clusterType() {
return ClusterType.RAFT;
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index 68ec0410536..02f21906ed4 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -19,6 +19,7 @@ package kafka.test.junit;
import kafka.api.IntegrationTestHarness;
import kafka.network.SocketServer;
+import kafka.server.BrokerFeatures;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.test.ClusterConfig;
@@ -41,6 +42,7 @@ import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -106,7 +108,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
@Override
public Properties serverConfig() {
Properties props = clusterConfig.serverProperties();
- clusterConfig.ibp().ifPresent(ibp -> props.put(KafkaConfig.InterBrokerProtocolVersionProp(), ibp));
+ clusterConfig.metadataVersion().ifPresent(mv -> props.put(KafkaConfig.InterBrokerProtocolVersionProp(), mv.version()));
return props;
}
@@ -237,6 +239,14 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
}
+ @Override
+    public Map<Integer, BrokerFeatures> brokerFeatures() {
+ return servers().collect(Collectors.toMap(
+ brokerServer -> brokerServer.config().nodeId(),
+ KafkaServer::brokerFeatures
+ ));
+ }
+
@Override
public ClusterType clusterType() {
return ClusterType.ZK;
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 1372006f195..1924579e172 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -34,10 +34,12 @@ import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.BootstrapMetadata;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -173,6 +175,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
String threadNamePrefix = String.format("controller%d_", node.id());
MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId().toString(), node.id());
TopicPartition metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0);
+ BootstrapMetadata bootstrapMetadata = BootstrapMetadata.create(nodes.bootstrapMetadataVersion());
            KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(),
Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
@@ -184,7 +187,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
new Metrics(),
Option.apply(threadNamePrefix),
connectFutureManager.future,
- KafkaRaftServer.configSchema()
+ KafkaRaftServer.configSchema(),
+ raftManager.apiVersions(),
+ bootstrapMetadata
);
controllers.put(node.id(), controller);
controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> {
@@ -335,6 +340,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
StorageTool.formatCommand(out,
JavaConverters.asScalaBuffer(Collections.singletonList(metadataLogDir)).toSeq(),
properties,
+ MetadataVersion.IBP_3_0_IV0,
false);
} finally {
for (String line : stream.toString().split(String.format("%n"))) {
diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java b/core/src/test/java/kafka/testkit/TestKitNodes.java
index d52b8002337..f91e62d1798 100644
--- a/core/src/test/java/kafka/testkit/TestKitNodes.java
+++ b/core/src/test/java/kafka/testkit/TestKitNodes.java
@@ -20,6 +20,7 @@ package kafka.testkit;
import kafka.server.MetaProperties;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.server.common.MetadataVersion;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -33,6 +34,7 @@ import java.util.TreeMap;
public class TestKitNodes {
public static class Builder {
private Uuid clusterId = null;
+ private MetadataVersion bootstrapMetadataVersion = null;
        private final NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
        private final NavigableMap<Integer, BrokerNode> brokerNodes = new TreeMap<>();
@@ -41,6 +43,11 @@ public class TestKitNodes {
return this;
}
+ public Builder setBootstrapMetadataVersion(MetadataVersion metadataVersion) {
+ this.bootstrapMetadataVersion = metadataVersion;
+ return this;
+ }
+
public Builder addNodes(TestKitNode[] nodes) {
for (TestKitNode node : nodes) {
addNode(node);
@@ -103,18 +110,24 @@ public class TestKitNodes {
if (clusterId == null) {
clusterId = Uuid.randomUuid();
}
- return new TestKitNodes(clusterId, controllerNodes, brokerNodes);
+ if (bootstrapMetadataVersion == null) {
+ bootstrapMetadataVersion = MetadataVersion.latest();
+ }
+ return new TestKitNodes(clusterId, bootstrapMetadataVersion, controllerNodes, brokerNodes);
}
}
private final Uuid clusterId;
+ private final MetadataVersion bootstrapMetadataVersion;
    private final NavigableMap<Integer, ControllerNode> controllerNodes;
    private final NavigableMap<Integer, BrokerNode> brokerNodes;
private TestKitNodes(Uuid clusterId,
+ MetadataVersion bootstrapMetadataVersion,
                         NavigableMap<Integer, ControllerNode> controllerNodes,
                         NavigableMap<Integer, BrokerNode> brokerNodes) {
this.clusterId = clusterId;
+ this.bootstrapMetadataVersion = bootstrapMetadataVersion;
this.controllerNodes = controllerNodes;
this.brokerNodes = brokerNodes;
}
@@ -123,6 +136,10 @@ public class TestKitNodes {
return clusterId;
}
+ public MetadataVersion bootstrapMetadataVersion() {
+ return bootstrapMetadataVersion;
+ }
+
    public Map<Integer, ControllerNode> controllerNodes() {
return controllerNodes;
}
@@ -161,7 +178,7 @@ public class TestKitNodes {
node.incarnationId(), absolutize(baseDirectory, node.metadataDirectory()),
absolutize(baseDirectory, node.logDataDirectories()), node.propertyOverrides()));
}
- return new TestKitNodes(clusterId, newControllerNodes, newBrokerNodes);
+ return new TestKitNodes(clusterId, bootstrapMetadataVersion, newControllerNodes, newBrokerNodes);
}
private static List absolutize(String base, Collection directories) {
diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
index be9f159b862..3b97ee8398b 100644
--- a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.message.InitProducerIdRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{InitProducerIdRequest, InitProducerIdResponse}
+import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.extension.ExtendWith
@@ -43,9 +44,9 @@ class ProducerIdsIntegrationTest {
}
@ClusterTests(Array(
- new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "2.8"),
- new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "3.0-IV0"),
- new ClusterTest(clusterType = Type.KRAFT, brokers = 3, ibp = "3.0-IV0")
+ new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1),
+ new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0),
+ new ClusterTest(clusterType = Type.KRAFT, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0)
))
def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
verifyUniqueIds(clusterInstance)
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index c62dbd52843..69688da4f44 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -30,18 +30,19 @@ import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, Descr
import org.apache.kafka.metadata.BrokerState
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Tag, Test, Timeout}
-import java.util
-import java.util.concurrent.ExecutionException
-import java.util.{Arrays, Collections, Optional}
+import java.util
+import java.util.{Arrays, Collections, Optional}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.protocol.Errors._
+import org.apache.kafka.server.common.MetadataVersion
import org.slf4j.LoggerFactory
import scala.annotation.nowarn
import scala.collection.mutable
+import scala.concurrent.ExecutionException
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
import scala.jdk.CollectionConverters._
@@ -69,8 +70,8 @@ class KRaftClusterTest {
def testCreateClusterAndWaitForBrokerInRunningState(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
- setNumBrokerNodes(3).
- setNumControllerNodes(3).build()).build()
+ setNumBrokerNodes(1).
+ setNumControllerNodes(1).build()).build()
try {
cluster.format()
cluster.startup()
@@ -292,6 +293,17 @@ class KRaftClusterTest {
}
}
+ @Test
+ def testCreateClusterInvalidMetadataVersion(): Unit = {
+ assertThrows(classOf[IllegalArgumentException], () => {
+ new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setBootstrapMetadataVersion(MetadataVersion.IBP_2_7_IV0).
+ setNumBrokerNodes(1).
+ setNumControllerNodes(1).build()).build()
+ })
+ }
+
private def doOnStartedKafkaCluster(numControllerNodes: Int = 1,
numBrokerNodes: Int,
brokerPropertyOverrides: (TestKitNodes, BrokerNode) => Map[String, String])
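The new negative test expects the test kit to reject bootstrap versions that predate KRaft; presumably the builder guards with `MetadataVersion#isKRaftSupported`, the same check the storage tool applies before formatting. A hedged sketch of such a guard:

```java
import org.apache.kafka.server.common.MetadataVersion;

final class BootstrapVersionValidator {
    // Reject bootstrap versions that predate KRaft support, e.g. IBP_2_7_IV0.
    static void validate(MetadataVersion bootstrapMetadataVersion) {
        if (!bootstrapMetadataVersion.isKRaftSupported()) {
            throw new IllegalArgumentException(
                "Bootstrap metadata.version " + bootstrapMetadataVersion + " does not support KRaft");
        }
    }
}
```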
diff --git a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
new file mode 100644
index 00000000000..b671e5d5e35
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterTest, ClusterTests, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType
+import org.apache.kafka.clients.admin.{FeatureUpdate, UpdateFeaturesOptions}
+import org.apache.kafka.server.common.MetadataVersion
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.extension.ExtendWith
+
+import scala.jdk.CollectionConverters._
+
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class MetadataVersionIntegrationTest {
+ @ClusterTests(value = Array(
+ new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_0_IV0),
+ new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_1_IV0),
+ new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_2_IV0)
+ ))
+ def testBasicMetadataVersionUpgrade(clusterInstance: ClusterInstance): Unit = {
+ val admin = clusterInstance.createAdminClient()
+ val describeResult = admin.describeFeatures()
+ val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
+ assertEquals(ff.minVersionLevel(), clusterInstance.config().metadataVersion().get().featureLevel())
+ assertEquals(ff.maxVersionLevel(), clusterInstance.config().metadataVersion().get().featureLevel())
+
+ // Update to new version
+ val updateVersion = MetadataVersion.IBP_3_3_IV0.featureLevel.shortValue
+ val updateResult = admin.updateFeatures(
+ Map("metadata.version" -> new FeatureUpdate(updateVersion, UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
+ updateResult.all().get()
+
+ // Verify that new version is visible on broker
+ TestUtils.waitUntilTrue(() => {
+ val describeResult2 = admin.describeFeatures()
+ val ff2 = describeResult2.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
+ ff2.minVersionLevel() == updateVersion && ff2.maxVersionLevel() == updateVersion
+ }, "Never saw metadata.version increase on broker")
+ }
+
+ @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV0)
+ def testUpgradeSameVersion(clusterInstance: ClusterInstance): Unit = {
+ val admin = clusterInstance.createAdminClient()
+ val updateVersion = MetadataVersion.IBP_3_3_IV0.featureLevel.shortValue
+ val updateResult = admin.updateFeatures(
+ Map("metadata.version" -> new FeatureUpdate(updateVersion, UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
+ updateResult.all().get()
+ }
+
+ @ClusterTest(clusterType = Type.KRAFT)
+ def testDefaultIsLatestVersion(clusterInstance: ClusterInstance): Unit = {
+ val admin = clusterInstance.createAdminClient()
+ val describeResult = admin.describeFeatures()
+ val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
+ assertEquals(ff.minVersionLevel(), MetadataVersion.latest().featureLevel())
+ assertEquals(ff.maxVersionLevel(), MetadataVersion.latest().featureLevel())
+ }
+}
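The upgrade flow exercised by this test is ordinary Admin API usage. A Java sketch of the describe-then-upgrade round trip (the bootstrap address and target level 7 are placeholders):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.FinalizedVersionRange;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;

final class MetadataVersionUpgrade {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            // Read the currently finalized metadata.version (null if never finalized).
            FinalizedVersionRange current = admin.describeFeatures()
                .featureMetadata().get().finalizedFeatures().get("metadata.version");
            if (current != null) {
                System.out.println("finalized: " + current.minVersionLevel()
                    + "-" + current.maxVersionLevel());
            }
            // Finalize a newer feature level (7 is an arbitrary example).
            admin.updateFeatures(
                Collections.singletonMap("metadata.version",
                    new FeatureUpdate((short) 7, FeatureUpdate.UpgradeType.UPGRADE)),
                new UpdateFeaturesOptions()).all().get();
        }
    }
}
```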
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 9f00c0564cf..b4ec1dbc87f 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -22,7 +22,6 @@ import java.net.InetSocketAddress
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent.CompletableFuture
-
import javax.security.auth.login.Configuration
import kafka.raft.KafkaRaftManager
import kafka.tools.StorageTool
@@ -33,15 +32,18 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{Exit, Time}
+import org.apache.kafka.controller.BootstrapMetadata
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec}
-import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, TestInfo}
+import scala.collection.mutable.ListBuffer
import scala.collection.{Seq, immutable}
+import scala.jdk.CollectionConverters._
trait QuorumImplementation {
def createBroker(config: KafkaConfig,
@@ -114,6 +116,10 @@ abstract class QuorumTestHarness extends Logging {
Seq(new Properties())
}
+ protected def metadataVersion: MetadataVersion = MetadataVersion.latest()
+
+ val bootstrapRecords: ListBuffer[ApiMessageAndVersion] = ListBuffer()
+
private var implementation: QuorumImplementation = null
def isKRaftTest(): Boolean = implementation.isInstanceOf[KRaftQuorumImplementation]
@@ -227,7 +233,7 @@ abstract class QuorumTestHarness extends Logging {
var out: PrintStream = null
try {
out = new PrintStream(stream)
- if (StorageTool.formatCommand(out, directories, metaProperties, false) != 0) {
+ if (StorageTool.formatCommand(out, directories, metaProperties, metadataVersion, ignoreFormatted = false) != 0) {
throw new RuntimeException(stream.toString())
}
debug(s"Formatted storage directory(ies) ${directories}")
@@ -282,6 +288,8 @@ abstract class QuorumTestHarness extends Logging {
threadNamePrefix = Option(threadNamePrefix),
controllerQuorumVotersFuture = controllerQuorumVotersFuture,
configSchema = KafkaRaftServer.configSchema,
+ raftApiVersions = raftManager.apiVersions,
+ bootstrapMetadata = BootstrapMetadata.create(metadataVersion, bootstrapRecords.asJava),
)
controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e) => {
if (e != null) {
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
index f237335bacf..a77b42e46e6 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
@@ -163,7 +163,7 @@ class ControllerChannelManagerTest {
def testLeaderAndIsrInterBrokerProtocolVersion(): Unit = {
testLeaderAndIsrRequestFollowsInterBrokerProtocolVersion(MetadataVersion.latest, ApiKeys.LEADER_AND_ISR.latestVersion)
- for (metadataVersion <- MetadataVersion.VALUES) {
+ for (metadataVersion <- MetadataVersion.VERSIONS) {
val leaderAndIsrRequestVersion: Short =
if (metadataVersion.isAtLeast(IBP_3_2_IV0)) 6
else if (metadataVersion.isAtLeast(IBP_2_8_IV1)) 5
@@ -380,7 +380,7 @@ class ControllerChannelManagerTest {
def testUpdateMetadataInterBrokerProtocolVersion(): Unit = {
testUpdateMetadataFollowsInterBrokerProtocolVersion(MetadataVersion.latest, ApiKeys.UPDATE_METADATA.latestVersion)
- for (metadataVersion <- MetadataVersion.VALUES) {
+ for (metadataVersion <- MetadataVersion.VERSIONS) {
val updateMetadataRequestVersion: Short =
if (metadataVersion.isAtLeast(IBP_2_8_IV1)) 7
else if (metadataVersion.isAtLeast(IBP_2_4_IV1)) 6
@@ -474,7 +474,7 @@ class ControllerChannelManagerTest {
@Test
def testStopReplicaRequestsWhileTopicQueuedForDeletion(): Unit = {
- for (metadataVersion <- MetadataVersion.VALUES) {
+ for (metadataVersion <- MetadataVersion.VERSIONS) {
testStopReplicaRequestsWhileTopicQueuedForDeletion(metadataVersion)
}
}
@@ -521,7 +521,7 @@ class ControllerChannelManagerTest {
@Test
def testStopReplicaRequestsWhileTopicDeletionStarted(): Unit = {
- for (metadataVersion <- MetadataVersion.VALUES) {
+ for (metadataVersion <- MetadataVersion.VERSIONS) {
testStopReplicaRequestsWhileTopicDeletionStarted(metadataVersion)
}
}
@@ -576,7 +576,7 @@ class ControllerChannelManagerTest {
@Test
def testStopReplicaRequestWithoutDeletePartitionWhileTopicDeletionStarted(): Unit = {
- for (metadataVersion <- MetadataVersion.VALUES) {
+ for (metadataVersion <- MetadataVersion.VERSIONS) {
testStopReplicaRequestWithoutDeletePartitionWhileTopicDeletionStarted(metadataVersion)
}
}
@@ -626,7 +626,7 @@ class ControllerChannelManagerTest {
testMixedDeleteAndNotDeleteStopReplicaRequests(MetadataVersion.latest,
ApiKeys.STOP_REPLICA.latestVersion)
- for (metadataVersion <- MetadataVersion.VALUES) {
+ for (metadataVersion <- MetadataVersion.VERSIONS) {
if (metadataVersion.isLessThan(IBP_2_2_IV0))
testMixedDeleteAndNotDeleteStopReplicaRequests(metadataVersion, 0.toShort)
else if (metadataVersion.isLessThan(IBP_2_4_IV1))
@@ -775,7 +775,7 @@ class ControllerChannelManagerTest {
def testStopReplicaInterBrokerProtocolVersion(): Unit = {
testStopReplicaFollowsInterBrokerProtocolVersion(MetadataVersion.latest, ApiKeys.STOP_REPLICA.latestVersion)
- for (metadataVersion <- MetadataVersion.VALUES) {
+ for (metadataVersion <- MetadataVersion.VERSIONS) {
if (metadataVersion.isLessThan(IBP_2_2_IV0))
testStopReplicaFollowsInterBrokerProtocolVersion(metadataVersion, 0.toShort)
else if (metadataVersion.isLessThan(IBP_2_4_IV1))
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 250f22a24fa..688d6e83b0d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1055,7 +1055,7 @@ class GroupMetadataManagerTest {
val protocol = "range"
val memberId = "memberId"
- for (metadataVersion <- MetadataVersion.VALUES) {
+ for (metadataVersion <- MetadataVersion.VERSIONS) {
val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, metadataVersion = metadataVersion)
val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
@@ -2276,7 +2276,7 @@ class GroupMetadataManagerTest {
assertEquals(expectedLeaderEpoch, deserializedOffsetAndMetadata.leaderEpoch)
}
- for (version <- MetadataVersion.VALUES) {
+ for (version <- MetadataVersion.VERSIONS) {
val expectedSchemaVersion = version match {
case v if v.isLessThan(IBP_2_1_IV0) => 1
case v if v.isLessThan(IBP_2_1_IV1) => 2
@@ -2307,7 +2307,7 @@ class GroupMetadataManagerTest {
assertEquals(offsetAndMetadata, deserializedOffsetAndMetadata)
}
- for (version <- MetadataVersion.VALUES)
+ for (version <- MetadataVersion.VERSIONS)
verifySerde(version)
}
@@ -2335,7 +2335,7 @@ class GroupMetadataManagerTest {
assertEquals(offsetAndMetadata, deserializedOffsetAndMetadata)
}
- for (version <- MetadataVersion.VALUES)
+ for (version <- MetadataVersion.VERSIONS)
verifySerde(version)
}
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index a7a9519455a..f8fac503d6e 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -18,7 +18,6 @@ package kafka.raft
import java.util.concurrent.CompletableFuture
import java.util.Properties
-
import kafka.raft.KafkaRaftManager.RaftIoThread
import kafka.server.{KafkaConfig, MetaProperties}
import kafka.tools.TestRaftServer.ByteArraySerde
diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index 530bc235b38..99d593ede65 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -77,7 +77,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
ApiVersionsResponse.intersectForwardableApis(
ApiMessageType.ListenerType.BROKER,
RecordVersion.current,
- new NodeApiVersions(ApiKeys.controllerApis().asScala.map(ApiVersionsResponse.toApiVersion).asJava).allSupportedApiVersions()
+ NodeApiVersions.create(ApiKeys.controllerApis().asScala.map(ApiVersionsResponse.toApiVersion).asJava).allSupportedApiVersions()
)
}
diff --git a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
index 10d69e2cd61..eab3928483f 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
@@ -27,7 +27,7 @@ class BrokerFeaturesTest {
@Test
def testEmpty(): Unit = {
- assertTrue(BrokerFeatures.createDefault().supportedFeatures.empty)
+ assertTrue(BrokerFeatures.createEmpty().supportedFeatures.empty)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index dd3e49d4d16..1a0fac443c0 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -73,7 +73,7 @@ class BrokerLifecycleManagerTest {
val metadata = new Metadata(1000, 1000, new LogContext(), new ClusterResourceListeners())
val mockClient = new MockClient(time, metadata)
val controllerNodeProvider = new SimpleControllerNodeProvider()
- val nodeApiVersions = new NodeApiVersions(Seq(BROKER_REGISTRATION, BROKER_HEARTBEAT).map {
+ val nodeApiVersions = NodeApiVersions.create(Seq(BROKER_REGISTRATION, BROKER_HEARTBEAT).map {
apiKey => new ApiVersion().setApiKey(apiKey.id).
setMinVersion(apiKey.oldestVersion()).setMaxVersion(apiKey.latestVersion())
}.toList.asJava)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e879a7f6ffc..36dff7a6798 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -698,8 +698,8 @@ class KafkaConfigTest {
KafkaConfig.fromProps(props)
}
- MetadataVersion.VALUES.foreach { interBrokerVersion =>
- MetadataVersion.VALUES.foreach { messageFormatVersion =>
+ MetadataVersion.VERSIONS.foreach { interBrokerVersion =>
+ MetadataVersion.VERSIONS.foreach { messageFormatVersion =>
if (interBrokerVersion.highestSupportedRecordVersion.value >= messageFormatVersion.highestSupportedRecordVersion.value) {
val config = buildConfig(interBrokerVersion, messageFormatVersion)
assertEquals(interBrokerVersion, config.interBrokerProtocolVersion)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index b4dac22c6c5..f997455f0b9 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -19,11 +19,11 @@ package kafka.server
import java.io.File
import java.nio.file.Files
import java.util.Properties
-
import kafka.common.{InconsistentBrokerMetadataException, InconsistentNodeIdException}
import kafka.log.UnifiedLog
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.controller.BootstrapMetadata
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -44,7 +44,7 @@ class KafkaRaftServerTest {
configProperties.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9093")
configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
- val (loadedMetaProperties, offlineDirs) =
+ val (loadedMetaProperties, _, offlineDirs) =
invokeLoadMetaProperties(metaProperties, configProperties)
assertEquals(metaProperties, loadedMetaProperties)
@@ -72,7 +72,7 @@ class KafkaRaftServerTest {
private def invokeLoadMetaProperties(
metaProperties: MetaProperties,
configProperties: Properties
- ): (MetaProperties, collection.Seq[String]) = {
+ ): (MetaProperties, BootstrapMetadata, collection.Seq[String]) = {
val tempLogDir = TestUtils.tempDirectory()
try {
writeMetaProperties(tempLogDir, metaProperties)
@@ -159,7 +159,7 @@ class KafkaRaftServerTest {
configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
val config = KafkaConfig.fromProps(configProperties)
- val (loadedProperties, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
+ val (loadedProperties, _, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
assertEquals(nodeId, loadedProperties.nodeId)
assertEquals(Seq(invalidDir.getAbsolutePath), offlineDirs)
}
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 079c0d56354..c8c7a89df77 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -50,7 +50,7 @@ import scala.collection.Seq
class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness with Logging {
// Set this to IBP_0_11_0_IV1 to demonstrate the tests failing in the pre-KIP-101 case
- val metadataVersion = MetadataVersion.latest
+ override def metadataVersion = MetadataVersion.latest
val topic = "topic1"
val msg = new Array[Byte](1000)
val msgBigger = new Array[Byte](10000)
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 0242c33dab9..9f750313411 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -26,6 +26,7 @@ import java.util.Properties
import kafka.server.{KafkaConfig, MetaProperties}
import kafka.utils.TestUtils
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.{Test, Timeout}
@@ -160,11 +161,11 @@ Found problem:
clusterId = "XcZZOzUqS4yHOjhMQB6JLQ", nodeId = 2)
val stream = new ByteArrayOutputStream()
assertEquals(0, StorageTool.
- formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, false))
+ formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, MetadataVersion.latest(), ignoreFormatted = false))
assertEquals("Formatting %s%n".format(tempDir), stream.toString())
try assertEquals(1, StorageTool.
- formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, false)) catch {
+ formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, MetadataVersion.latest(), ignoreFormatted = false)) catch {
case e: TerseFailure => assertEquals(s"Log directory ${tempDir} is already " +
"formatted. Use --ignore-formatted to ignore this directory and format the " +
"others.", e.getMessage)
@@ -172,7 +173,7 @@ Found problem:
val stream2 = new ByteArrayOutputStream()
assertEquals(0, StorageTool.
- formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, true))
+ formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, MetadataVersion.latest(), ignoreFormatted = true))
assertEquals("All of the log directories are already formatted.%n".format(), stream2.toString())
} finally Utils.delete(tempDir)
}
@@ -185,4 +186,19 @@ Found problem:
"16 bytes of a base64-encoded UUID", assertThrows(classOf[TerseFailure],
() => StorageTool.buildMetadataProperties("invalid", config)).getMessage)
}
+
+ @Test
+ def testDefaultMetadataVersion(): Unit = {
+ var namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ"))
+ var mv = StorageTool.getMetadataVersion(namespace)
+ assertEquals(MetadataVersion.latest().featureLevel(), mv.featureLevel(),
+ "Expected the default metadata.version to be the latest version")
+
+ namespace = StorageTool.parseArguments(Array("format", "-c", "config.props",
+ "--metadata-version", MetadataVersion.latest().featureLevel().toString, "-t", "XcZZOzUqS4yHOjhMQB6JLQ"))
+ mv = StorageTool.getMetadataVersion(namespace)
+ assertEquals(MetadataVersion.latest().featureLevel(), mv.featureLevel(),
+ "Expected the default metadata.version to be the latest version")
+
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java b/metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java
new file mode 100644
index 00000000000..fa031c525f1
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.metadata.util.SnapshotFileReader;
+import org.apache.kafka.metadata.util.SnapshotFileWriter;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Stream;
+
+
+/**
+ * A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.checkpoint" is used and the
+ * format is the same as a KRaft snapshot.
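+ *
+ * <p>A minimal, illustrative sketch of the intended round trip (the {@code metadataDir}
+ * path is an assumed example, not part of this API):
+ * <pre>{@code
+ * BootstrapMetadata bootstrap = BootstrapMetadata.create(MetadataVersion.IBP_3_3_IV0);
+ * BootstrapMetadata.write(bootstrap, metadataDir); // creates bootstrap.checkpoint
+ * BootstrapMetadata loaded = BootstrapMetadata.load(metadataDir, MetadataVersion.IBP_3_0_IV1);
+ * }</pre>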
+ */
+public class BootstrapMetadata {
+ private static final Logger log = LoggerFactory.getLogger(BootstrapMetadata.class);
+
+ public static final String BOOTSTRAP_FILE = "bootstrap.checkpoint";
+
+ private final MetadataVersion metadataVersion;
+
+ private final List<ApiMessageAndVersion> records;
+
+ BootstrapMetadata(MetadataVersion metadataVersion, List<ApiMessageAndVersion> records) {
+ this.metadataVersion = metadataVersion;
+ this.records = Collections.unmodifiableList(records);
+ }
+
+ public MetadataVersion metadataVersion() {
+ return this.metadataVersion;
+ }
+
+ public List<ApiMessageAndVersion> records() {
+ return records;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ BootstrapMetadata metadata = (BootstrapMetadata) o;
+ return metadataVersion == metadata.metadataVersion;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(metadataVersion);
+ }
+
+ @Override
+ public String toString() {
+ return "BootstrapMetadata{" +
+ "metadataVersion=" + metadataVersion +
+ '}';
+ }
+
+ /**
+ * A raft client listener that simply collects all of the commits and snapshots into a
+ * single list of records.
+ */
+ private static class BootstrapListener implements RaftClient.Listener<ApiMessageAndVersion> {
+ private final List<ApiMessageAndVersion> records = new ArrayList<>();
+
+ @Override
+ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+ try {
+ while (reader.hasNext()) {
+ Batch<ApiMessageAndVersion> batch = reader.next();
+ records.addAll(batch.records());
+ }
+ } finally {
+ reader.close();
+ }
+ }
+
+ @Override
+ public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+ try {
+ while (reader.hasNext()) {
+ Batch<ApiMessageAndVersion> batch = reader.next();
+ for (ApiMessageAndVersion messageAndVersion : batch) {
+ records.add(messageAndVersion);
+ }
+ }
+ } finally {
+ reader.close();
+ }
+ }
+ }
+
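+ /**
+ * Create bootstrap metadata that finalizes the given metadata.version. The two-argument
+ * overload appends the finalizing FeatureLevelRecord to the caller-supplied record list.
+ */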
+ public static BootstrapMetadata create(MetadataVersion metadataVersion) {
+ return create(metadataVersion, new ArrayList<>());
+ }
+
+ public static BootstrapMetadata create(MetadataVersion metadataVersion, List<ApiMessageAndVersion> records) {
+ if (!metadataVersion.isKRaftSupported()) {
+ throw new IllegalArgumentException("Cannot create BootstrapMetadata with non-KRaft metadata version " + metadataVersion);
+ }
+ records.add(new ApiMessageAndVersion(
+ new FeatureLevelRecord()
+ .setName(MetadataVersion.FEATURE_NAME)
+ .setFeatureLevel(metadataVersion.featureLevel()),
+ FeatureLevelRecord.LOWEST_SUPPORTED_VERSION));
+
+ return new BootstrapMetadata(metadataVersion, records);
+ }
+
+ /**
+ * Load a bootstrap snapshot into a read-only bootstrap metadata object and return it.
+ *
+ * @param bootstrapDir The directory from which to read the snapshot file.
+ * @param fallbackPreviewVersion The metadata.version to bootstrap with when upgrading from a KRaft preview cluster
+ * @return The read-only bootstrap metadata
+ * @throws Exception
+ */
+ public static BootstrapMetadata load(Path bootstrapDir, MetadataVersion fallbackPreviewVersion) throws Exception {
+ final Path bootstrapPath = bootstrapDir.resolve(BOOTSTRAP_FILE);
+
+ if (!Files.exists(bootstrapPath)) {
+ log.debug("Missing bootstrap file, this appears to be a KRaft preview cluster. Setting metadata.version to {}.",
+ fallbackPreviewVersion.featureLevel());
+ return BootstrapMetadata.create(fallbackPreviewVersion);
+ }
+
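+ // Replay the checkpoint file through a listener to collect all of its records.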
+ BootstrapListener listener = new BootstrapListener();
+ try (SnapshotFileReader reader = new SnapshotFileReader(bootstrapPath.toString(), listener)) {
+ reader.startup();
+ reader.caughtUpFuture().get();
+ } catch (ExecutionException e) {
+ throw new Exception("Failed to load snapshot", e.getCause());
+ }
+
+ Optional<FeatureLevelRecord> metadataVersionRecord = listener.records.stream()
+ .flatMap(message -> {
+ MetadataRecordType type = MetadataRecordType.fromId(message.message().apiKey());
+ if (!type.equals(MetadataRecordType.FEATURE_LEVEL_RECORD)) {
+ return Stream.empty();
+ }
+ FeatureLevelRecord record = (FeatureLevelRecord) message.message();
+ if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
+ return Stream.of(record);
+ } else {
+ return Stream.empty();
+ }
+ })
+ .findFirst();
+
+ if (metadataVersionRecord.isPresent()) {
+ return new BootstrapMetadata(MetadataVersion.fromFeatureLevel(metadataVersionRecord.get().featureLevel()), listener.records);
+ } else {
+ throw new RuntimeException("Expected a metadata.version to exist in the snapshot " + bootstrapPath + ", but none was found");
+ }
+ }
+
+ /**
+ * Write a set of bootstrap metadata to the bootstrap snapshot in a given directory
+ *
+ * @param metadata The metadata to persist
+ * @param bootstrapDir The directory in which to create the bootstrap snapshot
+ * @throws IOException
+ */
+ public static void write(BootstrapMetadata metadata, Path bootstrapDir) throws IOException {
+ final Path bootstrapPath = bootstrapDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE);
+ if (Files.exists(bootstrapPath)) {
+ throw new IOException("Cannot write metadata bootstrap file " + bootstrapPath +
+ ". File already already exists.");
+ }
+ try (SnapshotFileWriter bootstrapWriter = SnapshotFileWriter.open(bootstrapPath)) {
+ bootstrapWriter.append(metadata.records());
+ }
+ }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 538e8a13ebb..90dfb571b1f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -336,8 +336,7 @@ public class ClusterControlManager {
heartbeatManager.register(brokerId, record.fenced());
List<ApiMessageAndVersion> records = new ArrayList<>();
- records.add(new ApiMessageAndVersion(record,
- REGISTER_BROKER_RECORD.highestSupportedVersion()));
+ records.add(new ApiMessageAndVersion(record, REGISTER_BROKER_RECORD.highestSupportedVersion()));
return ControllerResult.atomicOf(records, new BrokerRegistrationReply(brokerEpoch));
}
@@ -535,8 +534,7 @@ public class ClusterControlManager {
setEndPoints(endpoints).
setFeatures(features).
setRack(registration.rack().orElse(null)).
- setFenced(registration.fenced()),
- REGISTER_BROKER_RECORD.highestSupportedVersion()));
+ setFenced(registration.fenced()), REGISTER_BROKER_RECORD.highestSupportedVersion()));
return batch;
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 307a0ce09df..52d3c5d5215 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -25,7 +25,9 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Optional;
import java.util.TreeMap;
+import java.util.function.Consumer;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
@@ -33,10 +35,12 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineObject;
import org.slf4j.Logger;
import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_RECORD;
@@ -55,6 +59,11 @@ public class FeatureControlManager {
*/
private final TimelineHashMap<String, Short> finalizedVersions;
+ /**
+ * The current metadata version. Remains UNINITIALIZED until a FeatureLevelRecord for metadata.version is applied.
+ */
+ private final TimelineObject<MetadataVersion> metadataVersion;
+
FeatureControlManager(LogContext logContext,
QuorumFeatures quorumFeatures,
@@ -62,13 +71,15 @@ public class FeatureControlManager {
this.log = logContext.logger(FeatureControlManager.class);
this.quorumFeatures = quorumFeatures;
this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.metadataVersion = new TimelineObject<>(snapshotRegistry, MetadataVersion.UNINITIALIZED);
}
ControllerResult