KAFKA-13935 Fix static usages of IBP in KRaft mode (#12250)

* Set the minimum supported MetadataVersion to 3.0-IV1
* Remove MetadataVersion.UNINITIALIZED
* Relocate RPC version mapping for fetch protocols into MetadataVersion
* Replace static IBP calls with dynamic calls to MetadataCache

A side effect of removing the UNINITIALIZED metadata version is that the FeatureControlManager and FeatureImage will initialize themselves with the minimum KRaft version (3.0-IV1).

The rationale for setting the minimum version to 3.0-IV1 is so that we can avoid any cases of KRaft mode running with an old log message format (KIP-724 was introduced in 3.0-IV1). As a side-effect of increasing this minimum version, the feature level values decreased by one.

Reviewers: Jason Gustafson <jason@confluent.io>, Jun Rao <junrao@gmail.com>
This commit is contained in:
David Arthur 2022-06-13 14:23:28 -04:00 committed by GitHub
parent 5cab11cf52
commit cc384054c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 379 additions and 182 deletions

View File

@ -73,7 +73,7 @@ object BrokerFeatures extends Logging {
def createDefault(): BrokerFeatures = {
new BrokerFeatures(Features.supportedFeatures(
java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
new SupportedVersionRange(MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel()))))
new SupportedVersionRange(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.latest().featureLevel()))))
}
def createEmpty(): BrokerFeatures = {

View File

@ -1790,38 +1790,25 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
// We keep the user-provided String as `MetadataVersion.fromVersionString` can choose a slightly different version (eg if `0.10.0`
// is passed, `0.10.0-IV0` may be picked)
val interBrokerProtocolVersionString = getString(KafkaConfig.InterBrokerProtocolVersionProp)
val interBrokerProtocolVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
val fetchRequestVersion: Short =
if (interBrokerProtocolVersion.isAtLeast(IBP_3_1_IV0)) 13
else if (interBrokerProtocolVersion.isAtLeast(IBP_2_7_IV1)) 12
else if (interBrokerProtocolVersion.isAtLeast(IBP_2_3_IV1)) 11
else if (interBrokerProtocolVersion.isAtLeast(IBP_2_1_IV2)) 10
else if (interBrokerProtocolVersion.isAtLeast(IBP_2_0_IV1)) 8
else if (interBrokerProtocolVersion.isAtLeast(IBP_1_1_IV0)) 7
else if (interBrokerProtocolVersion.isAtLeast(IBP_0_11_0_IV1)) 5
else if (interBrokerProtocolVersion.isAtLeast(IBP_0_11_0_IV0)) 4
else if (interBrokerProtocolVersion.isAtLeast(IBP_0_10_1_IV1)) 3
else if (interBrokerProtocolVersion.isAtLeast(IBP_0_10_0_IV0)) 2
else if (interBrokerProtocolVersion.isAtLeast(IBP_0_9_0)) 1
else 0
val offsetForLeaderEpochRequestVersion: Short =
if (interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 4
else if (interBrokerProtocolVersion.isAtLeast(IBP_2_3_IV1)) 3
else if (interBrokerProtocolVersion.isAtLeast(IBP_2_1_IV1)) 2
else if (interBrokerProtocolVersion.isAtLeast(IBP_2_0_IV0)) 1
else 0
val listOffsetRequestVersion: Short =
if (interBrokerProtocolVersion.isAtLeast(IBP_3_0_IV1)) 7
else if (interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 6
else if (interBrokerProtocolVersion.isAtLeast(IBP_2_2_IV1)) 5
else if (interBrokerProtocolVersion.isAtLeast(IBP_2_1_IV1)) 4
else if (interBrokerProtocolVersion.isAtLeast(IBP_2_0_IV1)) 3
else if (interBrokerProtocolVersion.isAtLeast(IBP_0_11_0_IV0)) 2
else if (interBrokerProtocolVersion.isAtLeast(IBP_0_10_1_IV2)) 1
else 0
val interBrokerProtocolVersion = if (processRoles.isEmpty) {
MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
} else {
if (originals.containsKey(KafkaConfig.InterBrokerProtocolVersionProp)) {
// A user-supplied IBP was given
val configuredVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
if (!configuredVersion.isKRaftSupported) {
throw new ConfigException(s"A non-KRaft version ${interBrokerProtocolVersionString} given for ${KafkaConfig.InterBrokerProtocolVersionProp}. " +
s"The minimum version is ${MetadataVersion.MINIMUM_KRAFT_VERSION}")
} else {
warn(s"${KafkaConfig.InterBrokerProtocolVersionProp} is deprecated in KRaft mode as of 3.3 and will only " +
s"be read when first upgrading from a KRaft prior to 3.3. See kafka-storage.sh help for details on setting " +
s"the metadata version for a new KRaft cluster.")
}
}
// In KRaft mode, we pin this value to the minimum KRaft-supported version. This prevents inadvertent usage of
// the static IBP config in broker components running in KRaft mode
MetadataVersion.MINIMUM_KRAFT_VERSION
}
/** ********* Controlled shutdown configuration ***********/
val controlledShutdownMaxRetries = getInt(KafkaConfig.ControlledShutdownMaxRetriesProp)

View File

@ -35,6 +35,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
import java.nio.file.Paths
import scala.collection.Seq
import scala.compat.java8.FunctionConverters.asJavaSupplier
import scala.jdk.CollectionConverters._
/**
@ -180,13 +181,16 @@ object KafkaRaftServer {
"If you intend to create a new broker, you should remove all data in your data directories (log.dirs).")
}
// Load the bootstrap metadata file or, in the case of an upgrade from KRaft preview, bootstrap the
// metadata.version corresponding to a user-configured IBP.
val bootstrapMetadata = if (config.originals.containsKey(KafkaConfig.InterBrokerProtocolVersionProp)) {
BootstrapMetadata.load(Paths.get(config.metadataLogDir), config.interBrokerProtocolVersion)
} else {
BootstrapMetadata.load(Paths.get(config.metadataLogDir), MetadataVersion.IBP_3_0_IV0)
// Load the bootstrap metadata file. In the case of an upgrade from older KRaft where there is no bootstrap metadata,
// read the IBP from config in order to bootstrap the equivalent metadata version.
def getUserDefinedIBPVersionOrThrow(): MetadataVersion = {
if (config.originals.containsKey(KafkaConfig.InterBrokerProtocolVersionProp)) {
MetadataVersion.fromVersionString(config.interBrokerProtocolVersionString)
} else {
throw new KafkaException(s"Cannot upgrade from KRaft version prior to 3.3 without first setting ${KafkaConfig.InterBrokerProtocolVersionProp} on each broker.")
}
}
val bootstrapMetadata = BootstrapMetadata.load(Paths.get(config.metadataLogDir), asJavaSupplier(() => getUserDefinedIBPVersionOrThrow()))
(metaProperties, bootstrapMetadata, offlineDirs.toSeq)
}

View File

@ -31,6 +31,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetFo
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2
import scala.jdk.CollectionConverters._
@ -46,13 +47,16 @@ import scala.compat.java8.OptionConverters.RichOptionForJava8
* @param brokerConfig Broker configuration
* @param replicaManager A ReplicaManager
* @param quota The quota, used when building a fetch request
* @param metadataVersionSupplier A supplier that returns the current MetadataVersion. This can change during
* runtime in KRaft mode.
*/
class RemoteLeaderEndPoint(logPrefix: String,
blockingSender: BlockingSend,
private[server] val fetchSessionHandler: FetchSessionHandler, // visible for testing
brokerConfig: KafkaConfig,
replicaManager: ReplicaManager,
quota: ReplicaQuota) extends LeaderEndPoint with Logging {
quota: ReplicaQuota,
metadataVersionSupplier: () => MetadataVersion) extends LeaderEndPoint with Logging {
this.logIdent = logPrefix
@ -61,7 +65,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
private val fetchSize = brokerConfig.replicaFetchMaxBytes
override val isTruncationOnFetchSupported = brokerConfig.interBrokerProtocolVersion.isTruncationOnFetchSupported
override def isTruncationOnFetchSupported = metadataVersionSupplier().isTruncationOnFetchSupported
override def initiateClose(): Unit = blockingSender.initiateClose()
@ -106,7 +110,8 @@ class RemoteLeaderEndPoint(logPrefix: String,
.setPartitionIndex(topicPartition.partition)
.setCurrentLeaderEpoch(currentLeaderEpoch)
.setTimestamp(earliestOrLatest)))
val requestBuilder = ListOffsetsRequest.Builder.forReplica(brokerConfig.listOffsetRequestVersion, brokerConfig.brokerId)
val metadataVersion = metadataVersionSupplier()
val requestBuilder = ListOffsetsRequest.Builder.forReplica(metadataVersion.listOffsetRequestVersion, brokerConfig.brokerId)
.setTargetTimes(Collections.singletonList(topic))
val clientResponse = blockingSender.sendRequest(requestBuilder)
@ -116,7 +121,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
Errors.forCode(responsePartition.errorCode) match {
case Errors.NONE =>
if (brokerConfig.interBrokerProtocolVersion.isAtLeast(IBP_0_10_1_IV2))
if (metadataVersion.isAtLeast(IBP_0_10_1_IV2))
responsePartition.offset
else
responsePartition.oldStyleOffsets.get(0)
@ -141,7 +146,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
}
val epochRequest = OffsetsForLeaderEpochRequest.Builder.forFollower(
brokerConfig.offsetForLeaderEpochRequestVersion, topics, brokerConfig.brokerId)
metadataVersionSupplier().offsetForLeaderEpochRequestVersion, topics, brokerConfig.brokerId)
debug(s"Sending offset for leader epoch request $epochRequest")
try {
@ -201,7 +206,12 @@ class RemoteLeaderEndPoint(logPrefix: String,
val fetchRequestOpt = if (fetchData.sessionPartitions.isEmpty && fetchData.toForget.isEmpty) {
None
} else {
val version: Short = if (brokerConfig.fetchRequestVersion >= 13 && !fetchData.canUseTopicIds) 12 else brokerConfig.fetchRequestVersion
val metadataVersion = metadataVersionSupplier()
val version: Short = if (metadataVersion.fetchRequestVersion >= 13 && !fetchData.canUseTopicIds) {
12
} else {
metadataVersion.fetchRequestVersion
}
val requestBuilder = FetchRequest.Builder
.forReplica(version, brokerConfig.brokerId, maxWait, minBytes, fetchData.toSend)
.setMaxBytes(maxBytes)

View File

@ -21,13 +21,15 @@ import kafka.cluster.BrokerEndPoint
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.server.common.MetadataVersion
class ReplicaFetcherManager(brokerConfig: KafkaConfig,
protected val replicaManager: ReplicaManager,
metrics: Metrics,
time: Time,
threadNamePrefix: Option[String] = None,
quotaManager: ReplicationQuotaManager)
quotaManager: ReplicationQuotaManager,
metadataVersionSupplier: () => MetadataVersion)
extends AbstractFetcherManager[ReplicaFetcherThread](
name = "ReplicaFetcherManager on broker " + brokerConfig.brokerId,
clientId = "Replica",
@ -41,9 +43,10 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig,
val endpoint = new BrokerBlockingSender(sourceBroker, brokerConfig, metrics, time, fetcherId,
s"broker-${brokerConfig.brokerId}-fetcher-$fetcherId", logContext)
val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig, replicaManager, quotaManager)
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig,
replicaManager, quotaManager, metadataVersionSupplier)
new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager,
quotaManager, logContext.logPrefix)
quotaManager, logContext.logPrefix, metadataVersionSupplier)
}
def shutdown(): Unit = {

View File

@ -21,6 +21,7 @@ import kafka.log.{LeaderOffsetIncremented, LogAppendInfo}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.server.common.MetadataVersion
class ReplicaFetcherThread(name: String,
leader: LeaderEndPoint,
@ -28,7 +29,8 @@ class ReplicaFetcherThread(name: String,
failedPartitions: FailedPartitions,
replicaMgr: ReplicaManager,
quota: ReplicaQuota,
logPrefix: String)
logPrefix: String,
metadataVersionSupplier: () => MetadataVersion)
extends AbstractFetcherThread(name = name,
clientId = name,
leader = leader,
@ -39,7 +41,7 @@ class ReplicaFetcherThread(name: String,
this.logIdent = logPrefix
override protected val isOffsetForLeaderEpochSupported: Boolean = brokerConfig.interBrokerProtocolVersion.isOffsetForLeaderEpochSupported
override protected val isOffsetForLeaderEpochSupported: Boolean = metadataVersionSupplier().isOffsetForLeaderEpochSupported
override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
replicaMgr.localLogOrException(topicPartition).latestEpoch
@ -135,7 +137,7 @@ class ReplicaFetcherThread(name: String,
def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition: TopicPartition): Unit = {
// oversized messages don't cause replication to fail from fetch request version 3 (KIP-74)
if (brokerConfig.fetchRequestVersion <= 2 && records.sizeInBytes > 0 && records.validBytes <= 0)
if (metadataVersionSupplier().fetchRequestVersion <= 2 && records.sizeInBytes > 0 && records.validBytes <= 0)
error(s"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition $topicPartition. " +
"This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " +
"message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be " +

View File

@ -310,7 +310,7 @@ class ReplicaManager(val config: KafkaConfig,
// If inter-broker protocol (IBP) < 1.0, the controller will send LeaderAndIsrRequest V0 which does not include isNew field.
// In this case, the broker receiving the request cannot determine whether it is safe to create a partition if a log directory has failed.
// Thus, we choose to halt the broker on any log directory failure if IBP < 1.0
val haltBrokerOnFailure = config.interBrokerProtocolVersion.isLessThan(IBP_1_0_IV0)
val haltBrokerOnFailure = metadataCache.metadataVersion().isLessThan(IBP_1_0_IV0)
logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
logDirFailureHandler.start()
}
@ -1773,7 +1773,7 @@ class ReplicaManager(val config: KafkaConfig,
* OffsetForLeaderEpoch request.
*/
protected def initialFetchOffset(log: UnifiedLog): Long = {
if (config.interBrokerProtocolVersion.isTruncationOnFetchSupported() && log.latestEpoch.nonEmpty)
if (metadataCache.metadataVersion().isTruncationOnFetchSupported && log.latestEpoch.nonEmpty)
log.logEndOffset
else
log.highWatermark
@ -1903,7 +1903,7 @@ class ReplicaManager(val config: KafkaConfig,
}
protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String], quotaManager: ReplicationQuotaManager) = {
new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager, () => metadataCache.metadataVersion())
}
protected def createReplicaAlterLogDirsManager(quotaManager: ReplicationQuotaManager, brokerTopicStats: BrokerTopicStats) = {

View File

@ -31,7 +31,6 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsImage}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion
import scala.collection.mutable
@ -132,10 +131,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
// Publish the new metadata image to the metadata cache.
metadataCache.setImage(newImage)
val metadataVersionLogMsg = newImage.features().metadataVersion() match {
case MetadataVersion.UNINITIALIZED => "un-initialized metadata.version"
case mv: MetadataVersion => s"metadata.version ${mv.featureLevel()}"
}
val metadataVersionLogMsg = s"metadata.version ${newImage.features().metadataVersion()}"
if (_firstPublish) {
info(s"Publishing initial metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.")

View File

@ -49,7 +49,7 @@ object StorageTool extends Logging {
val clusterId = namespace.getString("cluster_id")
val metadataVersion = getMetadataVersion(namespace)
if (!metadataVersion.isKRaftSupported) {
throw new TerseFailure(s"Must specify a metadata version of at least 1.")
throw new TerseFailure(s"Must specify a valid KRaft metadata version of at least 3.0.")
}
val metaProperties = buildMetadataProperties(clusterId, config.get)
val ignoreFormatted = namespace.getBoolean("ignore_formatted")
@ -99,7 +99,7 @@ object StorageTool extends Logging {
action(storeTrue())
formatParser.addArgument("--release-version", "-r").
action(store()).
help(s"A release version to use for the initial metadata.version. The default is (${MetadataVersion.latest().version()})")
help(s"A KRaft release version to use for the initial metadata version. The minimum is 3.0, the default is ${MetadataVersion.latest().version()}")
parser.parseArgsOrFail(args)
}

View File

@ -122,8 +122,8 @@ public class ClusterConfig {
return Optional.ofNullable(trustStoreFile);
}
public Optional<MetadataVersion> metadataVersion() {
return Optional.ofNullable(metadataVersion);
public MetadataVersion metadataVersion() {
return metadataVersion;
}
public Properties brokerServerProperties(int brokerId) {
@ -133,7 +133,7 @@ public class ClusterConfig {
public Map<String, String> nameTags() {
Map<String, String> tags = new LinkedHashMap<>(4);
name().ifPresent(name -> tags.put("Name", name));
metadataVersion().ifPresent(mv -> tags.put("MetadataVersion", mv.toString()));
tags.put("MetadataVersion", metadataVersion.toString());
tags.put("Security", securityProtocol.name());
listenerName().ifPresent(listener -> tags.put("Listener", listener));
return tags;
@ -150,11 +150,12 @@ public class ClusterConfig {
}
public static Builder defaultClusterBuilder() {
return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT);
return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latest());
}
public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol) {
return new Builder(type, brokers, controllers, autoStart, securityProtocol);
public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart,
SecurityProtocol securityProtocol, MetadataVersion metadataVersion) {
return new Builder(type, brokers, controllers, autoStart, securityProtocol, metadataVersion);
}
public static class Builder {
@ -168,12 +169,13 @@ public class ClusterConfig {
private File trustStoreFile;
private MetadataVersion metadataVersion;
Builder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol) {
Builder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol, MetadataVersion metadataVersion) {
this.type = type;
this.brokers = brokers;
this.controllers = controllers;
this.autoStart = autoStart;
this.securityProtocol = securityProtocol;
this.metadataVersion = metadataVersion;
}
public Builder type(Type type) {

View File

@ -25,6 +25,7 @@ import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@ -109,4 +110,9 @@ public class ClusterTestExtensionsTest {
clusterInstance.start();
Assertions.assertNotNull(clusterInstance.anyBrokerSocketServer());
}
@ClusterTest
public void testDefaults(ClusterConfig config) {
Assertions.assertEquals(MetadataVersion.IBP_3_3_IV3, config.metadataVersion());
}
}

View File

@ -41,6 +41,6 @@ public @interface ClusterTest {
String name() default "";
SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
String listener() default "";
MetadataVersion metadataVersion() default MetadataVersion.UNINITIALIZED;
MetadataVersion metadataVersion() default MetadataVersion.IBP_3_3_IV3;
ClusterConfigProperty[] serverProperties() default {};
}

View File

@ -25,7 +25,6 @@ import kafka.test.annotation.ClusterTemplate;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
@ -180,7 +179,8 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
throw new IllegalStateException();
}
ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, brokers, controllers, autoStart, annot.securityProtocol());
ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, brokers, controllers, autoStart,
annot.securityProtocol(), annot.metadataVersion());
if (!annot.name().isEmpty()) {
builder.name(annot.name());
} else {
@ -195,10 +195,6 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
properties.put(property.key(), property.value());
}
if (!annot.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
builder.metadataVersion(annot.metadataVersion());
}
ClusterConfig config = builder.build();
config.serverProperties().putAll(properties);
type.invocationContexts(config, testInvocations);

View File

@ -30,7 +30,6 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerState;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
import org.junit.jupiter.api.extension.Extension;
@ -86,7 +85,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
return Arrays.asList(
(BeforeTestExecutionCallback) context -> {
TestKitNodes nodes = new TestKitNodes.Builder().
setBootstrapMetadataVersion(clusterConfig.metadataVersion().orElse(MetadataVersion.latest())).
setBootstrapMetadataVersion(clusterConfig.metadataVersion()).
setNumBrokerNodes(clusterConfig.numBrokers()).
setNumControllerNodes(clusterConfig.numControllers()).build();
nodes.brokerNodes().forEach((brokerId, brokerNode) -> {

View File

@ -108,7 +108,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
@Override
public Properties serverConfig() {
Properties props = clusterConfig.serverProperties();
clusterConfig.metadataVersion().ifPresent(mv -> props.put(KafkaConfig.InterBrokerProtocolVersionProp(), mv.version()));
props.put(KafkaConfig.InterBrokerProtocolVersionProp(), metadataVersion().version());
return props;
}

View File

@ -340,7 +340,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
StorageTool.formatCommand(out,
JavaConverters.asScalaBuffer(Collections.singletonList(metadataLogDir)).toSeq(),
properties,
MetadataVersion.IBP_3_0_IV0,
MetadataVersion.MINIMUM_KRAFT_VERSION,
false);
} finally {
for (String line : stream.toString().split(String.format("%n"))) {

View File

@ -46,7 +46,7 @@ class ProducerIdsIntegrationTest {
@ClusterTests(Array(
new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1),
new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0),
new ClusterTest(clusterType = Type.KRAFT, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0)
new ClusterTest(clusterType = Type.KRAFT, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV1)
))
def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
verifyUniqueIds(clusterInstance)

View File

@ -32,7 +32,7 @@ import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class MetadataVersionIntegrationTest {
@ClusterTests(value = Array(
new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_0_IV0),
new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_0_IV1),
new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_1_IV0),
new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_2_IV0)
))
@ -40,8 +40,8 @@ class MetadataVersionIntegrationTest {
val admin = clusterInstance.createAdminClient()
val describeResult = admin.describeFeatures()
val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
assertEquals(ff.minVersionLevel(), clusterInstance.config().metadataVersion().get().featureLevel())
assertEquals(ff.maxVersionLevel(), clusterInstance.config().metadataVersion().get().featureLevel())
assertEquals(ff.minVersionLevel(), clusterInstance.config().metadataVersion().featureLevel())
assertEquals(ff.maxVersionLevel(), clusterInstance.config().metadataVersion().featureLevel())
// Update to new version
val updateVersion = MetadataVersion.IBP_3_3_IV0.featureLevel.shortValue
@ -71,7 +71,8 @@ class MetadataVersionIntegrationTest {
val admin = clusterInstance.createAdminClient()
val describeResult = admin.describeFeatures()
val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
assertEquals(ff.minVersionLevel(), MetadataVersion.latest().featureLevel())
assertEquals(ff.minVersionLevel(), MetadataVersion.latest().featureLevel(),
"If this test fails, check the default MetadataVersion in the @ClusterTest annotation")
assertEquals(ff.maxVersionLevel(), MetadataVersion.latest().featureLevel())
}
}

View File

@ -1570,4 +1570,32 @@ class KafkaConfigTest {
"contained in listeners or controller.listener.names",
assertThrows(classOf[ConfigException], () => new KafkaConfig(props)).getMessage)
}
@Test
def testIgnoreUserInterBrokerProtocolVersionKRaft(): Unit = {
for (ibp <- Seq("3.0", "3.1", "3.2")) {
val props = new Properties()
props.putAll(kraftProps())
props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, ibp)
val config = new KafkaConfig(props)
assertEquals(config.interBrokerProtocolVersion, MetadataVersion.MINIMUM_KRAFT_VERSION)
}
}
@Test
def testInvalidInterBrokerProtocolVersionKRaft(): Unit = {
val props = new Properties()
props.putAll(kraftProps())
props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, "2.8")
assertEquals("A non-KRaft version 2.8 given for inter.broker.protocol.version. The minimum version is 3.0-IV1",
assertThrows(classOf[ConfigException], () => new KafkaConfig(props)).getMessage)
}
@Test
def testDefaultInterBrokerProtocolVersionKRaft(): Unit = {
val props = new Properties()
props.putAll(kraftProps())
val config = new KafkaConfig(props)
assertEquals(config.interBrokerProtocolVersion, MetadataVersion.MINIMUM_KRAFT_VERSION)
}
}

View File

@ -24,6 +24,7 @@ import kafka.log.UnifiedLog
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.controller.BootstrapMetadata
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@ -71,12 +72,13 @@ class KafkaRaftServerTest {
private def invokeLoadMetaProperties(
metaProperties: MetaProperties,
configProperties: Properties
configProperties: Properties,
metadataVersion: Option[MetadataVersion] = Some(MetadataVersion.latest())
): (MetaProperties, BootstrapMetadata, collection.Seq[String]) = {
val tempLogDir = TestUtils.tempDirectory()
try {
writeMetaProperties(tempLogDir, metaProperties)
metadataVersion.foreach(mv => writeBootstrapMetadata(tempLogDir, mv))
configProperties.put(KafkaConfig.LogDirProp, tempLogDir.getAbsolutePath)
val config = KafkaConfig.fromProps(configProperties)
KafkaRaftServer.initializeLogDirs(config)
@ -94,6 +96,11 @@ class KafkaRaftServerTest {
checkpoint.write(metaProperties.toProperties)
}
private def writeBootstrapMetadata(logDir: File, metadataVersion: MetadataVersion): Unit = {
val bootstrapMetadata = BootstrapMetadata.create(metadataVersion)
BootstrapMetadata.write(bootstrapMetadata, logDir.toPath)
}
@Test
def testStartupFailsIfMetaPropertiesMissingInSomeLogDir(): Unit = {
val clusterId = clusterIdBase64
@ -147,6 +154,7 @@ class KafkaRaftServerTest {
// One log dir is online and has properly formatted `meta.properties`
val validDir = TestUtils.tempDirectory()
writeMetaProperties(validDir, MetaProperties(clusterId, nodeId))
writeBootstrapMetadata(validDir, MetadataVersion.latest())
// Use a regular file as an invalid log dir to trigger an IO error
val invalidDir = TestUtils.tempFile("blah")
@ -215,4 +223,47 @@ class KafkaRaftServerTest {
() => KafkaRaftServer.initializeLogDirs(config))
}
@Test
def testKRaftUpdateWithIBP(): Unit = {
val clusterId = clusterIdBase64
val nodeId = 0
val metaProperties = MetaProperties(clusterId, nodeId)
val configProperties = new Properties
configProperties.put(KafkaConfig.ProcessRolesProp, "broker,controller")
configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
configProperties.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
configProperties.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9093")
configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
configProperties.put(KafkaConfig.InterBrokerProtocolVersionProp, "3.2")
val (loadedMetaProperties, bootstrapMetadata, offlineDirs) =
invokeLoadMetaProperties(metaProperties, configProperties, None)
assertEquals(metaProperties, loadedMetaProperties)
assertEquals(Seq.empty, offlineDirs)
assertEquals(bootstrapMetadata.metadataVersion(), MetadataVersion.IBP_3_2_IV0)
}
@Test
def testKRaftUpdateWithoutIBP(): Unit = {
val clusterId = clusterIdBase64
val nodeId = 0
val metaProperties = MetaProperties(clusterId, nodeId)
val logDir = TestUtils.tempDirectory()
writeMetaProperties(logDir, metaProperties)
val configProperties = new Properties
configProperties.put(KafkaConfig.ProcessRolesProp, "broker,controller")
configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
configProperties.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
configProperties.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9093")
configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
configProperties.put(KafkaConfig.LogDirProp, logDir.getAbsolutePath)
val config = KafkaConfig.fromProps(configProperties)
assertEquals("Cannot upgrade from KRaft version prior to 3.3 without first setting inter.broker.protocol.version on each broker.",
assertThrows(classOf[KafkaException], () => KafkaRaftServer.initializeLogDirs(config)).getMessage)
}
}

View File

@ -103,14 +103,16 @@ class ReplicaFetcherThreadTest {
leaderEndpointBlockingSend: BlockingSend): ReplicaFetcherThread = {
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${brokerConfig.brokerId}, leaderId=${leaderEndpointBlockingSend.brokerEndPoint().id}, fetcherId=$fetcherId] ")
val fetchSessionHandler = new FetchSessionHandler(logContext, leaderEndpointBlockingSend.brokerEndPoint().id)
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, leaderEndpointBlockingSend, fetchSessionHandler, brokerConfig, replicaMgr, quota)
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, leaderEndpointBlockingSend, fetchSessionHandler,
brokerConfig, replicaMgr, quota, () => brokerConfig.interBrokerProtocolVersion)
new ReplicaFetcherThread(name,
leader,
brokerConfig,
failedPartitions,
replicaMgr,
quota,
logContext.logPrefix)
logContext.logPrefix,
() => brokerConfig.interBrokerProtocolVersion)
}
@Test
@ -121,9 +123,9 @@ class ReplicaFetcherThreadTest {
val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
when(replicaManager.brokerTopicStats).thenReturn(mock(classOf[BrokerTopicStats]))
assertEquals(ApiKeys.FETCH.latestVersion, config.fetchRequestVersion)
assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, config.offsetForLeaderEpochRequestVersion)
assertEquals(ApiKeys.LIST_OFFSETS.latestVersion, config.listOffsetRequestVersion)
assertEquals(ApiKeys.FETCH.latestVersion, config.interBrokerProtocolVersion.fetchRequestVersion())
assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, config.interBrokerProtocolVersion.offsetForLeaderEpochRequestVersion)
assertEquals(ApiKeys.LIST_OFFSETS.latestVersion, config.interBrokerProtocolVersion.listOffsetRequestVersion)
}
@Test
@ -581,8 +583,10 @@ class ReplicaFetcherThreadTest {
val mockNetwork = new MockBlockingSender(Collections.emptyMap(), brokerEndPoint, new SystemTime())
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")
val fetchSessionHandler = new FetchSessionHandler(logContext, brokerEndPoint.id)
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockNetwork, fetchSessionHandler, config, replicaManager, quota)
val thread = new ReplicaFetcherThread("bob", leader, config, failedPartitions, replicaManager, quota, logContext.logPrefix) {
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockNetwork, fetchSessionHandler, config,
replicaManager, quota, () => config.interBrokerProtocolVersion)
val thread = new ReplicaFetcherThread("bob", leader, config, failedPartitions,
replicaManager, quota, logContext.logPrefix, () => config.interBrokerProtocolVersion) {
override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = None
}
thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), initialLEO), t1p1 -> initialFetchState(Some(topicId1), initialLEO)))
@ -1036,14 +1040,16 @@ class ReplicaFetcherThreadTest {
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")
val fetchSessionHandler = new FetchSessionHandler(logContext, brokerEndPoint.id)
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockBlockingSend, fetchSessionHandler, config, replicaManager, replicaQuota)
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockBlockingSend, fetchSessionHandler, config,
replicaManager, replicaQuota, () => config.interBrokerProtocolVersion)
val thread = new ReplicaFetcherThread("bob",
leader,
config,
failedPartitions,
replicaManager,
replicaQuota,
logContext.logPrefix)
logContext.logPrefix,
() => config.interBrokerProtocolVersion)
val leaderEpoch = 1

View File

@ -214,6 +214,7 @@ class ReplicaManagerTest {
val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1))
val metadataCache: MetadataCache = mock(classOf[MetadataCache])
mockGetAliveBrokerFunctions(metadataCache, aliveBrokers)
when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
val rm = new ReplicaManager(
metrics = metrics,
config = config,
@ -1964,7 +1965,7 @@ class ReplicaManagerTest {
any[TopicPartition], any[ListenerName])).
thenReturn(Map(leaderBrokerId -> new Node(leaderBrokerId, "host1", 9092, "rack-a"),
followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-b")).toMap)
when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", timer, reaperEnabled = false)
val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
@ -2003,15 +2004,16 @@ class ReplicaManagerTest {
time: Time,
threadNamePrefix: Option[String],
replicationQuotaManager: ReplicationQuotaManager): ReplicaFetcherManager = {
new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, replicationQuotaManager) {
new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, replicationQuotaManager, () => metadataCache.metadataVersion()) {
override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${sourceBroker.id}, " +
s"fetcherId=$fetcherId] ")
val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, config, replicaManager, quotaManager.follower)
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, config,
replicaManager, quotaManager.follower, () => config.interBrokerProtocolVersion)
new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", leader, config, failedPartitions, replicaManager,
quotaManager.follower, logContext.logPrefix) {
quotaManager.follower, logContext.logPrefix, () => config.interBrokerProtocolVersion) {
override def doWork(): Unit = {
// In case the thread starts before the partition is added by AbstractFetcherManager,
// add it here (it's a no-op if already added)
@ -2235,6 +2237,7 @@ class ReplicaManagerTest {
when(metadataCache.topicIdInfo()).thenReturn((topicIds.asJava, topicNames.asJava))
when(metadataCache.topicNamesToIds()).thenReturn(topicIds.asJava)
when(metadataCache.topicIdsToNames()).thenReturn(topicNames.asJava)
when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
mockGetAliveBrokerFunctions(metadataCache, aliveBrokers)
val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", timer, reaperEnabled = false)
@ -2487,6 +2490,8 @@ class ReplicaManagerTest {
val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1))
mockGetAliveBrokerFunctions(metadataCache0, aliveBrokers)
mockGetAliveBrokerFunctions(metadataCache1, aliveBrokers)
when(metadataCache0.metadataVersion()).thenReturn(config0.interBrokerProtocolVersion)
when(metadataCache1.metadataVersion()).thenReturn(config1.interBrokerProtocolVersion)
// each replica manager is for a broker
val rm0 = new ReplicaManager(

View File

@ -74,7 +74,7 @@ class BrokerMetadataListenerTest {
)
)
val imageRecords = listener.getImageRecords().get()
assertEquals(0, imageRecords.size())
assertEquals(1, imageRecords.size())
assertEquals(100L, listener.highestMetadataOffset)
assertEquals(0L, metrics.lastAppliedRecordOffset.get)
assertEquals(0L, metrics.lastAppliedRecordTimestamp.get)

View File

@ -325,7 +325,8 @@ public class ReplicaFetcherThreadBenchmark {
new LogContext(String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3)), 3),
config,
replicaManager,
replicaQuota
replicaQuota,
config::interBrokerProtocolVersion
) {
@Override
public long fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {
@ -356,7 +357,8 @@ public class ReplicaFetcherThreadBenchmark {
new FailedPartitions(),
replicaManager,
replicaQuota,
String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3)
String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3),
config::interBrokerProtocolVersion
);
pool = partitions;

View File

@ -39,6 +39,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Stream;
@ -128,7 +129,9 @@ public class BootstrapMetadata {
public static BootstrapMetadata create(MetadataVersion metadataVersion, List<ApiMessageAndVersion> records) {
if (!metadataVersion.isKRaftSupported()) {
throw new IllegalArgumentException("Cannot create BootstrapMetadata with a non-KRaft metadata version.");
throw new IllegalArgumentException(String.format(
"Cannot create BootstrapMetadata with a non-KRaft metadata version %s. Minimum version is %s",
metadataVersion, MetadataVersion.MINIMUM_KRAFT_VERSION));
}
records.add(new ApiMessageAndVersion(
new FeatureLevelRecord()
@ -142,18 +145,24 @@ public class BootstrapMetadata {
/**
* Load a bootstrap snapshot into a read-only bootstrap metadata object and return it.
*
* @param bootstrapDir The directory from which to read the snapshot file.
* @param fallbackPreviewVersion The metadata.version to boostrap if upgrading from KRaft preview
* @return The read-only bootstrap metadata
* @param bootstrapDir The directory from which to read the snapshot file.
* @param fallbackVersionSupplier A function that returns the metadata.version to use when upgrading from an older KRaft
* @return The read-only bootstrap metadata
* @throws Exception
*/
public static BootstrapMetadata load(Path bootstrapDir, MetadataVersion fallbackPreviewVersion) throws Exception {
public static BootstrapMetadata load(Path bootstrapDir, Supplier<MetadataVersion> fallbackVersionSupplier) throws Exception {
final Path bootstrapPath = bootstrapDir.resolve(BOOTSTRAP_FILE);
if (!Files.exists(bootstrapPath)) {
log.debug("Missing bootstrap file, this appears to be a KRaft preview cluster. Setting metadata.version to {}.",
fallbackPreviewVersion.featureLevel());
return BootstrapMetadata.create(fallbackPreviewVersion);
// Upgrade scenario from KRaft prior to 3.3 (i.e., no bootstrap metadata present)
MetadataVersion fallbackVersion = fallbackVersionSupplier.get();
if (fallbackVersion.isKRaftSupported()) {
log.debug("Missing bootstrap file, this appears to be a KRaft cluster older than 3.3. Setting metadata.version to {}.",
fallbackVersion.featureLevel());
return BootstrapMetadata.create(fallbackVersion);
} else {
throw new Exception(String.format("Could not set fallback bootstrap metadata with non-KRaft metadata version of %s", fallbackVersion));
}
}
BootstrapListener listener = new BootstrapListener();
@ -182,7 +191,7 @@ public class BootstrapMetadata {
if (metadataVersionRecord.isPresent()) {
return new BootstrapMetadata(MetadataVersion.fromFeatureLevel(metadataVersionRecord.get().featureLevel()), listener.records);
} else {
throw new RuntimeException("Expected a metadata.version to exist in the snapshot " + bootstrapPath + ", but none was found");
throw new Exception("Expected a metadata.version to exist in the snapshot " + bootstrapPath + ", but none was found");
}
}

View File

@ -590,11 +590,7 @@ public class ClusterControlManager {
ClusterControlIterator(long epoch) {
this.iterator = brokerRegistrations.entrySet(epoch).iterator();
if (featureControl.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
this.metadataVersion = MetadataVersion.IBP_3_0_IV1;
} else {
this.metadataVersion = featureControl.metadataVersion();
}
this.metadataVersion = featureControl.metadataVersion();
}
@Override

View File

@ -52,7 +52,7 @@ public class FeatureControlManager {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
private QuorumFeatures quorumFeatures = null;
private MetadataVersion metadataVersion = MetadataVersion.UNINITIALIZED;
private MetadataVersion metadataVersion = MetadataVersion.MINIMUM_KRAFT_VERSION;
Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
@ -105,6 +105,10 @@ public class FeatureControlManager {
*/
private final TimelineObject<MetadataVersion> metadataVersion;
/**
* A boolean to see if we have encountered a metadata.version or not.
*/
private final TimelineObject<Boolean> sawMetadataVersion;
private FeatureControlManager(
LogContext logContext,
@ -116,6 +120,7 @@ public class FeatureControlManager {
this.quorumFeatures = quorumFeatures;
this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
this.metadataVersion = new TimelineObject<>(snapshotRegistry, metadataVersion);
this.sawMetadataVersion = new TimelineObject<>(snapshotRegistry, false);
}
ControllerResult<Map<String, ApiError>> updateFeatures(
@ -226,7 +231,7 @@ public class FeatureControlManager {
return invalidMetadataVersion(newVersionLevel, "Unknown metadata.version.");
}
if (!currentVersion.equals(MetadataVersion.UNINITIALIZED) && newVersion.isLessThan(currentVersion)) {
if (newVersion.isLessThan(currentVersion)) {
// This is a downgrade
boolean metadataChanged = MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion);
if (!metadataChanged) {
@ -257,15 +262,20 @@ public class FeatureControlManager {
FinalizedControllerFeatures finalizedFeatures(long epoch) {
Map<String, Short> features = new HashMap<>();
if (!metadataVersion.get(epoch).equals(MetadataVersion.UNINITIALIZED)) {
features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get(epoch).featureLevel());
}
features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get(epoch).featureLevel());
for (Entry<String, Short> entry : finalizedVersions.entrySet(epoch)) {
features.put(entry.getKey(), entry.getValue());
}
return new FinalizedControllerFeatures(features, epoch);
}
/**
* @return true if a FeatureLevelRecord for "metadata.version" has been replayed. False otherwise
*/
boolean sawMetadataVersion() {
return this.sawMetadataVersion.get();
}
public void replay(FeatureLevelRecord record) {
VersionRange range = quorumFeatures.localSupportedFeature(record.name());
if (!range.contains(record.featureLevel())) {
@ -275,6 +285,7 @@ public class FeatureControlManager {
if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
log.info("Setting metadata.version to {}", record.featureLevel());
metadataVersion.set(MetadataVersion.fromFeatureLevel(record.featureLevel()));
sawMetadataVersion.set(true);
} else {
if (record.featureLevel() == 0) {
log.info("Removing feature {}", record.name());
@ -294,9 +305,6 @@ public class FeatureControlManager {
FeatureControlIterator(long epoch) {
this.iterator = finalizedVersions.entrySet(epoch).iterator();
this.metadataVersion = FeatureControlManager.this.metadataVersion.get(epoch);
if (this.metadataVersion.equals(MetadataVersion.UNINITIALIZED)) {
this.wroteVersion = true;
}
}
@Override

View File

@ -283,7 +283,7 @@ public final class QuorumController implements Controller {
public QuorumController build() throws Exception {
if (raftClient == null) {
throw new IllegalStateException("You must set a raft client.");
} else if (bootstrapMetadata == null || bootstrapMetadata.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
} else if (bootstrapMetadata == null) {
throw new IllegalStateException("You must specify an initial metadata.version using the kafka-storage tool.");
} else if (quorumFeatures == null) {
throw new IllegalStateException("You must specify the quorum features");
@ -932,21 +932,21 @@ public final class QuorumController implements Controller {
// write any other records to the log since we need the metadata.version to determine the correct
// record version
final MetadataVersion metadataVersion;
if (featureControl.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
if (!featureControl.sawMetadataVersion()) {
final CompletableFuture<Map<String, ApiError>> future;
if (!bootstrapMetadata.metadataVersion().isKRaftSupported()) {
metadataVersion = MetadataVersion.UNINITIALIZED;
metadataVersion = MetadataVersion.MINIMUM_KRAFT_VERSION;
future = new CompletableFuture<>();
future.completeExceptionally(
new IllegalStateException("Cannot become leader without an initial metadata.version of " +
"at least 1. Got " + bootstrapMetadata.metadataVersion().featureLevel()));
new IllegalStateException("Cannot become leader without a KRaft supported version. " +
"Got " + bootstrapMetadata.metadataVersion()));
} else {
metadataVersion = bootstrapMetadata.metadataVersion();
future = appendWriteEvent("bootstrapMetadata", OptionalLong.empty(), () -> {
if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_3_IV0)) {
log.info("Initializing metadata.version to {}", metadataVersion.featureLevel());
} else {
log.info("Upgrading from KRaft preview. Initializing metadata.version to {}",
log.info("Upgrading KRaft cluster and initializing metadata.version to {}",
metadataVersion.featureLevel());
}
return ControllerResult.atomicOf(bootstrapMetadata.records(), null);
@ -1964,6 +1964,11 @@ public final class QuorumController implements Controller {
return curClaimEpoch;
}
// Visible for testing
MetadataVersion metadataVersion() {
return featureControl.metadataVersion();
}
@Override
public void close() throws InterruptedException {
queue.close();

View File

@ -72,7 +72,9 @@ public class QuorumFeatures {
public static Map<String, VersionRange> defaultFeatureMap() {
Map<String, VersionRange> features = new HashMap<>(1);
features.put(MetadataVersion.FEATURE_NAME, VersionRange.of(MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel()));
features.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
MetadataVersion.latest().featureLevel()));
return features;
}

View File

@ -38,7 +38,7 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_
* This class is thread-safe.
*/
public final class FeaturesImage {
public static final FeaturesImage EMPTY = new FeaturesImage(Collections.emptyMap(), MetadataVersion.UNINITIALIZED);
public static final FeaturesImage EMPTY = new FeaturesImage(Collections.emptyMap(), MetadataVersion.MINIMUM_KRAFT_VERSION);
private final Map<String, Short> finalizedVersions;
@ -68,11 +68,10 @@ public final class FeaturesImage {
public void write(Consumer<List<ApiMessageAndVersion>> out) {
List<ApiMessageAndVersion> batch = new ArrayList<>();
// Write out the metadata.version record first, and then the rest of the finalized features
if (!metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
}
batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
for (Entry<String, Short> entry : finalizedVersions.entrySet()) {
if (entry.getKey().equals(MetadataVersion.FEATURE_NAME)) {
continue;

View File

@ -121,12 +121,7 @@ public final class MetadataImage {
}
public void write(Consumer<List<ApiMessageAndVersion>> out) {
// We use the minimum KRaft metadata version if this image does
// not have a specific version set.
MetadataVersion metadataVersion = features.metadataVersion();
if (metadataVersion.equals(MetadataVersion.UNINITIALIZED)) {
metadataVersion = MetadataVersion.IBP_3_0_IV1;
}
// Features should be written out first so we can include the metadata.version at the beginning of the
// snapshot
features.write(out);

View File

@ -34,28 +34,28 @@ public class BootstrapMetadataTest {
@Test
public void testWriteAndReadBootstrapFile() throws Exception {
Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
BootstrapMetadata metadata = BootstrapMetadata.create(MetadataVersion.IBP_3_0_IV0);
BootstrapMetadata metadata = BootstrapMetadata.create(MetadataVersion.MINIMUM_KRAFT_VERSION);
BootstrapMetadata.write(metadata, tmpDir);
assertTrue(Files.exists(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE)));
BootstrapMetadata newMetadata = BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0);
BootstrapMetadata newMetadata = BootstrapMetadata.load(tmpDir, () -> MetadataVersion.MINIMUM_KRAFT_VERSION);
assertEquals(metadata, newMetadata);
}
@Test
public void testNoBootstrapFile() throws Exception {
Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
BootstrapMetadata metadata = BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0);
assertEquals(MetadataVersion.IBP_3_0_IV0, metadata.metadataVersion());
metadata = BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_2_IV0);
BootstrapMetadata metadata = BootstrapMetadata.load(tmpDir, () -> MetadataVersion.MINIMUM_KRAFT_VERSION);
assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, metadata.metadataVersion());
metadata = BootstrapMetadata.load(tmpDir, () -> MetadataVersion.IBP_3_2_IV0);
assertEquals(MetadataVersion.IBP_3_2_IV0, metadata.metadataVersion());
}
@Test
public void testExistingBootstrapFile() throws Exception {
Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
BootstrapMetadata.write(BootstrapMetadata.create(MetadataVersion.IBP_3_0_IV0), tmpDir);
BootstrapMetadata.write(BootstrapMetadata.create(MetadataVersion.MINIMUM_KRAFT_VERSION), tmpDir);
assertThrows(IOException.class, () -> {
BootstrapMetadata.write(BootstrapMetadata.create(MetadataVersion.IBP_3_1_IV0), tmpDir);
});
@ -65,7 +65,7 @@ public class BootstrapMetadataTest {
public void testEmptyBootstrapFile() throws Exception {
Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
Files.createFile(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE));
assertThrows(RuntimeException.class, () -> BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0),
assertThrows(Exception.class, () -> BootstrapMetadata.load(tmpDir, () -> MetadataVersion.MINIMUM_KRAFT_VERSION),
"Should fail to load if no metadata.version is set");
}
@ -77,7 +77,7 @@ public class BootstrapMetadataTest {
byte[] data = new byte[100];
random.nextBytes(data);
Files.write(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE), data, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
assertThrows(RuntimeException.class, () -> BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0),
assertThrows(Exception.class, () -> BootstrapMetadata.load(tmpDir, () -> MetadataVersion.MINIMUM_KRAFT_VERSION),
"Should fail on invalid data");
}
}

View File

@ -68,7 +68,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
public class ClusterControlManagerTest {
@ParameterizedTest
@EnumSource(value = MetadataVersion.class, names = {"IBP_3_0_IV0", "IBP_3_3_IV2"})
@EnumSource(value = MetadataVersion.class, names = {"IBP_3_0_IV1", "IBP_3_3_IV2"})
public void testReplay(MetadataVersion metadataVersion) {
MockTime time = new MockTime(0, 0, 0);

View File

@ -93,7 +93,7 @@ public class FeatureControlManagerTest {
setSnapshotRegistry(snapshotRegistry).
build();
snapshotRegistry.getOrCreateSnapshot(-1);
assertEquals(new FinalizedControllerFeatures(Collections.emptyMap(), -1),
assertEquals(new FinalizedControllerFeatures(Collections.singletonMap("metadata.version", (short) 1), -1),
manager.finalizedFeatures(-1));
assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
@ -131,7 +131,7 @@ public class FeatureControlManagerTest {
build();
manager.replay(record);
snapshotRegistry.getOrCreateSnapshot(123);
assertEquals(new FinalizedControllerFeatures(versionMap("foo", 2), 123),
assertEquals(new FinalizedControllerFeatures(versionMap("metadata.version", 1, "foo", 2), 123),
manager.finalizedFeatures(123));
}
@ -210,6 +210,9 @@ public class FeatureControlManagerTest {
Collections.emptyMap(), Collections.emptyMap(), false);
RecordTestUtils.replayAll(manager, result.records());
RecordTestUtils.assertBatchIteratorContains(Arrays.asList(
Arrays.asList(new ApiMessageAndVersion(new FeatureLevelRecord().
setName("metadata.version").
setFeatureLevel((short) 1), (short) 0)),
Arrays.asList(new ApiMessageAndVersion(new FeatureLevelRecord().
setName("foo").
setFeatureLevel((short) 5), (short) 0)),
@ -222,13 +225,13 @@ public class FeatureControlManagerTest {
@Test
public void testApplyMetadataVersionChangeRecord() {
QuorumFeatures features = features(MetadataVersion.FEATURE_NAME,
MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel());
MetadataVersion.IBP_3_0_IV1.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel());
FeatureControlManager manager = new FeatureControlManager.Builder().
setQuorumFeatures(features).build();
manager.replay(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(MetadataVersion.IBP_3_0_IV0.featureLevel()));
assertEquals(MetadataVersion.IBP_3_0_IV0, manager.metadataVersion());
setFeatureLevel(MetadataVersion.IBP_3_0_IV1.featureLevel()));
assertEquals(MetadataVersion.IBP_3_0_IV1, manager.metadataVersion());
}
@Test
@ -258,12 +261,12 @@ public class FeatureControlManagerTest {
assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().get(MetadataVersion.FEATURE_NAME).error());
result = manager.updateFeatures(
Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_0_IV0.featureLevel()),
Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel()),
Collections.singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
Collections.emptyMap(),
true);
assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().get(MetadataVersion.FEATURE_NAME).error());
assertEquals("Invalid update version 1 for feature metadata.version. Local controller 0 only supports versions 4-5",
assertEquals("Invalid update version 1 for feature metadata.version. Local controller 0 only supports versions 3-4",
result.response().get(MetadataVersion.FEATURE_NAME).message());
}
@ -271,7 +274,7 @@ public class FeatureControlManagerTest {
public void testCreateFeatureLevelRecords() {
Map<String, VersionRange> localSupportedFeatures = new HashMap<>();
localSupportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel()));
MetadataVersion.IBP_3_0_IV1.featureLevel(), MetadataVersion.latest().featureLevel()));
localSupportedFeatures.put("foo", VersionRange.of(0, 2));
FeatureControlManager manager = new FeatureControlManager.Builder().
setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(), localSupportedFeatures, emptyList())).

View File

@ -96,6 +96,7 @@ import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import static java.util.function.Function.identity;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
@ -214,7 +215,7 @@ public class QuorumControllerTest {
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
}, OptionalLong.of(sessionTimeoutMillis), OptionalLong.empty(), MetadataVersion.latest());
}, OptionalLong.of(sessionTimeoutMillis), OptionalLong.empty(), BootstrapMetadata.create(MetadataVersion.latest()));
) {
ListenerCollection listeners = new ListenerCollection();
listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
@ -306,7 +307,7 @@ public class QuorumControllerTest {
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
}, OptionalLong.of(sessionTimeoutMillis), OptionalLong.of(leaderImbalanceCheckIntervalNs), MetadataVersion.latest());
}, OptionalLong.of(sessionTimeoutMillis), OptionalLong.of(leaderImbalanceCheckIntervalNs), BootstrapMetadata.create(MetadataVersion.latest()));
) {
ListenerCollection listeners = new ListenerCollection();
listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
@ -537,7 +538,7 @@ public class QuorumControllerTest {
BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
features.add(new BrokerRegistrationRequestData.Feature()
.setName(MetadataVersion.FEATURE_NAME)
.setMinSupportedVersion(MetadataVersion.IBP_3_0_IV0.featureLevel())
.setMinSupportedVersion(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel())
.setMaxSupportedVersion(MetadataVersion.latest().featureLevel()));
return features;
}
@ -1179,4 +1180,21 @@ public class QuorumControllerTest {
"authorizer " + i + " should not have completed loading.");
}
}
@Test
public void testInvalidBootstrapMetadata() throws Exception {
// We can't actually create a BootstrapMetadata with an invalid version, so we have to fake it
BootstrapMetadata bootstrapMetadata = Mockito.mock(BootstrapMetadata.class);
Mockito.when(bootstrapMetadata.metadataVersion()).thenReturn(MetadataVersion.IBP_2_8_IV0);
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
}, OptionalLong.empty(), OptionalLong.empty(), bootstrapMetadata);
) {
QuorumController active = controlEnv.activeController();
TestUtils.waitForCondition(() -> !active.isActive(),
"Timed out waiting for controller to renounce itself after bad bootstrap metadata version.");
}
}
}

View File

@ -49,7 +49,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
LocalLogManagerTestEnv logEnv,
Consumer<QuorumController.Builder> builderConsumer
) throws Exception {
this(logEnv, builderConsumer, OptionalLong.empty(), OptionalLong.empty(), MetadataVersion.latest());
this(logEnv, builderConsumer, OptionalLong.empty(), OptionalLong.empty(), BootstrapMetadata.create(MetadataVersion.latest()));
}
public QuorumControllerTestEnv(
@ -57,7 +57,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
Consumer<Builder> builderConsumer,
OptionalLong sessionTimeoutMillis,
OptionalLong leaderImbalanceCheckIntervalNs,
MetadataVersion metadataVersion
BootstrapMetadata bootstrapMetadata
) throws Exception {
this.logEnv = logEnv;
int numControllers = logEnv.logManagers().size();
@ -68,7 +68,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
for (int i = 0; i < numControllers; i++) {
QuorumController.Builder builder = new QuorumController.Builder(i, logEnv.clusterId());
builder.setRaftClient(logEnv.logManagers().get(i));
builder.setBootstrapMetadata(BootstrapMetadata.create(metadataVersion));
builder.setBootstrapMetadata(bootstrapMetadata);
builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs);
builder.setQuorumFeatures(new QuorumFeatures(i, apiVersions, QuorumFeatures.defaultFeatureMap(), nodeIds));
sessionTimeoutMillis.ifPresent(timeout -> {

View File

@ -43,7 +43,6 @@ import org.apache.kafka.common.record.RecordVersion;
* released version, they can use "0.10.0" when upgrading to the 0.10.0 release.
*/
public enum MetadataVersion {
UNINITIALIZED(-1, "0.0", ""),
IBP_0_8_0(-1, "0.8.0", ""),
IBP_0_8_1(-1, "0.8.1", ""),
@ -140,32 +139,36 @@ public enum MetadataVersion {
IBP_2_8_IV1(-1, "2.8", "IV1"),
// Introduce AllocateProducerIds (KIP-730)
IBP_3_0_IV0(1, "3.0", "IV0", true),
IBP_3_0_IV0(-1, "3.0", "IV0"),
// Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734)
// Assume message format version is 3.0 (KIP-724)
IBP_3_0_IV1(2, "3.0", "IV1", false),
IBP_3_0_IV1(1, "3.0", "IV1", true),
// Adds topic IDs to Fetch requests/responses (KIP-516)
IBP_3_1_IV0(3, "3.1", "IV0", false),
IBP_3_1_IV0(2, "3.1", "IV0", false),
// Support for leader recovery for unclean leader election (KIP-704)
IBP_3_2_IV0(4, "3.2", "IV0", true),
IBP_3_2_IV0(3, "3.2", "IV0", true),
// Support for metadata.version feature flag and Removes min_version_level from the finalized version range that is written to ZooKeeper (KIP-778)
IBP_3_3_IV0(5, "3.3", "IV0", false),
IBP_3_3_IV0(4, "3.3", "IV0", false),
// Support NoopRecord for the cluster metadata log (KIP-835)
IBP_3_3_IV1(6, "3.3", "IV1", true),
IBP_3_3_IV1(5, "3.3", "IV1", true),
// In KRaft mode, use BrokerRegistrationChangeRecord instead of UnfenceBrokerRecord and FenceBrokerRecord.
IBP_3_3_IV2(7, "3.3", "IV2", true),
IBP_3_3_IV2(6, "3.3", "IV2", true),
// Adds InControlledShutdown state to RegisterBrokerRecord and BrokerRegistrationChangeRecord (KIP-841).
IBP_3_3_IV3(8, "3.3", "IV3", true);
IBP_3_3_IV3(7, "3.3", "IV3", true);
// NOTE: update the default version in @ClusterTest annotation to point to the latest version
public static final String FEATURE_NAME = "metadata.version";
public static final MetadataVersion MINIMUM_KRAFT_VERSION = IBP_3_0_IV1;
public static final MetadataVersion[] VERSIONS;
private final short featureLevel;
@ -258,12 +261,73 @@ public enum MetadataVersion {
}
}
public short fetchRequestVersion() {
if (this.isAtLeast(IBP_3_1_IV0)) {
return 13;
} else if (this.isAtLeast(IBP_2_7_IV1)) {
return 12;
} else if (this.isAtLeast(IBP_2_3_IV1)) {
return 11;
} else if (this.isAtLeast(IBP_2_1_IV2)) {
return 10;
} else if (this.isAtLeast(IBP_2_0_IV1)) {
return 8;
} else if (this.isAtLeast(IBP_1_1_IV0)) {
return 7;
} else if (this.isAtLeast(IBP_0_11_0_IV1)) {
return 5;
} else if (this.isAtLeast(IBP_0_11_0_IV0)) {
return 4;
} else if (this.isAtLeast(IBP_0_10_1_IV1)) {
return 3;
} else if (this.isAtLeast(IBP_0_10_0_IV0)) {
return 2;
} else if (this.isAtLeast(IBP_0_9_0)) {
return 1;
} else {
return 0;
}
}
public short offsetForLeaderEpochRequestVersion() {
if (this.isAtLeast(IBP_2_8_IV0)) {
return 4;
} else if (this.isAtLeast(IBP_2_3_IV1)) {
return 3;
} else if (this.isAtLeast(IBP_2_1_IV1)) {
return 2;
} else if (this.isAtLeast(IBP_2_0_IV0)) {
return 1;
} else {
return 0;
}
}
public short listOffsetRequestVersion() {
if (this.isAtLeast(IBP_3_0_IV1)) {
return 7;
} else if (this.isAtLeast(IBP_2_8_IV0)) {
return 6;
} else if (this.isAtLeast(IBP_2_2_IV1)) {
return 5;
} else if (this.isAtLeast(IBP_2_1_IV1)) {
return 4;
} else if (this.isAtLeast(IBP_2_0_IV1)) {
return 3;
} else if (this.isAtLeast(IBP_0_11_0_IV0)) {
return 2;
} else if (this.isAtLeast(IBP_0_10_1_IV2)) {
return 1;
} else {
return 0;
}
}
private static final Map<String, MetadataVersion> IBP_VERSIONS;
static {
{
// Make a copy of values() and omit UNINITIALIZED
MetadataVersion[] enumValues = MetadataVersion.values();
VERSIONS = Arrays.copyOfRange(enumValues, 1, enumValues.length);
VERSIONS = Arrays.copyOf(enumValues, enumValues.length);
IBP_VERSIONS = new HashMap<>();
Map<String, MetadataVersion> maxInterVersion = new HashMap<>();
@ -289,8 +353,8 @@ public enum MetadataVersion {
Optional<MetadataVersion> previous() {
int idx = this.ordinal();
if (idx > 1) {
return Optional.of(VERSIONS[idx - 2]);
if (idx > 0) {
return Optional.of(VERSIONS[idx - 1]);
} else {
return Optional.empty();
}

View File

@ -76,7 +76,7 @@ class MetadataVersionTest {
@Test
public void testFeatureLevel() {
MetadataVersion[] metadataVersions = MetadataVersion.VERSIONS;
int firstFeatureLevelIndex = Arrays.asList(metadataVersions).indexOf(IBP_3_0_IV0);
int firstFeatureLevelIndex = Arrays.asList(metadataVersions).indexOf(MetadataVersion.MINIMUM_KRAFT_VERSION);
for (int i = 0; i < firstFeatureLevelIndex; i++) {
assertTrue(metadataVersions[i].featureLevel() < 0);
}
@ -287,7 +287,7 @@ class MetadataVersionTest {
public void testPrevious() {
for (int i = 1; i < MetadataVersion.VERSIONS.length - 2; i++) {
MetadataVersion version = MetadataVersion.VERSIONS[i];
assertTrue(version.previous().isPresent());
assertTrue(version.previous().isPresent(), version.toString());
assertEquals(MetadataVersion.VERSIONS[i - 1], version.previous().get());
}
}
@ -317,8 +317,8 @@ class MetadataVersionTest {
}
for (MetadataVersion metadataVersion : MetadataVersion.VERSIONS) {
if (metadataVersion.isAtLeast(IBP_3_0_IV0)) {
assertTrue(metadataVersion.isKRaftSupported());
if (metadataVersion.isAtLeast(IBP_3_0_IV1)) {
assertTrue(metadataVersion.isKRaftSupported(), metadataVersion.toString());
} else {
assertFalse(metadataVersion.isKRaftSupported());
}