diff --git a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java index d496e63c88f..6dd892cf365 100644 --- a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java @@ -48,7 +48,7 @@ public class LogManagerBuilder { private int maxTransactionTimeoutMs = 15 * 60 * 1000; private ProducerStateManagerConfig producerStateManagerConfig = new ProducerStateManagerConfig(60000, false); private int producerIdExpirationCheckIntervalMs = 600000; - private MetadataVersion interBrokerProtocolVersion = MetadataVersion.latest(); + private MetadataVersion interBrokerProtocolVersion = MetadataVersion.latestProduction(); private Scheduler scheduler = null; private BrokerTopicStats brokerTopicStats = null; private LogDirFailureChannel logDirFailureChannel = null; diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 4da07516551..85b655f16c2 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -715,7 +715,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, origin: AppendOrigin = AppendOrigin.CLIENT, - interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest, + interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latestProduction, requestLocal: RequestLocal = RequestLocal.NoCaching, verificationGuard: VerificationGuard = VerificationGuard.SENTINEL): LogAppendInfo = { val validateAndAssignOffsets = origin != AppendOrigin.RAFT_LEADER @@ -732,7 +732,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, def appendAsFollower(records: MemoryRecords): LogAppendInfo = { append(records, origin = AppendOrigin.REPLICATION, - interBrokerProtocolVersion = MetadataVersion.latest, + interBrokerProtocolVersion = MetadataVersion.latestProduction, validateAndAssignOffsets = false, leaderEpoch = -1, requestLocal = None, diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala index 6e1057469df..a7ef07868eb 100644 --- a/core/src/main/scala/kafka/server/ApiVersionManager.scala +++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala @@ -91,7 +91,7 @@ class SimpleApiVersionManager( this( listenerType, ApiKeys.apisForListener(listenerType).asScala, - BrokerFeatures.defaultSupportedFeatures(), + BrokerFeatures.defaultSupportedFeatures(enableUnstableLastVersion), enableUnstableLastVersion, zkMigrationEnabled, featuresProvider diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala b/core/src/main/scala/kafka/server/BrokerFeatures.scala index 040529dde57..41333c2bd37 100644 --- a/core/src/main/scala/kafka/server/BrokerFeatures.scala +++ b/core/src/main/scala/kafka/server/BrokerFeatures.scala @@ -70,14 +70,21 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte object BrokerFeatures extends Logging { - def createDefault(): BrokerFeatures = { - new BrokerFeatures(defaultSupportedFeatures()) + def createDefault(unstableMetadataVersionsEnabled: Boolean): BrokerFeatures = { + new BrokerFeatures(defaultSupportedFeatures(unstableMetadataVersionsEnabled)) } - def defaultSupportedFeatures(): Features[SupportedVersionRange] = { + def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = { Features.supportedFeatures( java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME, - new SupportedVersionRange(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.latest().featureLevel()))) + new SupportedVersionRange( + MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), + if (unstableMetadataVersionsEnabled) { + MetadataVersion.latestTesting.featureLevel + } else { + MetadataVersion.latestProduction.featureLevel + } + ))) } def createEmpty(): BrokerFeatures = { diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 46862480206..593d28ac7ab 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -140,7 +140,7 @@ class BrokerServer( var brokerMetadataPublisher: BrokerMetadataPublisher = _ - val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault() + val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault(config.unstableMetadataVersionsEnabled) def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 03d55c309e8..fbd27114c61 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -152,7 +152,7 @@ object Defaults { val LeaderImbalancePerBrokerPercentage = 10 val LeaderImbalanceCheckIntervalSeconds = 300 val InterBrokerSecurityProtocol = SecurityProtocol.PLAINTEXT.toString - val InterBrokerProtocolVersion = MetadataVersion.latest.version + val InterBrokerProtocolVersion = MetadataVersion.latestProduction.version /** ********* Controlled shutdown configuration ***********/ val ControlledShutdownMaxRetries = 3 diff --git a/core/src/test/java/kafka/test/ClusterConfig.java b/core/src/test/java/kafka/test/ClusterConfig.java index c4ea8669d53..900d62f90c7 100644 --- a/core/src/test/java/kafka/test/ClusterConfig.java +++ b/core/src/test/java/kafka/test/ClusterConfig.java @@ -155,7 +155,7 @@ public class ClusterConfig { } public static Builder defaultClusterBuilder() { - return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latest()); + return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latestTesting()); } public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart, diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java index 2cef03f67af..fa11fa9b186 100644 --- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java +++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java @@ -199,6 +199,7 @@ public class KafkaClusterTestKit implements AutoCloseable { props.putAll(brokerNode.propertyOverrides()); } props.putIfAbsent(KafkaConfig$.MODULE$.UnstableMetadataVersionsEnableProp(), "true"); + props.putIfAbsent(KafkaConfig$.MODULE$.UnstableApiVersionsEnableProp(), "true"); return new KafkaConfig(props, false, Option.empty()); } diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java b/core/src/test/java/kafka/testkit/TestKitNodes.java index 90317c56224..1321d3a56c7 100644 --- a/core/src/test/java/kafka/testkit/TestKitNodes.java +++ b/core/src/test/java/kafka/testkit/TestKitNodes.java @@ -34,7 +34,7 @@ public class TestKitNodes { private boolean combined = false; private Uuid clusterId = null; private BootstrapMetadata bootstrapMetadata = BootstrapMetadata. - fromVersion(MetadataVersion.latest(), "testkit"); + fromVersion(MetadataVersion.latestTesting(), "testkit"); private final NavigableMap controllerNodeBuilders = new TreeMap<>(); private final NavigableMap brokerNodeBuilders = new TreeMap<>(); diff --git a/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala index f2a6e71dd17..5a0b6f1df02 100644 --- a/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala @@ -173,7 +173,7 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging { zkClient.createTopLevelPaths() val securityProtocol = SecurityProtocol.PLAINTEXT val endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol) - val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), MetadataVersion.latest, jmxPort = 9192) + val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), MetadataVersion.latestTesting, jmxPort = 9192) zkClient.registerBroker(brokerInfo) } } diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 0ec23bc9de8..d1452a1d918 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -923,12 +923,12 @@ class KRaftClusterTest { try { admin.updateFeatures( Map(MetadataVersion.FEATURE_NAME -> - new FeatureUpdate(MetadataVersion.latest().featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions + new FeatureUpdate(MetadataVersion.latestTesting().featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions ) } finally { admin.close() } - TestUtils.waitUntilTrue(() => cluster.brokers().get(1).metadataCache.currentImage().features().metadataVersion().equals(MetadataVersion.latest()), + TestUtils.waitUntilTrue(() => cluster.brokers().get(1).metadataCache.currentImage().features().metadataVersion().equals(MetadataVersion.latestTesting()), "Timed out waiting for metadata version update.") } finally { cluster.close() diff --git a/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala b/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala index fac859effb2..630484503f3 100644 --- a/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala +++ b/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala @@ -37,8 +37,8 @@ class MetadataRequestBetweenDifferentIbpTest extends BaseRequestTest { override def generateConfigs: Seq[KafkaConfig] = { Seq( createConfig(0, IBP_2_8_IV0), - createConfig(1, MetadataVersion.latest), - createConfig(2, MetadataVersion.latest) + createConfig(1, MetadataVersion.latestTesting), + createConfig(2, MetadataVersion.latestTesting) ) } diff --git a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala index 54bc462e973..5640f432ed6 100644 --- a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala @@ -74,8 +74,8 @@ class MetadataVersionIntegrationTest { val admin = clusterInstance.createAdminClient() val describeResult = admin.describeFeatures() val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME) - assertEquals(ff.minVersionLevel(), MetadataVersion.latest().featureLevel(), + assertEquals(ff.minVersionLevel(), MetadataVersion.latestTesting().featureLevel(), "If this test fails, check the default MetadataVersion in the @ClusterTest annotation") - assertEquals(ff.maxVersionLevel(), MetadataVersion.latest().featureLevel()) + assertEquals(ff.maxVersionLevel(), MetadataVersion.latestTesting().featureLevel()) } } diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index 10477c1a947..1a83809c0ad 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -175,7 +175,7 @@ abstract class QuorumTestHarness extends Logging { Seq(new Properties()) } - protected def metadataVersion: MetadataVersion = MetadataVersion.latest() + protected def metadataVersion: MetadataVersion = MetadataVersion.latestTesting() private var testInfo: TestInfo = _ private var implementation: QuorumImplementation = _ diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala index 1d0577cc977..8e53111ebed 100644 --- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala @@ -95,7 +95,7 @@ class AbstractPartitionTest { .thenReturn(None) } - protected def interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest + protected def interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latestTesting def createLogProperties(overrides: Map[String, String]): Properties = { val logProps = new Properties() diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 054d1690d80..8523df82023 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -271,7 +271,7 @@ class PartitionLockTest extends Logging { logManager.startup(Set.empty) val partition = new Partition(topicPartition, replicaLagTimeMaxMs = kafka.server.Defaults.ReplicaLagTimeMaxMs, - interBrokerProtocolVersion = MetadataVersion.latest, + interBrokerProtocolVersion = MetadataVersion.latestTesting, localBrokerId = brokerId, () => 1L, mockTime, diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 9180147618f..9a302ea6ea8 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -417,7 +417,7 @@ class PartitionTest extends AbstractPartitionTest { partition = new Partition( topicPartition, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, - interBrokerProtocolVersion = MetadataVersion.latest, + interBrokerProtocolVersion = MetadataVersion.latestTesting, localBrokerId = brokerId, () => defaultBrokerEpoch(brokerId), time, @@ -1570,7 +1570,7 @@ class PartitionTest extends AbstractPartitionTest { val partition = new Partition( topicPartition, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, - interBrokerProtocolVersion = MetadataVersion.latest, + interBrokerProtocolVersion = MetadataVersion.latestTesting, localBrokerId = brokerId, () => defaultBrokerEpoch(brokerId), time, @@ -1687,7 +1687,7 @@ class PartitionTest extends AbstractPartitionTest { val partition = new Partition( topicPartition, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, - interBrokerProtocolVersion = MetadataVersion.latest, + interBrokerProtocolVersion = MetadataVersion.latestTesting, localBrokerId = brokerId, () => defaultBrokerEpoch(brokerId), time, @@ -1794,7 +1794,7 @@ class PartitionTest extends AbstractPartitionTest { val partition = new Partition( topicPartition, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, - interBrokerProtocolVersion = MetadataVersion.latest, + interBrokerProtocolVersion = MetadataVersion.latestTesting, localBrokerId = brokerId, () => defaultBrokerEpoch(brokerId), time, @@ -1886,7 +1886,7 @@ class PartitionTest extends AbstractPartitionTest { val partition = new Partition( topicPartition, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, - interBrokerProtocolVersion = MetadataVersion.latest, + interBrokerProtocolVersion = MetadataVersion.latestTesting, localBrokerId = brokerId, () => defaultBrokerEpoch(brokerId), time, @@ -1952,7 +1952,7 @@ class PartitionTest extends AbstractPartitionTest { val partition = new Partition( topicPartition, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, - interBrokerProtocolVersion = MetadataVersion.latest, + interBrokerProtocolVersion = MetadataVersion.latestTesting, localBrokerId = brokerId, () => defaultBrokerEpoch(brokerId), time, @@ -2108,7 +2108,7 @@ class PartitionTest extends AbstractPartitionTest { val partition = new Partition( topicPartition, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, - interBrokerProtocolVersion = MetadataVersion.latest, + interBrokerProtocolVersion = MetadataVersion.latestTesting, localBrokerId = brokerId, () => defaultBrokerEpoch(brokerId), time, @@ -2781,7 +2781,7 @@ class PartitionTest extends AbstractPartitionTest { // Create new Partition object for same topicPartition val partition2 = new Partition(topicPartition, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, - interBrokerProtocolVersion = MetadataVersion.latest, + interBrokerProtocolVersion = MetadataVersion.latestTesting, localBrokerId = brokerId, () => defaultBrokerEpoch(brokerId), time, @@ -2826,7 +2826,7 @@ class PartitionTest extends AbstractPartitionTest { // Create new Partition object for same topicPartition val partition2 = new Partition(topicPartition, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, - interBrokerProtocolVersion = MetadataVersion.latest, + interBrokerProtocolVersion = MetadataVersion.latestTesting, localBrokerId = brokerId, () => defaultBrokerEpoch(brokerId), time, @@ -2909,7 +2909,7 @@ class PartitionTest extends AbstractPartitionTest { def testUpdateAssignmentAndIsr(): Unit = { val topicPartition = new TopicPartition("test", 1) val partition = new Partition( - topicPartition, 1000, MetadataVersion.latest, 0, () => defaultBrokerEpoch(0), + topicPartition, 1000, MetadataVersion.latestTesting, 0, () => defaultBrokerEpoch(0), new SystemTime(), mock(classOf[AlterPartitionListener]), mock(classOf[DelayedOperations]), mock(classOf[MetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterPartitionManager])) @@ -2984,7 +2984,7 @@ class PartitionTest extends AbstractPartitionTest { val spyLogManager = spy(logManager) val partition = new Partition(topicPartition, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, - interBrokerProtocolVersion = MetadataVersion.latest, + interBrokerProtocolVersion = MetadataVersion.latestTesting, localBrokerId = brokerId, () => defaultBrokerEpoch(brokerId), time, @@ -3023,7 +3023,7 @@ class PartitionTest extends AbstractPartitionTest { val partition = new Partition(topicPartition, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, - interBrokerProtocolVersion = MetadataVersion.latest, + interBrokerProtocolVersion = MetadataVersion.latestTesting, localBrokerId = brokerId, () => defaultBrokerEpoch(brokerId), time, @@ -3065,7 +3065,7 @@ class PartitionTest extends AbstractPartitionTest { val partition = new Partition(topicPartition, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, - interBrokerProtocolVersion = MetadataVersion.latest, + interBrokerProtocolVersion = MetadataVersion.latestTesting, localBrokerId = brokerId, () => defaultBrokerEpoch(brokerId), time, diff --git a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala index 44a14386f7f..d6718e4203c 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala @@ -160,7 +160,7 @@ class ControllerChannelManagerTest { @Test def testLeaderAndIsrInterBrokerProtocolVersion(): Unit = { - testLeaderAndIsrRequestFollowsInterBrokerProtocolVersion(MetadataVersion.latest, ApiKeys.LEADER_AND_ISR.latestVersion) + testLeaderAndIsrRequestFollowsInterBrokerProtocolVersion(MetadataVersion.latestTesting, ApiKeys.LEADER_AND_ISR.latestVersion) for (metadataVersion <- MetadataVersion.VERSIONS) { val leaderAndIsrRequestVersion: Short = { @@ -379,7 +379,7 @@ class ControllerChannelManagerTest { @Test def testUpdateMetadataInterBrokerProtocolVersion(): Unit = { - testUpdateMetadataFollowsInterBrokerProtocolVersion(MetadataVersion.latest, ApiKeys.UPDATE_METADATA.latestVersion) + testUpdateMetadataFollowsInterBrokerProtocolVersion(MetadataVersion.latestTesting, ApiKeys.UPDATE_METADATA.latestVersion) for (metadataVersion <- MetadataVersion.VERSIONS) { val updateMetadataRequestVersion: Short = @@ -625,7 +625,7 @@ class ControllerChannelManagerTest { @Test def testMixedDeleteAndNotDeleteStopReplicaRequests(): Unit = { - testMixedDeleteAndNotDeleteStopReplicaRequests(MetadataVersion.latest, + testMixedDeleteAndNotDeleteStopReplicaRequests(MetadataVersion.latestTesting, ApiKeys.STOP_REPLICA.latestVersion) for (metadataVersion <- MetadataVersion.VERSIONS) { @@ -775,7 +775,7 @@ class ControllerChannelManagerTest { @Test def testStopReplicaInterBrokerProtocolVersion(): Unit = { - testStopReplicaFollowsInterBrokerProtocolVersion(MetadataVersion.latest, ApiKeys.STOP_REPLICA.latestVersion) + testStopReplicaFollowsInterBrokerProtocolVersion(MetadataVersion.latestTesting, ApiKeys.STOP_REPLICA.latestVersion) for (metadataVersion <- MetadataVersion.VERSIONS) { if (metadataVersion.isLessThan(IBP_2_2_IV0)) diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index d5acc2f190a..8a7bba87c6b 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -51,7 +51,7 @@ import scala.util.{Failure, Success, Try} object ControllerIntegrationTest { def testAlterPartitionSource(): JStream[Arguments] = { - Seq(MetadataVersion.IBP_2_7_IV0, MetadataVersion.latest).asJava.stream.flatMap { metadataVersion => + Seq(MetadataVersion.IBP_2_7_IV0, MetadataVersion.latestTesting).asJava.stream.flatMap { metadataVersion => ApiKeys.ALTER_PARTITION.allVersions.stream.map { alterPartitionVersion => Arguments.of(metadataVersion, alterPartitionVersion) } @@ -1010,7 +1010,7 @@ class ControllerIntegrationTest extends QuorumTestHarness { // topic ids anymore. However, the already assigned topic ids are kept. This means // that using AlterPartition version 2 should still work assuming that it only // contains topic with topics ids. - servers = makeServers(1, interBrokerProtocolVersion = Some(MetadataVersion.latest)) + servers = makeServers(1, interBrokerProtocolVersion = Some(MetadataVersion.latestTesting)) val controllerId = TestUtils.waitUntilControllerElected(zkClient) val tp = new TopicPartition("t", 0) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index d43a1225598..efc6d01c7a1 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -97,7 +97,7 @@ class GroupMetadataManagerTest { metrics = new kMetrics() time = new MockTime replicaManager = mock(classOf[ReplicaManager]) - groupMetadataManager = new GroupMetadataManager(0, MetadataVersion.latest, offsetConfig, replicaManager, + groupMetadataManager = new GroupMetadataManager(0, MetadataVersion.latestTesting, offsetConfig, replicaManager, time, metrics) groupMetadataManager.startup(() => numOffsetsPartitions, false) partition = mock(classOf[Partition]) @@ -112,7 +112,7 @@ class GroupMetadataManagerTest { def testLogInfoFromCleanupGroupMetadata(): Unit = { var expiredOffsets: Int = 0 var infoCount = 0 - val gmm = new GroupMetadataManager(0, MetadataVersion.latest, offsetConfig, replicaManager, time, metrics) { + val gmm = new GroupMetadataManager(0, MetadataVersion.latestTesting, offsetConfig, replicaManager, time, metrics) { override def cleanupGroupMetadata(groups: Iterable[GroupMetadata], requestLocal: RequestLocal, selector: GroupMetadata => Map[TopicPartition, OffsetAndMetadata]): Int = expiredOffsets @@ -2740,7 +2740,7 @@ class GroupMetadataManagerTest { val offsetCommitRecord = TestUtils.records(Seq( new SimpleRecord( GroupMetadataManager.offsetCommitKey(groupId, topicPartition), - GroupMetadataManager.offsetCommitValue(OffsetAndMetadata(35L, "", time.milliseconds()), MetadataVersion.latest) + GroupMetadataManager.offsetCommitValue(OffsetAndMetadata(35L, "", time.milliseconds()), MetadataVersion.latestTesting) ) )).records.asScala.head val (keyStringOpt, valueStringOpt) = GroupMetadataManager.formatRecordKeyAndValue(offsetCommitRecord) @@ -2826,7 +2826,7 @@ class GroupMetadataManagerTest { protocol: String, memberId: String, assignmentBytes: Array[Byte] = Array.emptyByteArray, - metadataVersion: MetadataVersion = MetadataVersion.latest): SimpleRecord = { + metadataVersion: MetadataVersion = MetadataVersion.latestTesting): SimpleRecord = { val memberProtocols = List((protocol, Array.emptyByteArray)) val member = new MemberMetadata(memberId, Some(groupInstanceId), "clientId", "clientHost", 30000, 10000, protocolType, memberProtocols) val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol, memberId, @@ -2839,7 +2839,7 @@ class GroupMetadataManagerTest { private def buildEmptyGroupRecord(generation: Int, protocolType: String): SimpleRecord = { val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, None, Seq.empty, time) val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId) - val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map.empty, MetadataVersion.latest) + val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map.empty, MetadataVersion.latestTesting) new SimpleRecord(groupMetadataKey, groupMetadataValue) } @@ -2883,7 +2883,7 @@ class GroupMetadataManagerTest { private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long], groupId: String = groupId, - metadataVersion: MetadataVersion = MetadataVersion.latest, + metadataVersion: MetadataVersion = MetadataVersion.latestTesting, retentionTimeOpt: Option[Long] = None): Seq[SimpleRecord] = { committedOffsets.map { case (topicPartition, offset) => val commitTimestamp = time.milliseconds() diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index ac18c07dced..5232f3b10b5 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -246,7 +246,7 @@ class LogLoaderTest { @Test def testProducerSnapshotsRecoveryAfterUncleanShutdownCurrentMessageFormat(): Unit = { - testProducerSnapshotsRecoveryAfterUncleanShutdown(MetadataVersion.latest.version) + testProducerSnapshotsRecoveryAfterUncleanShutdown(MetadataVersion.latestTesting.version) } private def createLog(dir: File, diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 5d900223a71..0b5d17cbcc2 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -158,7 +158,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( offsetCounter, metricsRecorder, @@ -203,7 +203,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(0), metricsRecorder, @@ -255,7 +255,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(0), metricsRecorder, @@ -321,7 +321,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(0), metricsRecorder, @@ -372,7 +372,7 @@ class LogValidatorTest { 1000L, partitionLeaderEpoch, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( offsetCounter, metricsRecorder, @@ -458,7 +458,7 @@ class LogValidatorTest { 1000L, partitionLeaderEpoch, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(0), metricsRecorder, @@ -517,7 +517,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(0), metricsRecorder, @@ -565,7 +565,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest, + MetadataVersion.latestTesting, ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(0), metricsRecorder, @@ -628,7 +628,7 @@ class LogValidatorTest { 1000L, partitionLeaderEpoch, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(0), metricsRecorder, @@ -688,7 +688,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(0), metricsRecorder, @@ -714,7 +714,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(0), metricsRecorder, @@ -740,7 +740,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(0), metricsRecorder, @@ -766,7 +766,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(0), metricsRecorder, @@ -791,7 +791,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest, + MetadataVersion.latestTesting, ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ).validatedRecords, offset) @@ -814,7 +814,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ).validatedRecords, offset) @@ -838,7 +838,7 @@ class LogValidatorTest { 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ).validatedRecords @@ -863,7 +863,7 @@ class LogValidatorTest { 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ).validatedRecords @@ -889,7 +889,7 @@ class LogValidatorTest { 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ).validatedRecords @@ -915,7 +915,7 @@ class LogValidatorTest { 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ).validatedRecords @@ -939,7 +939,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ) @@ -965,7 +965,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ) @@ -991,7 +991,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ) @@ -1017,7 +1017,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ) @@ -1043,7 +1043,7 @@ class LogValidatorTest { 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier )) @@ -1066,7 +1066,7 @@ class LogValidatorTest { 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.COORDINATOR, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ) @@ -1094,7 +1094,7 @@ class LogValidatorTest { 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ).validatedRecords, offset) @@ -1118,7 +1118,7 @@ class LogValidatorTest { 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ).validatedRecords, offset) @@ -1141,7 +1141,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ).validatedRecords, offset) @@ -1164,7 +1164,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ).validatedRecords, offset) @@ -1188,7 +1188,7 @@ class LogValidatorTest { 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ).validatedRecords, offset) @@ -1212,7 +1212,7 @@ class LogValidatorTest { 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ).validatedRecords, offset) @@ -1238,7 +1238,7 @@ class LogValidatorTest { 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier )) @@ -1264,7 +1264,7 @@ class LogValidatorTest { 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier )) @@ -1288,7 +1288,7 @@ class LogValidatorTest { 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ).validatedRecords, offset) @@ -1312,7 +1312,7 @@ class LogValidatorTest { 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ).validatedRecords, offset) @@ -1334,7 +1334,7 @@ class LogValidatorTest { 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier )) @@ -1398,7 +1398,7 @@ class LogValidatorTest { 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ) @@ -1477,7 +1477,7 @@ class LogValidatorTest { timestampAfterMaxConfig, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ) @@ -1511,7 +1511,7 @@ class LogValidatorTest { timestampAfterMaxConfig, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ) @@ -1544,7 +1544,7 @@ class LogValidatorTest { 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, - MetadataVersion.latest + MetadataVersion.latestTesting ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier )) diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 6b0f1daeca6..c9c9ca0ffbb 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -79,7 +79,7 @@ class SocketServerTest { TestUtils.clearYammerMetrics() private val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, false, - () => new Features(MetadataVersion.latest(), Collections.emptyMap[String, java.lang.Short], 0, true)) + () => new Features(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true)) val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager) val sockets = new ArrayBuffer[Socket] diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala index aab892b3346..73596ef94e0 100644 --- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala @@ -137,7 +137,7 @@ class ZkAuthorizationTest extends QuorumTestHarness with Logging { private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol, rack: Option[String] = None): BrokerInfo = BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol - (securityProtocol), securityProtocol)), rack = rack), MetadataVersion.latest, jmxPort = port + 10) + (securityProtocol), securityProtocol)), rack = rack), MetadataVersion.latestTesting, jmxPort = port + 10) private def newKafkaZkClient(connectionString: String, isSecure: Boolean) = KafkaZkClient(connectionString, isSecure, 6000, 6000, Int.MaxValue, Time.SYSTEM, "ZkAuthorizationTest", diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala index 5b9e0aafee2..376cd40c694 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala @@ -79,12 +79,12 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { ): Unit = { if (cluster.isKRaftTest && apiVersion >= 3) { assertEquals(1, apiVersionsResponse.data().finalizedFeatures().size()) - assertEquals(MetadataVersion.latest().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel()) - assertEquals(MetadataVersion.latest().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel()) + assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel()) + assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel()) assertEquals(1, apiVersionsResponse.data().supportedFeatures().size()) assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion()) - assertEquals(MetadataVersion.latest().featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersion()) + assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersion()) } val expectedApis = if (!cluster.isKRaftTest) { ApiVersionsResponse.collectApis( diff --git a/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala index 3e4ac4b6771..b20478cf6f2 100644 --- a/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala @@ -48,7 +48,7 @@ import scala.jdk.CollectionConverters._ */ class AlterUserScramCredentialsRequestTest extends BaseRequestTest { - protected var testMetadataVersion = MetadataVersion.latest + protected var testMetadataVersion = MetadataVersion.latestTesting override protected def metadataVersion = testMetadataVersion @BeforeEach diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala index 6c9c3a70ca6..eeaaae1940c 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala @@ -30,8 +30,8 @@ import org.mockito.Mockito import scala.jdk.CollectionConverters._ class ApiVersionManagerTest { - private val brokerFeatures = BrokerFeatures.createDefault() - private val metadataCache = new ZkMetadataCache(1, MetadataVersion.latest(), brokerFeatures) + private val brokerFeatures = BrokerFeatures.createDefault(true) + private val metadataCache = new ZkMetadataCache(1, MetadataVersion.latestTesting(), brokerFeatures) @ParameterizedTest @EnumSource(classOf[ListenerType]) diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala index d041d68a843..c88993dc82e 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala @@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.ApiVersionsRequest import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.extension.ExtendWith @@ -37,14 +38,20 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio super.brokerPropertyOverrides(config.serverProperties()) } - @ClusterTest + @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), + )) def testApiVersionsRequest(): Unit = { val request = new ApiVersionsRequest.Builder().build() val apiVersionsResponse = sendApiVersionsRequest(request, cluster.clientListener()) validateApiVersionsResponse(apiVersionsResponse) } - @ClusterTest(serverProperties = Array(new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"))) + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), + )) def testApiVersionsRequestIncludesUnreleasedApis(): Unit = { val request = new ApiVersionsRequest.Builder().build() val apiVersionsResponse = sendApiVersionsRequest(request, cluster.clientListener()) @@ -77,7 +84,10 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion()) } - @ClusterTest + @ClusterTest(metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"), + )) def testApiVersionsRequestValidationV0(): Unit = { val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short]) val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.clientListener()) diff --git a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala index ad8786c919d..78432b71105 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala @@ -33,7 +33,7 @@ class BrokerFeaturesTest { @Test def testIncompatibilitiesDueToAbsentFeature(): Unit = { - val brokerFeatures = BrokerFeatures.createDefault() + val brokerFeatures = BrokerFeatures.createDefault(true) val supportedFeatures = Features.supportedFeatures(Map[String, SupportedVersionRange]( "test_feature_1" -> new SupportedVersionRange(1, 4), "test_feature_2" -> new SupportedVersionRange(1, 3)).asJava) @@ -51,7 +51,7 @@ class BrokerFeaturesTest { @Test def testIncompatibilitiesDueToIncompatibleFeature(): Unit = { - val brokerFeatures = BrokerFeatures.createDefault() + val brokerFeatures = BrokerFeatures.createDefault(true) val supportedFeatures = Features.supportedFeatures(Map[String, SupportedVersionRange]( "test_feature_1" -> new SupportedVersionRange(1, 4), "test_feature_2" -> new SupportedVersionRange(1, 3)).asJava) @@ -70,7 +70,7 @@ class BrokerFeaturesTest { @Test def testCompatibleFeatures(): Unit = { - val brokerFeatures = BrokerFeatures.createDefault() + val brokerFeatures = BrokerFeatures.createDefault(true) val supportedFeatures = Features.supportedFeatures(Map[String, SupportedVersionRange]( "test_feature_1" -> new SupportedVersionRange(1, 4), "test_feature_2" -> new SupportedVersionRange(1, 3)).asJava) @@ -86,7 +86,7 @@ class BrokerFeaturesTest { @Test def testDefaultFinalizedFeatures(): Unit = { - val brokerFeatures = BrokerFeatures.createDefault() + val brokerFeatures = BrokerFeatures.createDefault(true) val supportedFeatures = Features.supportedFeatures(Map[String, SupportedVersionRange]( "test_feature_1" -> new SupportedVersionRange(1, 4), "test_feature_2" -> new SupportedVersionRange(1, 3), @@ -94,7 +94,7 @@ class BrokerFeaturesTest { brokerFeatures.setSupportedFeatures(supportedFeatures) val expectedFeatures = Map[String, Short]( - MetadataVersion.FEATURE_NAME -> MetadataVersion.latest().featureLevel(), + MetadataVersion.FEATURE_NAME -> MetadataVersion.latestTesting().featureLevel(), "test_feature_1" -> 4, "test_feature_2" -> 3, "test_feature_3" -> 7) diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index 66059238e5c..3ef7344ee91 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -166,7 +166,7 @@ class ControllerApisTest { ListenerType.CONTROLLER, true, false, - () => Features.fromKRaftVersion(MetadataVersion.latest())), + () => Features.fromKRaftVersion(MetadataVersion.latestTesting())), metadataCache ) } diff --git a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala b/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala index b94bfb16c7e..98f6fa3bc8e 100644 --- a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala @@ -29,7 +29,7 @@ class FinalizedFeatureCacheTest { @Test def testEmpty(): Unit = { - assertTrue(new ZkMetadataCache(1, MetadataVersion.IBP_2_8_IV1, BrokerFeatures.createDefault()).getFeatureOption.isEmpty) + assertTrue(new ZkMetadataCache(1, MetadataVersion.IBP_2_8_IV1, BrokerFeatures.createDefault(true)).getFeatureOption.isEmpty) } def asJava(input: Map[String, Short]): java.util.Map[String, java.lang.Short] = { @@ -40,7 +40,7 @@ class FinalizedFeatureCacheTest { def testUpdateOrThrowFailedDueToInvalidEpoch(): Unit = { val supportedFeatures = Map[String, SupportedVersionRange]( "feature_1" -> new SupportedVersionRange(1, 4)) - val brokerFeatures = BrokerFeatures.createDefault() + val brokerFeatures = BrokerFeatures.createDefault(true) brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava)) val finalizedFeatures = Map[String, Short]("feature_1" -> 4) @@ -63,7 +63,7 @@ class FinalizedFeatureCacheTest { def testUpdateOrThrowFailedDueToInvalidFeatures(): Unit = { val supportedFeatures = Map[String, SupportedVersionRange]("feature_1" -> new SupportedVersionRange(1, 1)) - val brokerFeatures = BrokerFeatures.createDefault() + val brokerFeatures = BrokerFeatures.createDefault(true) brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava)) val finalizedFeatures = Map[String, Short]("feature_1" -> 2) @@ -79,7 +79,7 @@ class FinalizedFeatureCacheTest { def testUpdateOrThrowSuccess(): Unit = { val supportedFeatures = Map[String, SupportedVersionRange]("feature_1" -> new SupportedVersionRange(1, 4)) - val brokerFeatures = BrokerFeatures.createDefault() + val brokerFeatures = BrokerFeatures.createDefault(true) brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava)) val finalizedFeatures = Map[String, Short]("feature_1" -> 3) @@ -95,7 +95,7 @@ class FinalizedFeatureCacheTest { def testClear(): Unit = { val supportedFeatures = Map[String, SupportedVersionRange]("feature_1" -> new SupportedVersionRange(1, 4)) - val brokerFeatures = BrokerFeatures.createDefault() + val brokerFeatures = BrokerFeatures.createDefault(true) brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava)) val finalizedFeatures = Map[String, Short]("feature_1" -> 3) diff --git a/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala b/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala index b32a45210ff..6441f825b57 100644 --- a/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala +++ b/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala @@ -48,7 +48,7 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness { val supportedFeaturesMap = Map[String, SupportedVersionRange]( "feature_1" -> new SupportedVersionRange(1, 4), "feature_2" -> new SupportedVersionRange(1, 3)) - val brokerFeatures = BrokerFeatures.createDefault() + val brokerFeatures = BrokerFeatures.createDefault(true) brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeaturesMap.asJava)) brokerFeatures } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 81852f73d0a..218480003b4 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -124,7 +124,7 @@ class KafkaApisTest extends Logging { private val metrics = new Metrics() private val brokerId = 1 // KRaft tests should override this with a KRaftMetadataCache - private var metadataCache: MetadataCache = MetadataCache.zkMetadataCache(brokerId, MetadataVersion.latest()) + private var metadataCache: MetadataCache = MetadataCache.zkMetadataCache(brokerId, MetadataVersion.latestTesting()) private val brokerEpochManager: ZkBrokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) private val clientQuotaManager: ClientQuotaManager = mock(classOf[ClientQuotaManager]) private val clientRequestQuotaManager: ClientRequestQuotaManager = mock(classOf[ClientRequestQuotaManager]) @@ -149,7 +149,7 @@ class KafkaApisTest extends Logging { metrics.close() } - def createKafkaApis(interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest, + def createKafkaApis(interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latestTesting, authorizer: Option[Authorizer] = None, enableForwarding: Boolean = false, configRepository: ConfigRepository = new MockConfigRepository(), @@ -199,10 +199,10 @@ class KafkaApisTest extends Logging { val apiVersionManager = new SimpleApiVersionManager( listenerType, enabledApis, - BrokerFeatures.defaultSupportedFeatures(), + BrokerFeatures.defaultSupportedFeatures(true), true, false, - () => new Features(MetadataVersion.latest(), Collections.emptyMap[String, java.lang.Short], 0, raftSupport)) + () => new Features(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, raftSupport)) val clientMetricsManagerOpt = if (raftSupport) Some(clientMetricsManager) else None diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index dd2419a5636..e9426245b14 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -595,7 +595,7 @@ class KafkaConfigTest { props.setProperty(KafkaConfig.BrokerIdProp, "1") props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181") val conf = KafkaConfig.fromProps(props) - assertEquals(MetadataVersion.latest, conf.interBrokerProtocolVersion) + assertEquals(MetadataVersion.latestProduction, conf.interBrokerProtocolVersion) props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.0") // We need to set the message format version to make the configuration valid. @@ -611,7 +611,7 @@ class KafkaConfigTest { assertEquals(IBP_0_8_2, conf3.interBrokerProtocolVersion) //check that latest is newer than 0.8.2 - assertTrue(MetadataVersion.latest.isAtLeast(conf3.interBrokerProtocolVersion)) + assertTrue(MetadataVersion.latestTesting.isAtLeast(conf3.interBrokerProtocolVersion)) } private def isValidKafkaConfig(props: Properties): Boolean = { diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala index e9bdb1b7336..d77271a0c81 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala @@ -83,7 +83,7 @@ class KafkaRaftServerTest { private def invokeLoadMetaProperties( metaProperties: MetaProperties, configProperties: Properties, - metadataVersion: Option[MetadataVersion] = Some(MetadataVersion.latest()) + metadataVersion: Option[MetadataVersion] = Some(MetadataVersion.latestTesting()) ): (MetaPropertiesEnsemble, BootstrapMetadata) = { val tempLogDir = TestUtils.tempDirectory() try { @@ -178,7 +178,7 @@ class KafkaRaftServerTest { setNodeId(nodeId). build()) - writeBootstrapMetadata(validDir, MetadataVersion.latest()) + writeBootstrapMetadata(validDir, MetadataVersion.latestTesting()) // Use a regular file as an invalid log dir to trigger an IO error val invalidDir = TestUtils.tempFile("blah") @@ -314,6 +314,6 @@ class KafkaRaftServerTest { assertEquals(metaProperties, metaPropertiesEnsemble.logDirProps().values().iterator().next()) assertTrue(metaPropertiesEnsemble.errorLogDirs().isEmpty()) assertTrue(metaPropertiesEnsemble.emptyLogDirs().isEmpty()) - assertEquals(bootstrapMetadata.metadataVersion(), MetadataVersion.latest()) + assertEquals(bootstrapMetadata.metadataVersion(), MetadataVersion.latestProduction()) } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala index a71120bdf53..1372690afeb 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala @@ -127,7 +127,7 @@ class KafkaServerTest extends QuorumTestHarness { @Test def testAlterIsrManager(): Unit = { val props = TestUtils.createBrokerConfigs(1, zkConnect).head - props.put(KafkaConfig.InterBrokerProtocolVersionProp, MetadataVersion.latest.toString) + props.put(KafkaConfig.InterBrokerProtocolVersionProp, MetadataVersion.latestTesting.toString) val server = TestUtils.createServer(KafkaConfig.fromProps(props)) server.replicaManager.alterPartitionManager match { diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 91d5d601169..87554990415 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -44,12 +44,12 @@ import scala.jdk.CollectionConverters._ object MetadataCacheTest { def zkCacheProvider(): util.stream.Stream[MetadataCache] = util.stream.Stream.of[MetadataCache]( - MetadataCache.zkMetadataCache(1, MetadataVersion.latest()) + MetadataCache.zkMetadataCache(1, MetadataVersion.latestTesting()) ) def cacheProvider(): util.stream.Stream[MetadataCache] = util.stream.Stream.of[MetadataCache]( - MetadataCache.zkMetadataCache(1, MetadataVersion.latest()), + MetadataCache.zkMetadataCache(1, MetadataVersion.latestTesting()), MetadataCache.kRaftMetadataCache(1) ) @@ -954,7 +954,7 @@ class MetadataCacheTest { )( verifier: ZkMetadataCache => Unit ): Unit = { - val cache = MetadataCache.zkMetadataCache(1, MetadataVersion.latest(), zkMigrationEnabled = zkMigrationEnabled) + val cache = MetadataCache.zkMetadataCache(1, MetadataVersion.latestTesting(), zkMigrationEnabled = zkMigrationEnabled) cache.updateMetadata(1, new UpdateMetadataRequest.Builder(8, 1, 42, brokerEpoch, initialTopicStates.flatMap(_._2.values).toList.asJava, Seq.empty.asJava, initialTopicIds.asJava).build()) cache.updateMetadata(1, updateMetadataRequest) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index d5f61c93737..2118e9fffb9 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -62,7 +62,7 @@ class ReplicaAlterLogDirsThreadTest { private val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(), 0, 0, 0, partitionStates, Collections.emptyList(), topicIds.asJava).build() // TODO: support raft code? - private val metadataCache = new ZkMetadataCache(0, MetadataVersion.latest(), BrokerFeatures.createEmpty()) + private val metadataCache = new ZkMetadataCache(0, MetadataVersion.latestTesting(), BrokerFeatures.createEmpty()) metadataCache.updateMetadata(0, updateMetadataRequest) private def initialFetchState(fetchOffset: Long, leaderEpoch: Int = 1): InitialFetchState = { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index 053583fd889..1369ac85a35 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -83,7 +83,7 @@ class ReplicaFetcherThreadTest { private val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(), 0, 0, 0, partitionStates, Collections.emptyList(), topicIds.asJava).build() // TODO: support raft code? - private var metadataCache = new ZkMetadataCache(0, MetadataVersion.latest(), BrokerFeatures.createEmpty()) + private var metadataCache = new ZkMetadataCache(0, MetadataVersion.latestTesting(), BrokerFeatures.createEmpty()) metadataCache.updateMetadata(0, updateMetadataRequest) private def initialFetchState(topicId: Option[Uuid], fetchOffset: Long, leaderEpoch: Int = 1): InitialFetchState = { @@ -277,7 +277,7 @@ class ReplicaFetcherThreadTest { @Test def shouldNotFetchLeaderEpochOnFirstFetchWithTruncateOnFetch(): Unit = { - verifyFetchLeaderEpochOnFirstFetch(MetadataVersion.latest, epochFetchCount = 0) + verifyFetchLeaderEpochOnFirstFetch(MetadataVersion.latestTesting, epochFetchCount = 0) } private def verifyFetchLeaderEpochOnFirstFetch(ibp: MetadataVersion, epochFetchCount: Int = 1): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala index ba2d1fde5c2..f31408187b6 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala @@ -162,7 +162,7 @@ class ReplicaManagerConcurrencyTest extends Logging { setClusterId(Uuid.randomUuid().toString). setNodeId(1). build() - TestUtils.formatDirectories(immutable.Seq(logDir.getAbsolutePath), metaProperties, MetadataVersion.latest(), None) + TestUtils.formatDirectories(immutable.Seq(logDir.getAbsolutePath), metaProperties, MetadataVersion.latestTesting(), None) val props = new Properties props.put(KafkaConfig.QuorumVotersProp, "100@localhost:12345") diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 7ef92a97371..6e8a61239d7 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -6175,7 +6175,7 @@ class ReplicaManagerTest { private def imageFromTopics(topicsImage: TopicsImage): MetadataImage = { val featuresImageLatest = new FeaturesImage( Collections.emptyMap(), - MetadataVersion.latest(), + MetadataVersion.latestTesting(), ZkMigrationState.NONE) new MetadataImage( new MetadataProvenance(100L, 10, 1000L), diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala index e24f348ad68..4589aa2fc2d 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala @@ -49,7 +49,7 @@ import scala.jdk.CollectionConverters._ */ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness with Logging { // Set this to IBP_0_11_0_IV1 to demonstrate the tests failing in the pre-KIP-101 case - override def metadataVersion = MetadataVersion.latest + override def metadataVersion = MetadataVersion.latestTesting val topic = "topic1" val msg = new Array[Byte](1000) val msgBigger = new Array[Byte](10000) diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index db4482e8ddb..c3cdce65cbe 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -173,13 +173,13 @@ Found problem: setNodeId(2). build() val stream = new ByteArrayOutputStream() - val bootstrapMetadata = StorageTool.buildBootstrapMetadata(MetadataVersion.latest(), None, "test format command") + val bootstrapMetadata = StorageTool.buildBootstrapMetadata(MetadataVersion.latestTesting(), None, "test format command") assertEquals(0, StorageTool. - formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latest(), ignoreFormatted = false)) + formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = false)) assertTrue(stream.toString().startsWith("Formatting %s".format(tempDir))) try assertEquals(1, StorageTool. - formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latest(), ignoreFormatted = false)) catch { + formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = false)) catch { case e: TerseFailure => assertEquals(s"Log directory ${tempDir} is already " + "formatted. Use --ignore-formatted to ignore this directory and format the " + "others.", e.getMessage) @@ -187,7 +187,7 @@ Found problem: val stream2 = new ByteArrayOutputStream() assertEquals(0, StorageTool. - formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latest(), ignoreFormatted = true)) + formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = true)) assertEquals("All of the log directories are already formatted.%n".format(), stream2.toString()) } finally Utils.delete(tempDir) } @@ -379,9 +379,9 @@ Found problem: setNodeId(2). build() val bootstrapMetadata = StorageTool. - buildBootstrapMetadata(MetadataVersion.latest(), None, "test format command") + buildBootstrapMetadata(MetadataVersion.latestTesting(), None, "test format command") assertEquals(0, StorageTool. - formatCommand(new PrintStream(NullOutputStream.NULL_OUTPUT_STREAM), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latest(), ignoreFormatted = false)) + formatCommand(new PrintStream(NullOutputStream.NULL_OUTPUT_STREAM), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = false)) val metaPropertiesFile = Paths.get(tempDir.toURI).resolve(MetaPropertiesEnsemble.META_PROPERTIES_NAME).toFile assertTrue(metaPropertiesFile.exists()) @@ -416,7 +416,7 @@ Found problem: } val args = Array("format", "-c", s"${propsFile.toPath}", "-t", "XcZZOzUqS4yHOjhMQB6JLQ", - "--release-version", MetadataVersion.latest().toString) + "--release-version", MetadataVersion.latestTesting().toString) try { StorageTool.main(args) } catch { @@ -428,7 +428,7 @@ Found problem: assertEquals("", exitString) assertEquals(0, exitStatus) } else { - assertEquals(s"Metadata version ${MetadataVersion.latest().toString} is not ready for " + + assertEquals(s"Metadata version ${MetadataVersion.latestTesting().toString} is not ready for " + "production use yet.", exitString) assertEquals(1, exitStatus) } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 83a09e75a81..7076e8384e5 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -917,7 +917,7 @@ object TestUtils extends Logging { Broker(b.id, Seq(EndPoint("localhost", 6667, listenerName, protocol)), if (b.rack.isPresent) Some(b.rack.get()) else None) } brokers.foreach(b => zkClient.registerBroker(BrokerInfo(Broker(b.id, b.endPoints, rack = b.rack), - MetadataVersion.latest, jmxPort = -1))) + MetadataVersion.latestTesting, jmxPort = -1))) brokers } @@ -1443,7 +1443,7 @@ object TestUtils extends Logging { configRepository: ConfigRepository = new MockConfigRepository, cleanerConfig: CleanerConfig = new CleanerConfig(false), time: MockTime = new MockTime(), - interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest, + interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latestTesting, recoveryThreadsPerDataDir: Int = 4, transactionVerificationEnabled: Boolean = false, log: Option[UnifiedLog] = None, diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index b5f6e5b65fa..131b46d5671 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -812,7 +812,7 @@ class KafkaZkClientTest extends QuorumTestHarness { Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)), rack = rack, features = features), - MetadataVersion.latest, jmxPort = port + 10) + MetadataVersion.latestTesting, jmxPort = port + 10) @Test def testRegisterBrokerInfo(): Unit = { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 9275e8775fc..82bb4c52341 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -637,7 +637,7 @@ public class GroupMetadataManagerTest { .build()); assertEquals( - Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())), + Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())), syncResult.records ); // Simulate a successful write to the log. @@ -841,7 +841,7 @@ public class GroupMetadataManagerTest { )); assertEquals( Collections.singletonList( - RecordHelpers.newGroupMetadataRecord(group, groupAssignment, MetadataVersion.latest())), + RecordHelpers.newGroupMetadataRecord(group, groupAssignment, MetadataVersion.latestTesting())), leaderSyncResult.records ); @@ -901,7 +901,7 @@ public class GroupMetadataManagerTest { // Now the group is stable, with the one member that joined above assertEquals( - Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())), + Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())), syncResult.records ); // Simulate a successful write to log. @@ -939,7 +939,7 @@ public class GroupMetadataManagerTest { syncResult = sendClassicGroupSync(syncRequest.setGenerationId(nextGenerationId)); assertEquals( - Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())), + Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())), syncResult.records ); // Simulate a successful write to log. @@ -1003,7 +1003,7 @@ public class GroupMetadataManagerTest { .setProtocolType("consumer") .setProtocol(null) .setCurrentStateTimestamp(time.milliseconds()), - MetadataVersion.latest())); + MetadataVersion.latestTesting())); Set heartbeatKeys = timeouts.stream().map(timeout -> timeout.key).collect(Collectors.toSet()); @@ -4709,7 +4709,7 @@ public class GroupMetadataManagerTest { ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup("group-id", false); assertEquals( - Collections.singletonList(RecordHelpers.newEmptyGroupMetadataRecord(group, MetadataVersion.latest())), + Collections.singletonList(RecordHelpers.newEmptyGroupMetadataRecord(group, MetadataVersion.latestTesting())), joinResult.records ); } @@ -4783,7 +4783,7 @@ public class GroupMetadataManagerTest { .setProtocolType("consumer") .setProtocol("range") .setCurrentStateTimestamp(context.time.milliseconds()), - MetadataVersion.latest()); + MetadataVersion.latestTesting()); context.replay(groupMetadataRecord); ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup("group-id", false); @@ -4847,7 +4847,7 @@ public class GroupMetadataManagerTest { .setProtocolType("consumer") .setProtocol("range") .setCurrentStateTimestamp(context.time.milliseconds()), - MetadataVersion.latest()); + MetadataVersion.latestTesting()); context.replay(groupMetadataRecord); context.groupMetadataManager.onLoaded(); @@ -4887,7 +4887,7 @@ public class GroupMetadataManagerTest { .setProtocolType("consumer") .setProtocol("range") .setCurrentStateTimestamp(context.time.milliseconds()), - MetadataVersion.latest()); + MetadataVersion.latestTesting()); context.replay(groupMetadataRecord); context.groupMetadataManager.onLoaded(); @@ -5826,7 +5826,7 @@ public class GroupMetadataManagerTest { timeouts.forEach(timeout -> { assertEquals(classicGroupHeartbeatKey("group-id", memberId), timeout.key); assertEquals(Collections.singletonList( - newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())), + newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())), timeout.result.records()); }); @@ -6305,7 +6305,7 @@ public class GroupMetadataManagerTest { ); assertEquals( - Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())), + Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())), joinResult.records ); assertFalse(joinResult.joinFuture.isDone()); @@ -6432,7 +6432,7 @@ public class GroupMetadataManagerTest { ); assertEquals( - Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())), + Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())), joinResult.records ); assertFalse(joinResult.joinFuture.isDone()); @@ -6508,7 +6508,7 @@ public class GroupMetadataManagerTest { supportSkippingAssignment); assertEquals( - Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())), + Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())), joinResult.records ); assertFalse(joinResult.joinFuture.isDone()); @@ -6646,7 +6646,7 @@ public class GroupMetadataManagerTest { .setProtocolType("consumer") .setProtocol(null) .setCurrentStateTimestamp(context.time.milliseconds()), - MetadataVersion.latest()) + MetadataVersion.latestTesting()) ); assertEquals(1, timeouts.size()); @@ -7076,7 +7076,7 @@ public class GroupMetadataManagerTest { ); assertEquals( - Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())), + Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())), joinResult.records ); // Simulate a successful write to the log. @@ -7389,7 +7389,7 @@ public class GroupMetadataManagerTest { ); assertEquals( - Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())), + Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())), followerJoinResult.records ); // Simulate a failed write to the log. @@ -7446,7 +7446,7 @@ public class GroupMetadataManagerTest { leaderSyncResult.appendFuture.complete(null); assertEquals( - Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())), + Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())), leaderSyncResult.records ); @@ -7496,7 +7496,7 @@ public class GroupMetadataManagerTest { ); assertEquals( - Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())), + Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())), followerJoinResult.records ); @@ -7708,7 +7708,7 @@ public class GroupMetadataManagerTest { ); assertEquals( - Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())), + Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())), followerJoinResult.records ); // Simulate a successful write to log. @@ -7915,7 +7915,7 @@ public class GroupMetadataManagerTest { ); assertEquals( - Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())), + Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())), leaderJoinResult.records ); // Simulate a successful write to log. @@ -8696,7 +8696,7 @@ public class GroupMetadataManagerTest { assertEquals( Collections.singletonList( - RecordHelpers.newGroupMetadataRecord(group, updatedAssignment, MetadataVersion.latest())), + RecordHelpers.newGroupMetadataRecord(group, updatedAssignment, MetadataVersion.latestTesting())), syncResult.records ); @@ -9357,7 +9357,7 @@ public class GroupMetadataManagerTest { ExpiredTimeout timeout = timeouts.get(0); assertEquals(classicGroupSyncKey("group-id"), timeout.key); assertEquals( - Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())), + Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())), timeout.result.records() ); @@ -9513,7 +9513,7 @@ public class GroupMetadataManagerTest { if (response.memberId().equals(leaderId)) { assertEquals( - Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())), + Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())), syncResult.records ); @@ -9607,7 +9607,7 @@ public class GroupMetadataManagerTest { .setProtocolType(classicGroupType) .setProtocol("range") .setCurrentStateTimestamp(context.time.milliseconds()), - MetadataVersion.latest())); + MetadataVersion.latestTesting())); context.commit(); ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false); context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, new ConsumerGroupMember.Builder(memberId1) @@ -9794,7 +9794,7 @@ public class GroupMetadataManagerTest { context.replay(newGroupMetadataRecord( "group-id", groupMetadataValue, - MetadataVersion.latest() + MetadataVersion.latestTesting() )); context.verifyDescribeGroupsReturnsDeadGroup("group-id"); context.commit(); @@ -9843,7 +9843,7 @@ public class GroupMetadataManagerTest { context.replay(newGroupMetadataRecord( "group-id", groupMetadataValue, - MetadataVersion.latest() + MetadataVersion.latestTesting() )); ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup("group-id", false); context.groupMetadataManager.prepareRebalance(group, "trigger rebalance"); @@ -10264,7 +10264,7 @@ public class GroupMetadataManagerTest { )) ); assertEquals( - Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())), + Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())), leaveResult.records() ); // Simulate a successful write to the log. diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index 8731cca0862..1d2c269f24c 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -380,7 +380,7 @@ public class OffsetMetadataManagerTest { commitTimestamp, OptionalLong.empty() ), - MetadataVersion.latest() + MetadataVersion.latestTesting() )); } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index 7a91a611a1a..b18bfb14979 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -149,7 +149,7 @@ public class ReplicaFetcherThreadBenchmark { setFlushStartOffsetCheckpointMs(10000L). setRetentionCheckMs(1000L). setProducerStateManagerConfig(60000, false). - setInterBrokerProtocolVersion(MetadataVersion.latest()). + setInterBrokerProtocolVersion(MetadataVersion.latestTesting()). setScheduler(scheduler). setBrokerTopicStats(brokerTopicStats). setLogDirFailureChannel(logDirFailureChannel). @@ -178,7 +178,7 @@ public class ReplicaFetcherThreadBenchmark { OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class); Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L)); AlterPartitionManager isrChannelManager = Mockito.mock(AlterPartitionManager.class); - Partition partition = new Partition(tp, 100, MetadataVersion.latest(), + Partition partition = new Partition(tp, 100, MetadataVersion.latestTesting(), 0, () -> -1, Time.SYSTEM, alterPartitionListener, new DelayedOperationsMock(tp), Mockito.mock(MetadataCache.class), logManager, isrChannelManager); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java index b3ab13fcb43..6be5bcbd3a8 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java @@ -203,7 +203,7 @@ public class KRaftMetadataRequestBenchmark { ApiMessageType.ListenerType.BROKER, false, false, - () -> Features.fromKRaftVersion(MetadataVersion.latest()))). + () -> Features.fromKRaftVersion(MetadataVersion.latestTesting()))). build(); } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java index 2cf574a1403..189dcb3d559 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java @@ -113,7 +113,7 @@ public class MetadataRequestBenchmark { private Metrics metrics = new Metrics(); private int brokerId = 1; private ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId, - MetadataVersion.latest(), BrokerFeatures.createEmpty(), null, false); + MetadataVersion.latestTesting(), BrokerFeatures.createEmpty(), null, false); private ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class); private ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class); private ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class); @@ -204,7 +204,7 @@ public class MetadataRequestBenchmark { ApiMessageType.ListenerType.ZK_BROKER, false, false, - () -> Features.fromKRaftVersion(MetadataVersion.latest()))). + () -> Features.fromKRaftVersion(MetadataVersion.latestTesting()))). build(); } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java index b9f091a9a7c..77585f161b1 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java @@ -108,7 +108,7 @@ public class PartitionMakeFollowerBenchmark { setFlushStartOffsetCheckpointMs(10000L). setRetentionCheckMs(1000L). setProducerStateManagerConfig(60000, false). - setInterBrokerProtocolVersion(MetadataVersion.latest()). + setInterBrokerProtocolVersion(MetadataVersion.latestTesting()). setScheduler(scheduler). setBrokerTopicStats(brokerTopicStats). setLogDirFailureChannel(logDirFailureChannel). @@ -122,7 +122,7 @@ public class PartitionMakeFollowerBenchmark { AlterPartitionListener alterPartitionListener = Mockito.mock(AlterPartitionListener.class); AlterPartitionManager alterPartitionManager = Mockito.mock(AlterPartitionManager.class); partition = new Partition(tp, 100, - MetadataVersion.latest(), 0, () -> -1, Time.SYSTEM, + MetadataVersion.latestTesting(), 0, () -> -1, Time.SYSTEM, alterPartitionListener, delayedOperations, Mockito.mock(MetadataCache.class), logManager, alterPartitionManager); partition.createLogIfNotExists(true, false, offsetCheckpoints, topicId, Option.empty()); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java index 48befe4be4d..010602c9f0c 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java @@ -98,7 +98,7 @@ public class UpdateFollowerFetchStateBenchmark { setFlushStartOffsetCheckpointMs(10000L). setRetentionCheckMs(1000L). setProducerStateManagerConfig(60000, false). - setInterBrokerProtocolVersion(MetadataVersion.latest()). + setInterBrokerProtocolVersion(MetadataVersion.latestTesting()). setScheduler(scheduler). setBrokerTopicStats(brokerTopicStats). setLogDirFailureChannel(logDirFailureChannel). @@ -125,7 +125,7 @@ public class UpdateFollowerFetchStateBenchmark { AlterPartitionListener alterPartitionListener = Mockito.mock(AlterPartitionListener.class); AlterPartitionManager alterPartitionManager = Mockito.mock(AlterPartitionManager.class); partition = new Partition(topicPartition, 100, - MetadataVersion.latest(), 0, () -> -1, Time.SYSTEM, + MetadataVersion.latestTesting(), 0, () -> -1, Time.SYSTEM, alterPartitionListener, delayedOperations, Mockito.mock(MetadataCache.class), logManager, alterPartitionManager); partition.makeLeader(partitionState, offsetCheckpoints, topicId, Option.empty()); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java index e96ec08e1ab..7cd8719bf59 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java @@ -54,7 +54,7 @@ public class CompressedRecordBatchValidationBenchmark extends BaseRecordBatchBen new LogValidator(records, new TopicPartition("a", 0), Time.SYSTEM, compressionType, compressionType, false, messageVersion, TimestampType.CREATE_TIME, Long.MAX_VALUE, Long.MAX_VALUE, 0, AppendOrigin.CLIENT, - MetadataVersion.latest() + MetadataVersion.latestTesting() ).validateMessagesAndAssignOffsetsCompressed(PrimitiveRef.ofLong(startingOffset), validatorMetricsRecorder, requestLocal.bufferSupplier()); } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/UncompressedRecordBatchValidationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/UncompressedRecordBatchValidationBenchmark.java index fda12efa6c4..18e73eae2e1 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/UncompressedRecordBatchValidationBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/UncompressedRecordBatchValidationBenchmark.java @@ -50,7 +50,7 @@ public class UncompressedRecordBatchValidationBenchmark extends BaseRecordBatchB new LogValidator(records, new TopicPartition("a", 0), Time.SYSTEM, CompressionType.NONE, CompressionType.NONE, false, messageVersion, TimestampType.CREATE_TIME, Long.MAX_VALUE, Long.MAX_VALUE, 0, AppendOrigin.CLIENT, - MetadataVersion.latest() + MetadataVersion.latestTesting() ).assignOffsetsNonCompressed(PrimitiveRef.ofLong(startingOffset), validatorMetricsRecorder); } } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java index 0456ecda142..bb98f8c5be3 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java @@ -109,7 +109,7 @@ public class CheckpointBench { JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList()); this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files), new LogConfig(new Properties()), new MockConfigRepository(), new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d, - 1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latest(), 4, false, Option.empty(), false); + 1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latestTesting(), 4, false, Option.empty(), false); scheduler.startup(); final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty()); final MetadataCache metadataCache = diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java index 8972e44b42e..c40939235ff 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java @@ -135,7 +135,7 @@ public class PartitionCreationBench { setFlushStartOffsetCheckpointMs(10000L). setRetentionCheckMs(1000L). setProducerStateManagerConfig(60000, false). - setInterBrokerProtocolVersion(MetadataVersion.latest()). + setInterBrokerProtocolVersion(MetadataVersion.latestTesting()). setScheduler(scheduler). setBrokerTopicStats(brokerTopicStats). setLogDirFailureChannel(failureChannel). diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java index 549d99ca425..d5911d4f54f 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java @@ -54,7 +54,7 @@ public class FeatureControlManager { private LogContext logContext = null; private SnapshotRegistry snapshotRegistry = null; private QuorumFeatures quorumFeatures = null; - private MetadataVersion metadataVersion = MetadataVersion.latest(); + private MetadataVersion metadataVersion = MetadataVersion.latestProduction(); private MetadataVersion minimumBootstrapVersion = MetadataVersion.MINIMUM_BOOTSTRAP_VERSION; private ClusterFeatureSupportDescriber clusterSupportDescriber = new ClusterFeatureSupportDescriber() { @Override @@ -105,7 +105,7 @@ public class FeatureControlManager { Map localSupportedFeatures = new HashMap<>(); localSupportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), - MetadataVersion.latest().featureLevel())); + MetadataVersion.latestProduction().featureLevel())); quorumFeatures = new QuorumFeatures(0, localSupportedFeatures, Collections.singletonList(0)); } return new FeatureControlManager( diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java index aadb27e937e..9b0355c0db7 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java @@ -59,8 +59,8 @@ public final class QuorumFeatures { features.put(MetadataVersion.FEATURE_NAME, VersionRange.of( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), enableUnstable ? - MetadataVersion.latest().featureLevel() : - MetadataVersion.LATEST_PRODUCTION.featureLevel())); + MetadataVersion.latestTesting().featureLevel() : + MetadataVersion.latestProduction().featureLevel())); return features; } diff --git a/metadata/src/main/java/org/apache/kafka/image/writer/ImageWriterOptions.java b/metadata/src/main/java/org/apache/kafka/image/writer/ImageWriterOptions.java index a9560d22728..b89b93a0155 100644 --- a/metadata/src/main/java/org/apache/kafka/image/writer/ImageWriterOptions.java +++ b/metadata/src/main/java/org/apache/kafka/image/writer/ImageWriterOptions.java @@ -35,7 +35,7 @@ public final class ImageWriterOptions { }; public Builder() { - this.metadataVersion = MetadataVersion.latest(); + this.metadataVersion = MetadataVersion.latestProduction(); } public Builder(MetadataImage image) { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/ControllerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/ControllerRegistration.java index cd2f43cc3e5..37246c3f252 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/ControllerRegistration.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/ControllerRegistration.java @@ -112,7 +112,7 @@ public class ControllerRegistration { supportedFeatures = new HashMap<>(); supportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), - MetadataVersion.latest().featureLevel())); + MetadataVersion.latestProduction().featureLevel())); } return new ControllerRegistration(id, incarnationId, diff --git a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java index a7cd382dc16..b238a3d95e5 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java @@ -82,7 +82,7 @@ public class BootstrapDirectory { BootstrapMetadata readFromConfiguration() { if (!ibp.isPresent()) { - return BootstrapMetadata.fromVersion(MetadataVersion.latest(), "the default bootstrap"); + return BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(), "the default bootstrap"); } MetadataVersion version = MetadataVersion.fromVersionString(ibp.get()); if (version.isLessThan(MINIMUM_BOOTSTRAP_VERSION)) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java index 20e688d6413..cc3a7aa3935 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java @@ -280,7 +280,7 @@ public class ClusterControlManagerTest { MetadataVersion.IBP_3_3_IV2, MetadataVersion.IBP_3_3_IV3, MetadataVersion.IBP_3_7_IV2, // introduces directory assignment - MetadataVersion.latest() + MetadataVersion.latestTesting() ).map(Arguments::of); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java index c2be58a519f..b5f2239cd5a 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java @@ -384,7 +384,7 @@ public class FeatureControlManagerTest { public void testCreateFeatureLevelRecords() { Map localSupportedFeatures = new HashMap<>(); localSupportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of( - MetadataVersion.IBP_3_0_IV1.featureLevel(), MetadataVersion.latest().featureLevel())); + MetadataVersion.IBP_3_0_IV1.featureLevel(), MetadataVersion.latestTesting().featureLevel())); localSupportedFeatures.put("foo", VersionRange.of(0, 2)); FeatureControlManager manager = new FeatureControlManager.Builder(). setQuorumFeatures(new QuorumFeatures(0, localSupportedFeatures, emptyList())). diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java index 6c8ef5268bc..cb00a3830d2 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java @@ -1057,7 +1057,7 @@ public class PartitionChangeBuilderTest { setPartitionEpoch(200). build(); Optional built = new PartitionChangeBuilder(registration, FOO_ID, - 0, r -> true, MetadataVersion.latest(), 2). + 0, r -> true, MetadataVersion.latestTesting(), 2). setDirectory(3, Uuid.fromString("pN1VKs9zRzK4APflpegAVg")). setDirectory(1, DirectoryId.LOST). setDefaultDirProvider(DEFAULT_DIR_PROVIDER). diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java index e43f1327960..9fbb8ee855c 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java @@ -54,7 +54,7 @@ public class QuorumControllerIntegrationTestUtils { private final static Logger log = LoggerFactory.getLogger(QuorumControllerIntegrationTestUtils.class); BrokerRegistrationRequestData.FeatureCollection brokerFeatures() { - return brokerFeatures(MetadataVersion.MINIMUM_KRAFT_VERSION, MetadataVersion.latest()); + return brokerFeatures(MetadataVersion.MINIMUM_KRAFT_VERSION, MetadataVersion.latestTesting()); } /** @@ -94,7 +94,7 @@ public class QuorumControllerIntegrationTestUtils { .setBrokerId(brokerId) .setRack(null) .setClusterId(controller.clusterId()) - .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latest())) + .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting())) .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId)) .setLogDirs(Collections.singletonList( Uuid.fromString("TESTBROKER" + Integer.toString(100000 + brokerId).substring(1) + "DIRAAAA") diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java index 3ae8cc1ed0d..f340d27925b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java @@ -50,7 +50,7 @@ public class QuorumControllerTestEnv implements AutoCloseable { private OptionalLong sessionTimeoutMillis = OptionalLong.empty(); private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty(); private BootstrapMetadata bootstrapMetadata = BootstrapMetadata. - fromVersion(MetadataVersion.latest(), "test-provided version"); + fromVersion(MetadataVersion.latestTesting(), "test-provided version"); public Builder(LocalLogManagerTestEnv logEnv) { this.logEnv = logEnv; diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java index 1e0259e9661..e0bab331e67 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java @@ -63,7 +63,7 @@ public class QuorumFeaturesTest { Map expectedFeatures = new HashMap<>(1); expectedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), - MetadataVersion.latest().featureLevel())); + MetadataVersion.latestTesting().featureLevel())); assertEquals(expectedFeatures, QuorumFeatures.defaultFeatureMap(true)); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 6ac769043e2..bf7f6c82e05 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -166,7 +166,7 @@ public class ReplicationControlManagerTest { private static class ReplicationControlTestContext { private static class Builder { private Optional createTopicPolicy = Optional.empty(); - private MetadataVersion metadataVersion = MetadataVersion.latest(); + private MetadataVersion metadataVersion = MetadataVersion.latestTesting(); private MockTime mockTime = new MockTime(); private boolean isElrEnabled = false; @@ -1623,7 +1623,7 @@ public class ReplicationControlManagerTest { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION) public void testReassignPartitions(short version) throws Exception { - MetadataVersion metadataVersion = MetadataVersion.latest(); + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() .setMetadataVersion(metadataVersion) .build(); @@ -1699,7 +1699,7 @@ public class ReplicationControlManagerTest { )). setLeader(3). setRemovingReplicas(Collections.emptyList()). - setAddingReplicas(Collections.emptyList()), MetadataVersion.latest().partitionChangeRecordVersion())), + setAddingReplicas(Collections.emptyList()), MetadataVersion.latestTesting().partitionChangeRecordVersion())), new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList( new ReassignableTopicResponse().setName("foo").setPartitions(asList( new ReassignablePartitionResponse().setPartitionIndex(0). @@ -1960,7 +1960,7 @@ public class ReplicationControlManagerTest { @Test public void testCancelReassignPartitions() throws Exception { - MetadataVersion metadataVersion = MetadataVersion.latest(); + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() .setMetadataVersion(metadataVersion) .build(); @@ -2094,7 +2094,7 @@ public class ReplicationControlManagerTest { Uuid.fromString("TESTBROKER00004DIRAAAA") )). setRemovingReplicas(null). - setAddingReplicas(Collections.emptyList()), MetadataVersion.latest().partitionChangeRecordVersion())), + setAddingReplicas(Collections.emptyList()), MetadataVersion.latestTesting().partitionChangeRecordVersion())), new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList( new ReassignableTopicResponse().setName("foo").setPartitions(asList( new ReassignablePartitionResponse().setPartitionIndex(0). @@ -2435,13 +2435,13 @@ public class ReplicationControlManagerTest { setPartitionId(0). setTopicId(fooId). setLeader(1), - MetadataVersion.latest().partitionChangeRecordVersion()), + MetadataVersion.latestTesting().partitionChangeRecordVersion()), new ApiMessageAndVersion( new PartitionChangeRecord(). setPartitionId(2). setTopicId(fooId). setLeader(0), - MetadataVersion.latest().partitionChangeRecordVersion())), + MetadataVersion.latestTesting().partitionChangeRecordVersion())), election2Result.records()); } @@ -2484,7 +2484,7 @@ public class ReplicationControlManagerTest { .setPartitionId(0) .setTopicId(fooId) .setLeader(1); - assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latest().partitionChangeRecordVersion())), balanceResult.records()); + assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latestTesting().partitionChangeRecordVersion())), balanceResult.records()); assertTrue(replication.arePartitionLeadersImbalanced()); assertFalse(balanceResult.response()); @@ -2516,7 +2516,7 @@ public class ReplicationControlManagerTest { .setPartitionId(2) .setTopicId(fooId) .setLeader(0); - assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latest().partitionChangeRecordVersion())), balanceResult.records()); + assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latestTesting().partitionChangeRecordVersion())), balanceResult.records()); assertFalse(replication.arePartitionLeadersImbalanced()); assertFalse(balanceResult.response()); } diff --git a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java index 1ec1d24b651..bc5149a2172 100644 --- a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java @@ -51,7 +51,7 @@ public class FeaturesImageTest { map1.put("foo", (short) 2); map1.put("bar", (short) 1); map1.put("baz", (short) 8); - IMAGE1 = new FeaturesImage(map1, MetadataVersion.latest(), ZkMigrationState.NONE); + IMAGE1 = new FeaturesImage(map1, MetadataVersion.latestTesting(), ZkMigrationState.NONE); DELTA1_RECORDS = new ArrayList<>(); DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord(). @@ -69,7 +69,7 @@ public class FeaturesImageTest { Map map2 = new HashMap<>(); map2.put("foo", (short) 3); - IMAGE2 = new FeaturesImage(map2, MetadataVersion.latest(), ZkMigrationState.NONE); + IMAGE2 = new FeaturesImage(map2, MetadataVersion.latestTesting(), ZkMigrationState.NONE); } @Test diff --git a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java index e52edd61fc3..4f5ff6ec8ff 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java @@ -73,7 +73,7 @@ public class BootstrapDirectoryTest { @Test public void testReadFromEmptyConfiguration() throws Exception { try (BootstrapTestDirectory testDirectory = new BootstrapTestDirectory().createDirectory()) { - assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.latest(), + assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(), "the default bootstrap"), new BootstrapDirectory(testDirectory.path(), Optional.empty()).read()); } diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index 9eecc9f345a..d38ecff1957 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -582,10 +582,15 @@ public enum MetadataVersion { } } - public static MetadataVersion latest() { + // Testing only + public static MetadataVersion latestTesting() { return VERSIONS[VERSIONS.length - 1]; } + public static MetadataVersion latestProduction() { + return LATEST_PRODUCTION; + } + public static boolean checkIfMetadataChanged(MetadataVersion sourceVersion, MetadataVersion targetVersion) { if (sourceVersion == targetVersion) { return false; diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java index 6acd0b23e2d..add6a88b49e 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java @@ -435,9 +435,9 @@ class MetadataVersionTest { @Test public void assertLatestProductionIsLessThanLatest() { - assertTrue(LATEST_PRODUCTION.ordinal() < MetadataVersion.latest().ordinal(), + assertTrue(LATEST_PRODUCTION.ordinal() < MetadataVersion.latestTesting().ordinal(), "Expected LATEST_PRODUCTION " + LATEST_PRODUCTION + - " to be less than the latest of " + MetadataVersion.latest()); + " to be less than the latest of " + MetadataVersion.latestTesting()); } @Test @@ -447,6 +447,6 @@ class MetadataVersionTest { @Test public void assertLatestIsNotProduction() { - assertFalse(MetadataVersion.latest().isProduction()); + assertFalse(MetadataVersion.latestTesting().isProduction()); } } diff --git a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java index 67e774acc5d..ef3e4363a24 100644 --- a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java @@ -244,7 +244,7 @@ public class FeatureCommand { } catch (Throwable e) { throw new TerseException("Unsupported metadata version " + metadata + ". Supported metadata versions are " + metadataVersionsToString( - MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, MetadataVersion.latest())); + MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, MetadataVersion.latestProduction())); } updates.put(MetadataVersion.FEATURE_NAME, new FeatureUpdate(version.featureLevel(), upgradeType)); }