mirror of https://github.com/apache/kafka.git
KAFKA-16078: Be more consistent about getting the latest MetadataVersion
This PR creates MetadataVersion.latestTesting to represent the highest metadata version (which may be unstable) and MetadataVersion.latestProduction to represent the latest version that should be used in production. It fixes a few cases where the broker was advertising that it supported the testing versions even when unstable metadata versions had not been configured. Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
parent
2ff3ae5bed
commit
3c92330274
|
|
@ -48,7 +48,7 @@ public class LogManagerBuilder {
|
|||
private int maxTransactionTimeoutMs = 15 * 60 * 1000;
|
||||
private ProducerStateManagerConfig producerStateManagerConfig = new ProducerStateManagerConfig(60000, false);
|
||||
private int producerIdExpirationCheckIntervalMs = 600000;
|
||||
private MetadataVersion interBrokerProtocolVersion = MetadataVersion.latest();
|
||||
private MetadataVersion interBrokerProtocolVersion = MetadataVersion.latestProduction();
|
||||
private Scheduler scheduler = null;
|
||||
private BrokerTopicStats brokerTopicStats = null;
|
||||
private LogDirFailureChannel logDirFailureChannel = null;
|
||||
|
|
|
|||
|
|
@ -715,7 +715,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
|
|||
def appendAsLeader(records: MemoryRecords,
|
||||
leaderEpoch: Int,
|
||||
origin: AppendOrigin = AppendOrigin.CLIENT,
|
||||
interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest,
|
||||
interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latestProduction,
|
||||
requestLocal: RequestLocal = RequestLocal.NoCaching,
|
||||
verificationGuard: VerificationGuard = VerificationGuard.SENTINEL): LogAppendInfo = {
|
||||
val validateAndAssignOffsets = origin != AppendOrigin.RAFT_LEADER
|
||||
|
|
@ -732,7 +732,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
|
|||
def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
|
||||
append(records,
|
||||
origin = AppendOrigin.REPLICATION,
|
||||
interBrokerProtocolVersion = MetadataVersion.latest,
|
||||
interBrokerProtocolVersion = MetadataVersion.latestProduction,
|
||||
validateAndAssignOffsets = false,
|
||||
leaderEpoch = -1,
|
||||
requestLocal = None,
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ class SimpleApiVersionManager(
|
|||
this(
|
||||
listenerType,
|
||||
ApiKeys.apisForListener(listenerType).asScala,
|
||||
BrokerFeatures.defaultSupportedFeatures(),
|
||||
BrokerFeatures.defaultSupportedFeatures(enableUnstableLastVersion),
|
||||
enableUnstableLastVersion,
|
||||
zkMigrationEnabled,
|
||||
featuresProvider
|
||||
|
|
|
|||
|
|
@ -70,14 +70,21 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte
|
|||
|
||||
object BrokerFeatures extends Logging {
|
||||
|
||||
def createDefault(): BrokerFeatures = {
|
||||
new BrokerFeatures(defaultSupportedFeatures())
|
||||
def createDefault(unstableMetadataVersionsEnabled: Boolean): BrokerFeatures = {
|
||||
new BrokerFeatures(defaultSupportedFeatures(unstableMetadataVersionsEnabled))
|
||||
}
|
||||
|
||||
def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
|
||||
def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = {
|
||||
Features.supportedFeatures(
|
||||
java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
|
||||
new SupportedVersionRange(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.latest().featureLevel())))
|
||||
new SupportedVersionRange(
|
||||
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
|
||||
if (unstableMetadataVersionsEnabled) {
|
||||
MetadataVersion.latestTesting.featureLevel
|
||||
} else {
|
||||
MetadataVersion.latestProduction.featureLevel
|
||||
}
|
||||
)))
|
||||
}
|
||||
|
||||
def createEmpty(): BrokerFeatures = {
|
||||
|
|
|
|||
|
|
@ -140,7 +140,7 @@ class BrokerServer(
|
|||
|
||||
var brokerMetadataPublisher: BrokerMetadataPublisher = _
|
||||
|
||||
val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
|
||||
val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault(config.unstableMetadataVersionsEnabled)
|
||||
|
||||
def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
|
||||
|
||||
|
|
|
|||
|
|
@ -152,7 +152,7 @@ object Defaults {
|
|||
val LeaderImbalancePerBrokerPercentage = 10
|
||||
val LeaderImbalanceCheckIntervalSeconds = 300
|
||||
val InterBrokerSecurityProtocol = SecurityProtocol.PLAINTEXT.toString
|
||||
val InterBrokerProtocolVersion = MetadataVersion.latest.version
|
||||
val InterBrokerProtocolVersion = MetadataVersion.latestProduction.version
|
||||
|
||||
/** ********* Controlled shutdown configuration ***********/
|
||||
val ControlledShutdownMaxRetries = 3
|
||||
|
|
|
|||
|
|
@ -155,7 +155,7 @@ public class ClusterConfig {
|
|||
}
|
||||
|
||||
public static Builder defaultClusterBuilder() {
|
||||
return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latest());
|
||||
return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latestTesting());
|
||||
}
|
||||
|
||||
public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart,
|
||||
|
|
|
|||
|
|
@ -199,6 +199,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
|||
props.putAll(brokerNode.propertyOverrides());
|
||||
}
|
||||
props.putIfAbsent(KafkaConfig$.MODULE$.UnstableMetadataVersionsEnableProp(), "true");
|
||||
props.putIfAbsent(KafkaConfig$.MODULE$.UnstableApiVersionsEnableProp(), "true");
|
||||
return new KafkaConfig(props, false, Option.empty());
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ public class TestKitNodes {
|
|||
private boolean combined = false;
|
||||
private Uuid clusterId = null;
|
||||
private BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
|
||||
fromVersion(MetadataVersion.latest(), "testkit");
|
||||
fromVersion(MetadataVersion.latestTesting(), "testkit");
|
||||
private final NavigableMap<Integer, ControllerNode.Builder> controllerNodeBuilders = new TreeMap<>();
|
||||
private final NavigableMap<Integer, BrokerNode.Builder> brokerNodeBuilders = new TreeMap<>();
|
||||
|
||||
|
|
|
|||
|
|
@ -173,7 +173,7 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging {
|
|||
zkClient.createTopLevelPaths()
|
||||
val securityProtocol = SecurityProtocol.PLAINTEXT
|
||||
val endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
|
||||
val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), MetadataVersion.latest, jmxPort = 9192)
|
||||
val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), MetadataVersion.latestTesting, jmxPort = 9192)
|
||||
zkClient.registerBroker(brokerInfo)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -923,12 +923,12 @@ class KRaftClusterTest {
|
|||
try {
|
||||
admin.updateFeatures(
|
||||
Map(MetadataVersion.FEATURE_NAME ->
|
||||
new FeatureUpdate(MetadataVersion.latest().featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions
|
||||
new FeatureUpdate(MetadataVersion.latestTesting().featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions
|
||||
)
|
||||
} finally {
|
||||
admin.close()
|
||||
}
|
||||
TestUtils.waitUntilTrue(() => cluster.brokers().get(1).metadataCache.currentImage().features().metadataVersion().equals(MetadataVersion.latest()),
|
||||
TestUtils.waitUntilTrue(() => cluster.brokers().get(1).metadataCache.currentImage().features().metadataVersion().equals(MetadataVersion.latestTesting()),
|
||||
"Timed out waiting for metadata version update.")
|
||||
} finally {
|
||||
cluster.close()
|
||||
|
|
|
|||
|
|
@ -37,8 +37,8 @@ class MetadataRequestBetweenDifferentIbpTest extends BaseRequestTest {
|
|||
override def generateConfigs: Seq[KafkaConfig] = {
|
||||
Seq(
|
||||
createConfig(0, IBP_2_8_IV0),
|
||||
createConfig(1, MetadataVersion.latest),
|
||||
createConfig(2, MetadataVersion.latest)
|
||||
createConfig(1, MetadataVersion.latestTesting),
|
||||
createConfig(2, MetadataVersion.latestTesting)
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -74,8 +74,8 @@ class MetadataVersionIntegrationTest {
|
|||
val admin = clusterInstance.createAdminClient()
|
||||
val describeResult = admin.describeFeatures()
|
||||
val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
|
||||
assertEquals(ff.minVersionLevel(), MetadataVersion.latest().featureLevel(),
|
||||
assertEquals(ff.minVersionLevel(), MetadataVersion.latestTesting().featureLevel(),
|
||||
"If this test fails, check the default MetadataVersion in the @ClusterTest annotation")
|
||||
assertEquals(ff.maxVersionLevel(), MetadataVersion.latest().featureLevel())
|
||||
assertEquals(ff.maxVersionLevel(), MetadataVersion.latestTesting().featureLevel())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -175,7 +175,7 @@ abstract class QuorumTestHarness extends Logging {
|
|||
Seq(new Properties())
|
||||
}
|
||||
|
||||
protected def metadataVersion: MetadataVersion = MetadataVersion.latest()
|
||||
protected def metadataVersion: MetadataVersion = MetadataVersion.latestTesting()
|
||||
|
||||
private var testInfo: TestInfo = _
|
||||
private var implementation: QuorumImplementation = _
|
||||
|
|
|
|||
|
|
@ -95,7 +95,7 @@ class AbstractPartitionTest {
|
|||
.thenReturn(None)
|
||||
}
|
||||
|
||||
protected def interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest
|
||||
protected def interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latestTesting
|
||||
|
||||
def createLogProperties(overrides: Map[String, String]): Properties = {
|
||||
val logProps = new Properties()
|
||||
|
|
|
|||
|
|
@ -271,7 +271,7 @@ class PartitionLockTest extends Logging {
|
|||
logManager.startup(Set.empty)
|
||||
val partition = new Partition(topicPartition,
|
||||
replicaLagTimeMaxMs = kafka.server.Defaults.ReplicaLagTimeMaxMs,
|
||||
interBrokerProtocolVersion = MetadataVersion.latest,
|
||||
interBrokerProtocolVersion = MetadataVersion.latestTesting,
|
||||
localBrokerId = brokerId,
|
||||
() => 1L,
|
||||
mockTime,
|
||||
|
|
|
|||
|
|
@ -417,7 +417,7 @@ class PartitionTest extends AbstractPartitionTest {
|
|||
partition = new Partition(
|
||||
topicPartition,
|
||||
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
|
||||
interBrokerProtocolVersion = MetadataVersion.latest,
|
||||
interBrokerProtocolVersion = MetadataVersion.latestTesting,
|
||||
localBrokerId = brokerId,
|
||||
() => defaultBrokerEpoch(brokerId),
|
||||
time,
|
||||
|
|
@ -1570,7 +1570,7 @@ class PartitionTest extends AbstractPartitionTest {
|
|||
val partition = new Partition(
|
||||
topicPartition,
|
||||
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
|
||||
interBrokerProtocolVersion = MetadataVersion.latest,
|
||||
interBrokerProtocolVersion = MetadataVersion.latestTesting,
|
||||
localBrokerId = brokerId,
|
||||
() => defaultBrokerEpoch(brokerId),
|
||||
time,
|
||||
|
|
@ -1687,7 +1687,7 @@ class PartitionTest extends AbstractPartitionTest {
|
|||
val partition = new Partition(
|
||||
topicPartition,
|
||||
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
|
||||
interBrokerProtocolVersion = MetadataVersion.latest,
|
||||
interBrokerProtocolVersion = MetadataVersion.latestTesting,
|
||||
localBrokerId = brokerId,
|
||||
() => defaultBrokerEpoch(brokerId),
|
||||
time,
|
||||
|
|
@ -1794,7 +1794,7 @@ class PartitionTest extends AbstractPartitionTest {
|
|||
val partition = new Partition(
|
||||
topicPartition,
|
||||
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
|
||||
interBrokerProtocolVersion = MetadataVersion.latest,
|
||||
interBrokerProtocolVersion = MetadataVersion.latestTesting,
|
||||
localBrokerId = brokerId,
|
||||
() => defaultBrokerEpoch(brokerId),
|
||||
time,
|
||||
|
|
@ -1886,7 +1886,7 @@ class PartitionTest extends AbstractPartitionTest {
|
|||
val partition = new Partition(
|
||||
topicPartition,
|
||||
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
|
||||
interBrokerProtocolVersion = MetadataVersion.latest,
|
||||
interBrokerProtocolVersion = MetadataVersion.latestTesting,
|
||||
localBrokerId = brokerId,
|
||||
() => defaultBrokerEpoch(brokerId),
|
||||
time,
|
||||
|
|
@ -1952,7 +1952,7 @@ class PartitionTest extends AbstractPartitionTest {
|
|||
val partition = new Partition(
|
||||
topicPartition,
|
||||
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
|
||||
interBrokerProtocolVersion = MetadataVersion.latest,
|
||||
interBrokerProtocolVersion = MetadataVersion.latestTesting,
|
||||
localBrokerId = brokerId,
|
||||
() => defaultBrokerEpoch(brokerId),
|
||||
time,
|
||||
|
|
@ -2108,7 +2108,7 @@ class PartitionTest extends AbstractPartitionTest {
|
|||
val partition = new Partition(
|
||||
topicPartition,
|
||||
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
|
||||
interBrokerProtocolVersion = MetadataVersion.latest,
|
||||
interBrokerProtocolVersion = MetadataVersion.latestTesting,
|
||||
localBrokerId = brokerId,
|
||||
() => defaultBrokerEpoch(brokerId),
|
||||
time,
|
||||
|
|
@ -2781,7 +2781,7 @@ class PartitionTest extends AbstractPartitionTest {
|
|||
// Create new Partition object for same topicPartition
|
||||
val partition2 = new Partition(topicPartition,
|
||||
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
|
||||
interBrokerProtocolVersion = MetadataVersion.latest,
|
||||
interBrokerProtocolVersion = MetadataVersion.latestTesting,
|
||||
localBrokerId = brokerId,
|
||||
() => defaultBrokerEpoch(brokerId),
|
||||
time,
|
||||
|
|
@ -2826,7 +2826,7 @@ class PartitionTest extends AbstractPartitionTest {
|
|||
// Create new Partition object for same topicPartition
|
||||
val partition2 = new Partition(topicPartition,
|
||||
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
|
||||
interBrokerProtocolVersion = MetadataVersion.latest,
|
||||
interBrokerProtocolVersion = MetadataVersion.latestTesting,
|
||||
localBrokerId = brokerId,
|
||||
() => defaultBrokerEpoch(brokerId),
|
||||
time,
|
||||
|
|
@ -2909,7 +2909,7 @@ class PartitionTest extends AbstractPartitionTest {
|
|||
def testUpdateAssignmentAndIsr(): Unit = {
|
||||
val topicPartition = new TopicPartition("test", 1)
|
||||
val partition = new Partition(
|
||||
topicPartition, 1000, MetadataVersion.latest, 0, () => defaultBrokerEpoch(0),
|
||||
topicPartition, 1000, MetadataVersion.latestTesting, 0, () => defaultBrokerEpoch(0),
|
||||
new SystemTime(), mock(classOf[AlterPartitionListener]), mock(classOf[DelayedOperations]),
|
||||
mock(classOf[MetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterPartitionManager]))
|
||||
|
||||
|
|
@ -2984,7 +2984,7 @@ class PartitionTest extends AbstractPartitionTest {
|
|||
val spyLogManager = spy(logManager)
|
||||
val partition = new Partition(topicPartition,
|
||||
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
|
||||
interBrokerProtocolVersion = MetadataVersion.latest,
|
||||
interBrokerProtocolVersion = MetadataVersion.latestTesting,
|
||||
localBrokerId = brokerId,
|
||||
() => defaultBrokerEpoch(brokerId),
|
||||
time,
|
||||
|
|
@ -3023,7 +3023,7 @@ class PartitionTest extends AbstractPartitionTest {
|
|||
|
||||
val partition = new Partition(topicPartition,
|
||||
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
|
||||
interBrokerProtocolVersion = MetadataVersion.latest,
|
||||
interBrokerProtocolVersion = MetadataVersion.latestTesting,
|
||||
localBrokerId = brokerId,
|
||||
() => defaultBrokerEpoch(brokerId),
|
||||
time,
|
||||
|
|
@ -3065,7 +3065,7 @@ class PartitionTest extends AbstractPartitionTest {
|
|||
|
||||
val partition = new Partition(topicPartition,
|
||||
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
|
||||
interBrokerProtocolVersion = MetadataVersion.latest,
|
||||
interBrokerProtocolVersion = MetadataVersion.latestTesting,
|
||||
localBrokerId = brokerId,
|
||||
() => defaultBrokerEpoch(brokerId),
|
||||
time,
|
||||
|
|
|
|||
|
|
@ -160,7 +160,7 @@ class ControllerChannelManagerTest {
|
|||
|
||||
@Test
|
||||
def testLeaderAndIsrInterBrokerProtocolVersion(): Unit = {
|
||||
testLeaderAndIsrRequestFollowsInterBrokerProtocolVersion(MetadataVersion.latest, ApiKeys.LEADER_AND_ISR.latestVersion)
|
||||
testLeaderAndIsrRequestFollowsInterBrokerProtocolVersion(MetadataVersion.latestTesting, ApiKeys.LEADER_AND_ISR.latestVersion)
|
||||
|
||||
for (metadataVersion <- MetadataVersion.VERSIONS) {
|
||||
val leaderAndIsrRequestVersion: Short = {
|
||||
|
|
@ -379,7 +379,7 @@ class ControllerChannelManagerTest {
|
|||
|
||||
@Test
|
||||
def testUpdateMetadataInterBrokerProtocolVersion(): Unit = {
|
||||
testUpdateMetadataFollowsInterBrokerProtocolVersion(MetadataVersion.latest, ApiKeys.UPDATE_METADATA.latestVersion)
|
||||
testUpdateMetadataFollowsInterBrokerProtocolVersion(MetadataVersion.latestTesting, ApiKeys.UPDATE_METADATA.latestVersion)
|
||||
|
||||
for (metadataVersion <- MetadataVersion.VERSIONS) {
|
||||
val updateMetadataRequestVersion: Short =
|
||||
|
|
@ -625,7 +625,7 @@ class ControllerChannelManagerTest {
|
|||
|
||||
@Test
|
||||
def testMixedDeleteAndNotDeleteStopReplicaRequests(): Unit = {
|
||||
testMixedDeleteAndNotDeleteStopReplicaRequests(MetadataVersion.latest,
|
||||
testMixedDeleteAndNotDeleteStopReplicaRequests(MetadataVersion.latestTesting,
|
||||
ApiKeys.STOP_REPLICA.latestVersion)
|
||||
|
||||
for (metadataVersion <- MetadataVersion.VERSIONS) {
|
||||
|
|
@ -775,7 +775,7 @@ class ControllerChannelManagerTest {
|
|||
|
||||
@Test
|
||||
def testStopReplicaInterBrokerProtocolVersion(): Unit = {
|
||||
testStopReplicaFollowsInterBrokerProtocolVersion(MetadataVersion.latest, ApiKeys.STOP_REPLICA.latestVersion)
|
||||
testStopReplicaFollowsInterBrokerProtocolVersion(MetadataVersion.latestTesting, ApiKeys.STOP_REPLICA.latestVersion)
|
||||
|
||||
for (metadataVersion <- MetadataVersion.VERSIONS) {
|
||||
if (metadataVersion.isLessThan(IBP_2_2_IV0))
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import scala.util.{Failure, Success, Try}
|
|||
|
||||
object ControllerIntegrationTest {
|
||||
def testAlterPartitionSource(): JStream[Arguments] = {
|
||||
Seq(MetadataVersion.IBP_2_7_IV0, MetadataVersion.latest).asJava.stream.flatMap { metadataVersion =>
|
||||
Seq(MetadataVersion.IBP_2_7_IV0, MetadataVersion.latestTesting).asJava.stream.flatMap { metadataVersion =>
|
||||
ApiKeys.ALTER_PARTITION.allVersions.stream.map { alterPartitionVersion =>
|
||||
Arguments.of(metadataVersion, alterPartitionVersion)
|
||||
}
|
||||
|
|
@ -1010,7 +1010,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
|
|||
// topic ids anymore. However, the already assigned topic ids are kept. This means
|
||||
// that using AlterPartition version 2 should still work assuming that it only
|
||||
// contains topic with topics ids.
|
||||
servers = makeServers(1, interBrokerProtocolVersion = Some(MetadataVersion.latest))
|
||||
servers = makeServers(1, interBrokerProtocolVersion = Some(MetadataVersion.latestTesting))
|
||||
|
||||
val controllerId = TestUtils.waitUntilControllerElected(zkClient)
|
||||
val tp = new TopicPartition("t", 0)
|
||||
|
|
|
|||
|
|
@ -97,7 +97,7 @@ class GroupMetadataManagerTest {
|
|||
metrics = new kMetrics()
|
||||
time = new MockTime
|
||||
replicaManager = mock(classOf[ReplicaManager])
|
||||
groupMetadataManager = new GroupMetadataManager(0, MetadataVersion.latest, offsetConfig, replicaManager,
|
||||
groupMetadataManager = new GroupMetadataManager(0, MetadataVersion.latestTesting, offsetConfig, replicaManager,
|
||||
time, metrics)
|
||||
groupMetadataManager.startup(() => numOffsetsPartitions, false)
|
||||
partition = mock(classOf[Partition])
|
||||
|
|
@ -112,7 +112,7 @@ class GroupMetadataManagerTest {
|
|||
def testLogInfoFromCleanupGroupMetadata(): Unit = {
|
||||
var expiredOffsets: Int = 0
|
||||
var infoCount = 0
|
||||
val gmm = new GroupMetadataManager(0, MetadataVersion.latest, offsetConfig, replicaManager, time, metrics) {
|
||||
val gmm = new GroupMetadataManager(0, MetadataVersion.latestTesting, offsetConfig, replicaManager, time, metrics) {
|
||||
override def cleanupGroupMetadata(groups: Iterable[GroupMetadata], requestLocal: RequestLocal,
|
||||
selector: GroupMetadata => Map[TopicPartition, OffsetAndMetadata]): Int = expiredOffsets
|
||||
|
||||
|
|
@ -2740,7 +2740,7 @@ class GroupMetadataManagerTest {
|
|||
val offsetCommitRecord = TestUtils.records(Seq(
|
||||
new SimpleRecord(
|
||||
GroupMetadataManager.offsetCommitKey(groupId, topicPartition),
|
||||
GroupMetadataManager.offsetCommitValue(OffsetAndMetadata(35L, "", time.milliseconds()), MetadataVersion.latest)
|
||||
GroupMetadataManager.offsetCommitValue(OffsetAndMetadata(35L, "", time.milliseconds()), MetadataVersion.latestTesting)
|
||||
)
|
||||
)).records.asScala.head
|
||||
val (keyStringOpt, valueStringOpt) = GroupMetadataManager.formatRecordKeyAndValue(offsetCommitRecord)
|
||||
|
|
@ -2826,7 +2826,7 @@ class GroupMetadataManagerTest {
|
|||
protocol: String,
|
||||
memberId: String,
|
||||
assignmentBytes: Array[Byte] = Array.emptyByteArray,
|
||||
metadataVersion: MetadataVersion = MetadataVersion.latest): SimpleRecord = {
|
||||
metadataVersion: MetadataVersion = MetadataVersion.latestTesting): SimpleRecord = {
|
||||
val memberProtocols = List((protocol, Array.emptyByteArray))
|
||||
val member = new MemberMetadata(memberId, Some(groupInstanceId), "clientId", "clientHost", 30000, 10000, protocolType, memberProtocols)
|
||||
val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol, memberId,
|
||||
|
|
@ -2839,7 +2839,7 @@ class GroupMetadataManagerTest {
|
|||
private def buildEmptyGroupRecord(generation: Int, protocolType: String): SimpleRecord = {
|
||||
val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, None, Seq.empty, time)
|
||||
val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
|
||||
val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map.empty, MetadataVersion.latest)
|
||||
val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map.empty, MetadataVersion.latestTesting)
|
||||
new SimpleRecord(groupMetadataKey, groupMetadataValue)
|
||||
}
|
||||
|
||||
|
|
@ -2883,7 +2883,7 @@ class GroupMetadataManagerTest {
|
|||
|
||||
private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long],
|
||||
groupId: String = groupId,
|
||||
metadataVersion: MetadataVersion = MetadataVersion.latest,
|
||||
metadataVersion: MetadataVersion = MetadataVersion.latestTesting,
|
||||
retentionTimeOpt: Option[Long] = None): Seq[SimpleRecord] = {
|
||||
committedOffsets.map { case (topicPartition, offset) =>
|
||||
val commitTimestamp = time.milliseconds()
|
||||
|
|
|
|||
|
|
@ -246,7 +246,7 @@ class LogLoaderTest {
|
|||
|
||||
@Test
|
||||
def testProducerSnapshotsRecoveryAfterUncleanShutdownCurrentMessageFormat(): Unit = {
|
||||
testProducerSnapshotsRecoveryAfterUncleanShutdown(MetadataVersion.latest.version)
|
||||
testProducerSnapshotsRecoveryAfterUncleanShutdown(MetadataVersion.latestTesting.version)
|
||||
}
|
||||
|
||||
private def createLog(dir: File,
|
||||
|
|
|
|||
|
|
@ -158,7 +158,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
offsetCounter,
|
||||
metricsRecorder,
|
||||
|
|
@ -203,7 +203,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(0),
|
||||
metricsRecorder,
|
||||
|
|
@ -255,7 +255,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(0),
|
||||
metricsRecorder,
|
||||
|
|
@ -321,7 +321,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(0),
|
||||
metricsRecorder,
|
||||
|
|
@ -372,7 +372,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
partitionLeaderEpoch,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
offsetCounter,
|
||||
metricsRecorder,
|
||||
|
|
@ -458,7 +458,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
partitionLeaderEpoch,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(0),
|
||||
metricsRecorder,
|
||||
|
|
@ -517,7 +517,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(0),
|
||||
metricsRecorder,
|
||||
|
|
@ -565,7 +565,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest,
|
||||
MetadataVersion.latestTesting,
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(0),
|
||||
metricsRecorder,
|
||||
|
|
@ -628,7 +628,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
partitionLeaderEpoch,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(0),
|
||||
metricsRecorder,
|
||||
|
|
@ -688,7 +688,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(0),
|
||||
metricsRecorder,
|
||||
|
|
@ -714,7 +714,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(0),
|
||||
metricsRecorder,
|
||||
|
|
@ -740,7 +740,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(0),
|
||||
metricsRecorder,
|
||||
|
|
@ -766,7 +766,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(0),
|
||||
metricsRecorder,
|
||||
|
|
@ -791,7 +791,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest,
|
||||
MetadataVersion.latestTesting,
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
).validatedRecords, offset)
|
||||
|
|
@ -814,7 +814,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
).validatedRecords, offset)
|
||||
|
|
@ -838,7 +838,7 @@ class LogValidatorTest {
|
|||
5000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
).validatedRecords
|
||||
|
|
@ -863,7 +863,7 @@ class LogValidatorTest {
|
|||
5000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
).validatedRecords
|
||||
|
|
@ -889,7 +889,7 @@ class LogValidatorTest {
|
|||
5000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
).validatedRecords
|
||||
|
|
@ -915,7 +915,7 @@ class LogValidatorTest {
|
|||
5000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
).validatedRecords
|
||||
|
|
@ -939,7 +939,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
)
|
||||
|
|
@ -965,7 +965,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
)
|
||||
|
|
@ -991,7 +991,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
)
|
||||
|
|
@ -1017,7 +1017,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
)
|
||||
|
|
@ -1043,7 +1043,7 @@ class LogValidatorTest {
|
|||
5000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
))
|
||||
|
|
@ -1066,7 +1066,7 @@ class LogValidatorTest {
|
|||
5000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.COORDINATOR,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
)
|
||||
|
|
@ -1094,7 +1094,7 @@ class LogValidatorTest {
|
|||
5000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
).validatedRecords, offset)
|
||||
|
|
@ -1118,7 +1118,7 @@ class LogValidatorTest {
|
|||
5000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
).validatedRecords, offset)
|
||||
|
|
@ -1141,7 +1141,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
).validatedRecords, offset)
|
||||
|
|
@ -1164,7 +1164,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
).validatedRecords, offset)
|
||||
|
|
@ -1188,7 +1188,7 @@ class LogValidatorTest {
|
|||
5000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
).validatedRecords, offset)
|
||||
|
|
@ -1212,7 +1212,7 @@ class LogValidatorTest {
|
|||
5000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
).validatedRecords, offset)
|
||||
|
|
@ -1238,7 +1238,7 @@ class LogValidatorTest {
|
|||
5000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
))
|
||||
|
|
@ -1264,7 +1264,7 @@ class LogValidatorTest {
|
|||
5000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
))
|
||||
|
|
@ -1288,7 +1288,7 @@ class LogValidatorTest {
|
|||
5000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
).validatedRecords, offset)
|
||||
|
|
@ -1312,7 +1312,7 @@ class LogValidatorTest {
|
|||
5000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
).validatedRecords, offset)
|
||||
|
|
@ -1334,7 +1334,7 @@ class LogValidatorTest {
|
|||
5000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
))
|
||||
|
|
@ -1398,7 +1398,7 @@ class LogValidatorTest {
|
|||
1000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
)
|
||||
|
|
@ -1477,7 +1477,7 @@ class LogValidatorTest {
|
|||
timestampAfterMaxConfig,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
)
|
||||
|
|
@ -1511,7 +1511,7 @@ class LogValidatorTest {
|
|||
timestampAfterMaxConfig,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
)
|
||||
|
|
@ -1544,7 +1544,7 @@ class LogValidatorTest {
|
|||
5000L,
|
||||
RecordBatch.NO_PARTITION_LEADER_EPOCH,
|
||||
AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest
|
||||
MetadataVersion.latestTesting
|
||||
).validateMessagesAndAssignOffsets(
|
||||
PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
|
||||
))
|
||||
|
|
|
|||
|
|
@ -79,7 +79,7 @@ class SocketServerTest {
|
|||
TestUtils.clearYammerMetrics()
|
||||
|
||||
private val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, false,
|
||||
() => new Features(MetadataVersion.latest(), Collections.emptyMap[String, java.lang.Short], 0, true))
|
||||
() => new Features(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true))
|
||||
val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
|
||||
val sockets = new ArrayBuffer[Socket]
|
||||
|
||||
|
|
|
|||
|
|
@ -137,7 +137,7 @@ class ZkAuthorizationTest extends QuorumTestHarness with Logging {
|
|||
private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol,
|
||||
rack: Option[String] = None): BrokerInfo =
|
||||
BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
|
||||
(securityProtocol), securityProtocol)), rack = rack), MetadataVersion.latest, jmxPort = port + 10)
|
||||
(securityProtocol), securityProtocol)), rack = rack), MetadataVersion.latestTesting, jmxPort = port + 10)
|
||||
|
||||
private def newKafkaZkClient(connectionString: String, isSecure: Boolean) =
|
||||
KafkaZkClient(connectionString, isSecure, 6000, 6000, Int.MaxValue, Time.SYSTEM, "ZkAuthorizationTest",
|
||||
|
|
|
|||
|
|
@ -79,12 +79,12 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
|
|||
): Unit = {
|
||||
if (cluster.isKRaftTest && apiVersion >= 3) {
|
||||
assertEquals(1, apiVersionsResponse.data().finalizedFeatures().size())
|
||||
assertEquals(MetadataVersion.latest().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
|
||||
assertEquals(MetadataVersion.latest().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())
|
||||
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
|
||||
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())
|
||||
|
||||
assertEquals(1, apiVersionsResponse.data().supportedFeatures().size())
|
||||
assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion())
|
||||
assertEquals(MetadataVersion.latest().featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersion())
|
||||
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersion())
|
||||
}
|
||||
val expectedApis = if (!cluster.isKRaftTest) {
|
||||
ApiVersionsResponse.collectApis(
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ import scala.jdk.CollectionConverters._
|
|||
*/
|
||||
class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||
|
||||
protected var testMetadataVersion = MetadataVersion.latest
|
||||
protected var testMetadataVersion = MetadataVersion.latestTesting
|
||||
override protected def metadataVersion = testMetadataVersion
|
||||
|
||||
@BeforeEach
|
||||
|
|
|
|||
|
|
@ -30,8 +30,8 @@ import org.mockito.Mockito
|
|||
import scala.jdk.CollectionConverters._
|
||||
|
||||
class ApiVersionManagerTest {
|
||||
private val brokerFeatures = BrokerFeatures.createDefault()
|
||||
private val metadataCache = new ZkMetadataCache(1, MetadataVersion.latest(), brokerFeatures)
|
||||
private val brokerFeatures = BrokerFeatures.createDefault(true)
|
||||
private val metadataCache = new ZkMetadataCache(1, MetadataVersion.latestTesting(), brokerFeatures)
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(classOf[ListenerType])
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
|
|||
import org.apache.kafka.common.requests.ApiVersionsRequest
|
||||
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
|
||||
import kafka.test.junit.ClusterTestExtensions
|
||||
import org.apache.kafka.server.common.MetadataVersion
|
||||
import org.junit.jupiter.api.Assertions._
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.extension.ExtendWith
|
||||
|
|
@ -37,14 +38,20 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
|
|||
super.brokerPropertyOverrides(config.serverProperties())
|
||||
}
|
||||
|
||||
@ClusterTest
|
||||
@ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
|
||||
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
|
||||
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
|
||||
))
|
||||
def testApiVersionsRequest(): Unit = {
|
||||
val request = new ApiVersionsRequest.Builder().build()
|
||||
val apiVersionsResponse = sendApiVersionsRequest(request, cluster.clientListener())
|
||||
validateApiVersionsResponse(apiVersionsResponse)
|
||||
}
|
||||
|
||||
@ClusterTest(serverProperties = Array(new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")))
|
||||
@ClusterTest(serverProperties = Array(
|
||||
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"),
|
||||
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
|
||||
))
|
||||
def testApiVersionsRequestIncludesUnreleasedApis(): Unit = {
|
||||
val request = new ApiVersionsRequest.Builder().build()
|
||||
val apiVersionsResponse = sendApiVersionsRequest(request, cluster.clientListener())
|
||||
|
|
@ -77,7 +84,10 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
|
|||
assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion())
|
||||
}
|
||||
|
||||
@ClusterTest
|
||||
@ClusterTest(metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
|
||||
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
|
||||
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"),
|
||||
))
|
||||
def testApiVersionsRequestValidationV0(): Unit = {
|
||||
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
|
||||
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.clientListener())
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ class BrokerFeaturesTest {
|
|||
|
||||
@Test
|
||||
def testIncompatibilitiesDueToAbsentFeature(): Unit = {
|
||||
val brokerFeatures = BrokerFeatures.createDefault()
|
||||
val brokerFeatures = BrokerFeatures.createDefault(true)
|
||||
val supportedFeatures = Features.supportedFeatures(Map[String, SupportedVersionRange](
|
||||
"test_feature_1" -> new SupportedVersionRange(1, 4),
|
||||
"test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
|
||||
|
|
@ -51,7 +51,7 @@ class BrokerFeaturesTest {
|
|||
|
||||
@Test
|
||||
def testIncompatibilitiesDueToIncompatibleFeature(): Unit = {
|
||||
val brokerFeatures = BrokerFeatures.createDefault()
|
||||
val brokerFeatures = BrokerFeatures.createDefault(true)
|
||||
val supportedFeatures = Features.supportedFeatures(Map[String, SupportedVersionRange](
|
||||
"test_feature_1" -> new SupportedVersionRange(1, 4),
|
||||
"test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
|
||||
|
|
@ -70,7 +70,7 @@ class BrokerFeaturesTest {
|
|||
|
||||
@Test
|
||||
def testCompatibleFeatures(): Unit = {
|
||||
val brokerFeatures = BrokerFeatures.createDefault()
|
||||
val brokerFeatures = BrokerFeatures.createDefault(true)
|
||||
val supportedFeatures = Features.supportedFeatures(Map[String, SupportedVersionRange](
|
||||
"test_feature_1" -> new SupportedVersionRange(1, 4),
|
||||
"test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
|
||||
|
|
@ -86,7 +86,7 @@ class BrokerFeaturesTest {
|
|||
|
||||
@Test
|
||||
def testDefaultFinalizedFeatures(): Unit = {
|
||||
val brokerFeatures = BrokerFeatures.createDefault()
|
||||
val brokerFeatures = BrokerFeatures.createDefault(true)
|
||||
val supportedFeatures = Features.supportedFeatures(Map[String, SupportedVersionRange](
|
||||
"test_feature_1" -> new SupportedVersionRange(1, 4),
|
||||
"test_feature_2" -> new SupportedVersionRange(1, 3),
|
||||
|
|
@ -94,7 +94,7 @@ class BrokerFeaturesTest {
|
|||
brokerFeatures.setSupportedFeatures(supportedFeatures)
|
||||
|
||||
val expectedFeatures = Map[String, Short](
|
||||
MetadataVersion.FEATURE_NAME -> MetadataVersion.latest().featureLevel(),
|
||||
MetadataVersion.FEATURE_NAME -> MetadataVersion.latestTesting().featureLevel(),
|
||||
"test_feature_1" -> 4,
|
||||
"test_feature_2" -> 3,
|
||||
"test_feature_3" -> 7)
|
||||
|
|
|
|||
|
|
@ -166,7 +166,7 @@ class ControllerApisTest {
|
|||
ListenerType.CONTROLLER,
|
||||
true,
|
||||
false,
|
||||
() => Features.fromKRaftVersion(MetadataVersion.latest())),
|
||||
() => Features.fromKRaftVersion(MetadataVersion.latestTesting())),
|
||||
metadataCache
|
||||
)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ class FinalizedFeatureCacheTest {
|
|||
|
||||
@Test
|
||||
def testEmpty(): Unit = {
|
||||
assertTrue(new ZkMetadataCache(1, MetadataVersion.IBP_2_8_IV1, BrokerFeatures.createDefault()).getFeatureOption.isEmpty)
|
||||
assertTrue(new ZkMetadataCache(1, MetadataVersion.IBP_2_8_IV1, BrokerFeatures.createDefault(true)).getFeatureOption.isEmpty)
|
||||
}
|
||||
|
||||
def asJava(input: Map[String, Short]): java.util.Map[String, java.lang.Short] = {
|
||||
|
|
@ -40,7 +40,7 @@ class FinalizedFeatureCacheTest {
|
|||
def testUpdateOrThrowFailedDueToInvalidEpoch(): Unit = {
|
||||
val supportedFeatures = Map[String, SupportedVersionRange](
|
||||
"feature_1" -> new SupportedVersionRange(1, 4))
|
||||
val brokerFeatures = BrokerFeatures.createDefault()
|
||||
val brokerFeatures = BrokerFeatures.createDefault(true)
|
||||
brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
|
||||
|
||||
val finalizedFeatures = Map[String, Short]("feature_1" -> 4)
|
||||
|
|
@ -63,7 +63,7 @@ class FinalizedFeatureCacheTest {
|
|||
def testUpdateOrThrowFailedDueToInvalidFeatures(): Unit = {
|
||||
val supportedFeatures =
|
||||
Map[String, SupportedVersionRange]("feature_1" -> new SupportedVersionRange(1, 1))
|
||||
val brokerFeatures = BrokerFeatures.createDefault()
|
||||
val brokerFeatures = BrokerFeatures.createDefault(true)
|
||||
brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
|
||||
|
||||
val finalizedFeatures = Map[String, Short]("feature_1" -> 2)
|
||||
|
|
@ -79,7 +79,7 @@ class FinalizedFeatureCacheTest {
|
|||
def testUpdateOrThrowSuccess(): Unit = {
|
||||
val supportedFeatures =
|
||||
Map[String, SupportedVersionRange]("feature_1" -> new SupportedVersionRange(1, 4))
|
||||
val brokerFeatures = BrokerFeatures.createDefault()
|
||||
val brokerFeatures = BrokerFeatures.createDefault(true)
|
||||
brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
|
||||
|
||||
val finalizedFeatures = Map[String, Short]("feature_1" -> 3)
|
||||
|
|
@ -95,7 +95,7 @@ class FinalizedFeatureCacheTest {
|
|||
def testClear(): Unit = {
|
||||
val supportedFeatures =
|
||||
Map[String, SupportedVersionRange]("feature_1" -> new SupportedVersionRange(1, 4))
|
||||
val brokerFeatures = BrokerFeatures.createDefault()
|
||||
val brokerFeatures = BrokerFeatures.createDefault(true)
|
||||
brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
|
||||
|
||||
val finalizedFeatures = Map[String, Short]("feature_1" -> 3)
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
|
|||
val supportedFeaturesMap = Map[String, SupportedVersionRange](
|
||||
"feature_1" -> new SupportedVersionRange(1, 4),
|
||||
"feature_2" -> new SupportedVersionRange(1, 3))
|
||||
val brokerFeatures = BrokerFeatures.createDefault()
|
||||
val brokerFeatures = BrokerFeatures.createDefault(true)
|
||||
brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeaturesMap.asJava))
|
||||
brokerFeatures
|
||||
}
|
||||
|
|
|
|||
|
|
@ -124,7 +124,7 @@ class KafkaApisTest extends Logging {
|
|||
private val metrics = new Metrics()
|
||||
private val brokerId = 1
|
||||
// KRaft tests should override this with a KRaftMetadataCache
|
||||
private var metadataCache: MetadataCache = MetadataCache.zkMetadataCache(brokerId, MetadataVersion.latest())
|
||||
private var metadataCache: MetadataCache = MetadataCache.zkMetadataCache(brokerId, MetadataVersion.latestTesting())
|
||||
private val brokerEpochManager: ZkBrokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
|
||||
private val clientQuotaManager: ClientQuotaManager = mock(classOf[ClientQuotaManager])
|
||||
private val clientRequestQuotaManager: ClientRequestQuotaManager = mock(classOf[ClientRequestQuotaManager])
|
||||
|
|
@ -149,7 +149,7 @@ class KafkaApisTest extends Logging {
|
|||
metrics.close()
|
||||
}
|
||||
|
||||
def createKafkaApis(interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest,
|
||||
def createKafkaApis(interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latestTesting,
|
||||
authorizer: Option[Authorizer] = None,
|
||||
enableForwarding: Boolean = false,
|
||||
configRepository: ConfigRepository = new MockConfigRepository(),
|
||||
|
|
@ -199,10 +199,10 @@ class KafkaApisTest extends Logging {
|
|||
val apiVersionManager = new SimpleApiVersionManager(
|
||||
listenerType,
|
||||
enabledApis,
|
||||
BrokerFeatures.defaultSupportedFeatures(),
|
||||
BrokerFeatures.defaultSupportedFeatures(true),
|
||||
true,
|
||||
false,
|
||||
() => new Features(MetadataVersion.latest(), Collections.emptyMap[String, java.lang.Short], 0, raftSupport))
|
||||
() => new Features(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, raftSupport))
|
||||
|
||||
val clientMetricsManagerOpt = if (raftSupport) Some(clientMetricsManager) else None
|
||||
|
||||
|
|
|
|||
|
|
@ -595,7 +595,7 @@ class KafkaConfigTest {
|
|||
props.setProperty(KafkaConfig.BrokerIdProp, "1")
|
||||
props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
|
||||
val conf = KafkaConfig.fromProps(props)
|
||||
assertEquals(MetadataVersion.latest, conf.interBrokerProtocolVersion)
|
||||
assertEquals(MetadataVersion.latestProduction, conf.interBrokerProtocolVersion)
|
||||
|
||||
props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.0")
|
||||
// We need to set the message format version to make the configuration valid.
|
||||
|
|
@ -611,7 +611,7 @@ class KafkaConfigTest {
|
|||
assertEquals(IBP_0_8_2, conf3.interBrokerProtocolVersion)
|
||||
|
||||
//check that latest is newer than 0.8.2
|
||||
assertTrue(MetadataVersion.latest.isAtLeast(conf3.interBrokerProtocolVersion))
|
||||
assertTrue(MetadataVersion.latestTesting.isAtLeast(conf3.interBrokerProtocolVersion))
|
||||
}
|
||||
|
||||
private def isValidKafkaConfig(props: Properties): Boolean = {
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ class KafkaRaftServerTest {
|
|||
private def invokeLoadMetaProperties(
|
||||
metaProperties: MetaProperties,
|
||||
configProperties: Properties,
|
||||
metadataVersion: Option[MetadataVersion] = Some(MetadataVersion.latest())
|
||||
metadataVersion: Option[MetadataVersion] = Some(MetadataVersion.latestTesting())
|
||||
): (MetaPropertiesEnsemble, BootstrapMetadata) = {
|
||||
val tempLogDir = TestUtils.tempDirectory()
|
||||
try {
|
||||
|
|
@ -178,7 +178,7 @@ class KafkaRaftServerTest {
|
|||
setNodeId(nodeId).
|
||||
build())
|
||||
|
||||
writeBootstrapMetadata(validDir, MetadataVersion.latest())
|
||||
writeBootstrapMetadata(validDir, MetadataVersion.latestTesting())
|
||||
|
||||
// Use a regular file as an invalid log dir to trigger an IO error
|
||||
val invalidDir = TestUtils.tempFile("blah")
|
||||
|
|
@ -314,6 +314,6 @@ class KafkaRaftServerTest {
|
|||
assertEquals(metaProperties, metaPropertiesEnsemble.logDirProps().values().iterator().next())
|
||||
assertTrue(metaPropertiesEnsemble.errorLogDirs().isEmpty())
|
||||
assertTrue(metaPropertiesEnsemble.emptyLogDirs().isEmpty())
|
||||
assertEquals(bootstrapMetadata.metadataVersion(), MetadataVersion.latest())
|
||||
assertEquals(bootstrapMetadata.metadataVersion(), MetadataVersion.latestProduction())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -127,7 +127,7 @@ class KafkaServerTest extends QuorumTestHarness {
|
|||
@Test
|
||||
def testAlterIsrManager(): Unit = {
|
||||
val props = TestUtils.createBrokerConfigs(1, zkConnect).head
|
||||
props.put(KafkaConfig.InterBrokerProtocolVersionProp, MetadataVersion.latest.toString)
|
||||
props.put(KafkaConfig.InterBrokerProtocolVersionProp, MetadataVersion.latestTesting.toString)
|
||||
|
||||
val server = TestUtils.createServer(KafkaConfig.fromProps(props))
|
||||
server.replicaManager.alterPartitionManager match {
|
||||
|
|
|
|||
|
|
@ -44,12 +44,12 @@ import scala.jdk.CollectionConverters._
|
|||
object MetadataCacheTest {
|
||||
def zkCacheProvider(): util.stream.Stream[MetadataCache] =
|
||||
util.stream.Stream.of[MetadataCache](
|
||||
MetadataCache.zkMetadataCache(1, MetadataVersion.latest())
|
||||
MetadataCache.zkMetadataCache(1, MetadataVersion.latestTesting())
|
||||
)
|
||||
|
||||
def cacheProvider(): util.stream.Stream[MetadataCache] =
|
||||
util.stream.Stream.of[MetadataCache](
|
||||
MetadataCache.zkMetadataCache(1, MetadataVersion.latest()),
|
||||
MetadataCache.zkMetadataCache(1, MetadataVersion.latestTesting()),
|
||||
MetadataCache.kRaftMetadataCache(1)
|
||||
)
|
||||
|
||||
|
|
@ -954,7 +954,7 @@ class MetadataCacheTest {
|
|||
)(
|
||||
verifier: ZkMetadataCache => Unit
|
||||
): Unit = {
|
||||
val cache = MetadataCache.zkMetadataCache(1, MetadataVersion.latest(), zkMigrationEnabled = zkMigrationEnabled)
|
||||
val cache = MetadataCache.zkMetadataCache(1, MetadataVersion.latestTesting(), zkMigrationEnabled = zkMigrationEnabled)
|
||||
cache.updateMetadata(1, new UpdateMetadataRequest.Builder(8, 1, 42, brokerEpoch,
|
||||
initialTopicStates.flatMap(_._2.values).toList.asJava, Seq.empty.asJava, initialTopicIds.asJava).build())
|
||||
cache.updateMetadata(1, updateMetadataRequest)
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ class ReplicaAlterLogDirsThreadTest {
|
|||
private val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(),
|
||||
0, 0, 0, partitionStates, Collections.emptyList(), topicIds.asJava).build()
|
||||
// TODO: support raft code?
|
||||
private val metadataCache = new ZkMetadataCache(0, MetadataVersion.latest(), BrokerFeatures.createEmpty())
|
||||
private val metadataCache = new ZkMetadataCache(0, MetadataVersion.latestTesting(), BrokerFeatures.createEmpty())
|
||||
metadataCache.updateMetadata(0, updateMetadataRequest)
|
||||
|
||||
private def initialFetchState(fetchOffset: Long, leaderEpoch: Int = 1): InitialFetchState = {
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ class ReplicaFetcherThreadTest {
|
|||
private val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(),
|
||||
0, 0, 0, partitionStates, Collections.emptyList(), topicIds.asJava).build()
|
||||
// TODO: support raft code?
|
||||
private var metadataCache = new ZkMetadataCache(0, MetadataVersion.latest(), BrokerFeatures.createEmpty())
|
||||
private var metadataCache = new ZkMetadataCache(0, MetadataVersion.latestTesting(), BrokerFeatures.createEmpty())
|
||||
metadataCache.updateMetadata(0, updateMetadataRequest)
|
||||
|
||||
private def initialFetchState(topicId: Option[Uuid], fetchOffset: Long, leaderEpoch: Int = 1): InitialFetchState = {
|
||||
|
|
@ -277,7 +277,7 @@ class ReplicaFetcherThreadTest {
|
|||
|
||||
@Test
|
||||
def shouldNotFetchLeaderEpochOnFirstFetchWithTruncateOnFetch(): Unit = {
|
||||
verifyFetchLeaderEpochOnFirstFetch(MetadataVersion.latest, epochFetchCount = 0)
|
||||
verifyFetchLeaderEpochOnFirstFetch(MetadataVersion.latestTesting, epochFetchCount = 0)
|
||||
}
|
||||
|
||||
private def verifyFetchLeaderEpochOnFirstFetch(ibp: MetadataVersion, epochFetchCount: Int = 1): Unit = {
|
||||
|
|
|
|||
|
|
@ -162,7 +162,7 @@ class ReplicaManagerConcurrencyTest extends Logging {
|
|||
setClusterId(Uuid.randomUuid().toString).
|
||||
setNodeId(1).
|
||||
build()
|
||||
TestUtils.formatDirectories(immutable.Seq(logDir.getAbsolutePath), metaProperties, MetadataVersion.latest(), None)
|
||||
TestUtils.formatDirectories(immutable.Seq(logDir.getAbsolutePath), metaProperties, MetadataVersion.latestTesting(), None)
|
||||
|
||||
val props = new Properties
|
||||
props.put(KafkaConfig.QuorumVotersProp, "100@localhost:12345")
|
||||
|
|
|
|||
|
|
@ -6175,7 +6175,7 @@ class ReplicaManagerTest {
|
|||
private def imageFromTopics(topicsImage: TopicsImage): MetadataImage = {
|
||||
val featuresImageLatest = new FeaturesImage(
|
||||
Collections.emptyMap(),
|
||||
MetadataVersion.latest(),
|
||||
MetadataVersion.latestTesting(),
|
||||
ZkMigrationState.NONE)
|
||||
new MetadataImage(
|
||||
new MetadataProvenance(100L, 10, 1000L),
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ import scala.jdk.CollectionConverters._
|
|||
*/
|
||||
class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness with Logging {
|
||||
// Set this to IBP_0_11_0_IV1 to demonstrate the tests failing in the pre-KIP-101 case
|
||||
override def metadataVersion = MetadataVersion.latest
|
||||
override def metadataVersion = MetadataVersion.latestTesting
|
||||
val topic = "topic1"
|
||||
val msg = new Array[Byte](1000)
|
||||
val msgBigger = new Array[Byte](10000)
|
||||
|
|
|
|||
|
|
@ -173,13 +173,13 @@ Found problem:
|
|||
setNodeId(2).
|
||||
build()
|
||||
val stream = new ByteArrayOutputStream()
|
||||
val bootstrapMetadata = StorageTool.buildBootstrapMetadata(MetadataVersion.latest(), None, "test format command")
|
||||
val bootstrapMetadata = StorageTool.buildBootstrapMetadata(MetadataVersion.latestTesting(), None, "test format command")
|
||||
assertEquals(0, StorageTool.
|
||||
formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latest(), ignoreFormatted = false))
|
||||
formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = false))
|
||||
assertTrue(stream.toString().startsWith("Formatting %s".format(tempDir)))
|
||||
|
||||
try assertEquals(1, StorageTool.
|
||||
formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latest(), ignoreFormatted = false)) catch {
|
||||
formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = false)) catch {
|
||||
case e: TerseFailure => assertEquals(s"Log directory ${tempDir} is already " +
|
||||
"formatted. Use --ignore-formatted to ignore this directory and format the " +
|
||||
"others.", e.getMessage)
|
||||
|
|
@ -187,7 +187,7 @@ Found problem:
|
|||
|
||||
val stream2 = new ByteArrayOutputStream()
|
||||
assertEquals(0, StorageTool.
|
||||
formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latest(), ignoreFormatted = true))
|
||||
formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = true))
|
||||
assertEquals("All of the log directories are already formatted.%n".format(), stream2.toString())
|
||||
} finally Utils.delete(tempDir)
|
||||
}
|
||||
|
|
@ -379,9 +379,9 @@ Found problem:
|
|||
setNodeId(2).
|
||||
build()
|
||||
val bootstrapMetadata = StorageTool.
|
||||
buildBootstrapMetadata(MetadataVersion.latest(), None, "test format command")
|
||||
buildBootstrapMetadata(MetadataVersion.latestTesting(), None, "test format command")
|
||||
assertEquals(0, StorageTool.
|
||||
formatCommand(new PrintStream(NullOutputStream.NULL_OUTPUT_STREAM), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latest(), ignoreFormatted = false))
|
||||
formatCommand(new PrintStream(NullOutputStream.NULL_OUTPUT_STREAM), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = false))
|
||||
|
||||
val metaPropertiesFile = Paths.get(tempDir.toURI).resolve(MetaPropertiesEnsemble.META_PROPERTIES_NAME).toFile
|
||||
assertTrue(metaPropertiesFile.exists())
|
||||
|
|
@ -416,7 +416,7 @@ Found problem:
|
|||
}
|
||||
val args = Array("format", "-c", s"${propsFile.toPath}",
|
||||
"-t", "XcZZOzUqS4yHOjhMQB6JLQ",
|
||||
"--release-version", MetadataVersion.latest().toString)
|
||||
"--release-version", MetadataVersion.latestTesting().toString)
|
||||
try {
|
||||
StorageTool.main(args)
|
||||
} catch {
|
||||
|
|
@ -428,7 +428,7 @@ Found problem:
|
|||
assertEquals("", exitString)
|
||||
assertEquals(0, exitStatus)
|
||||
} else {
|
||||
assertEquals(s"Metadata version ${MetadataVersion.latest().toString} is not ready for " +
|
||||
assertEquals(s"Metadata version ${MetadataVersion.latestTesting().toString} is not ready for " +
|
||||
"production use yet.", exitString)
|
||||
assertEquals(1, exitStatus)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -917,7 +917,7 @@ object TestUtils extends Logging {
|
|||
Broker(b.id, Seq(EndPoint("localhost", 6667, listenerName, protocol)), if (b.rack.isPresent) Some(b.rack.get()) else None)
|
||||
}
|
||||
brokers.foreach(b => zkClient.registerBroker(BrokerInfo(Broker(b.id, b.endPoints, rack = b.rack),
|
||||
MetadataVersion.latest, jmxPort = -1)))
|
||||
MetadataVersion.latestTesting, jmxPort = -1)))
|
||||
brokers
|
||||
}
|
||||
|
||||
|
|
@ -1443,7 +1443,7 @@ object TestUtils extends Logging {
|
|||
configRepository: ConfigRepository = new MockConfigRepository,
|
||||
cleanerConfig: CleanerConfig = new CleanerConfig(false),
|
||||
time: MockTime = new MockTime(),
|
||||
interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest,
|
||||
interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latestTesting,
|
||||
recoveryThreadsPerDataDir: Int = 4,
|
||||
transactionVerificationEnabled: Boolean = false,
|
||||
log: Option[UnifiedLog] = None,
|
||||
|
|
|
|||
|
|
@ -812,7 +812,7 @@ class KafkaZkClientTest extends QuorumTestHarness {
|
|||
Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)),
|
||||
rack = rack,
|
||||
features = features),
|
||||
MetadataVersion.latest, jmxPort = port + 10)
|
||||
MetadataVersion.latestTesting, jmxPort = port + 10)
|
||||
|
||||
@Test
|
||||
def testRegisterBrokerInfo(): Unit = {
|
||||
|
|
|
|||
|
|
@ -637,7 +637,7 @@ public class GroupMetadataManagerTest {
|
|||
.build());
|
||||
|
||||
assertEquals(
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())),
|
||||
syncResult.records
|
||||
);
|
||||
// Simulate a successful write to the log.
|
||||
|
|
@ -841,7 +841,7 @@ public class GroupMetadataManagerTest {
|
|||
));
|
||||
assertEquals(
|
||||
Collections.singletonList(
|
||||
RecordHelpers.newGroupMetadataRecord(group, groupAssignment, MetadataVersion.latest())),
|
||||
RecordHelpers.newGroupMetadataRecord(group, groupAssignment, MetadataVersion.latestTesting())),
|
||||
leaderSyncResult.records
|
||||
);
|
||||
|
||||
|
|
@ -901,7 +901,7 @@ public class GroupMetadataManagerTest {
|
|||
|
||||
// Now the group is stable, with the one member that joined above
|
||||
assertEquals(
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())),
|
||||
syncResult.records
|
||||
);
|
||||
// Simulate a successful write to log.
|
||||
|
|
@ -939,7 +939,7 @@ public class GroupMetadataManagerTest {
|
|||
syncResult = sendClassicGroupSync(syncRequest.setGenerationId(nextGenerationId));
|
||||
|
||||
assertEquals(
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())),
|
||||
syncResult.records
|
||||
);
|
||||
// Simulate a successful write to log.
|
||||
|
|
@ -1003,7 +1003,7 @@ public class GroupMetadataManagerTest {
|
|||
.setProtocolType("consumer")
|
||||
.setProtocol(null)
|
||||
.setCurrentStateTimestamp(time.milliseconds()),
|
||||
MetadataVersion.latest()));
|
||||
MetadataVersion.latestTesting()));
|
||||
|
||||
|
||||
Set<String> heartbeatKeys = timeouts.stream().map(timeout -> timeout.key).collect(Collectors.toSet());
|
||||
|
|
@ -4709,7 +4709,7 @@ public class GroupMetadataManagerTest {
|
|||
ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup("group-id", false);
|
||||
|
||||
assertEquals(
|
||||
Collections.singletonList(RecordHelpers.newEmptyGroupMetadataRecord(group, MetadataVersion.latest())),
|
||||
Collections.singletonList(RecordHelpers.newEmptyGroupMetadataRecord(group, MetadataVersion.latestTesting())),
|
||||
joinResult.records
|
||||
);
|
||||
}
|
||||
|
|
@ -4783,7 +4783,7 @@ public class GroupMetadataManagerTest {
|
|||
.setProtocolType("consumer")
|
||||
.setProtocol("range")
|
||||
.setCurrentStateTimestamp(context.time.milliseconds()),
|
||||
MetadataVersion.latest());
|
||||
MetadataVersion.latestTesting());
|
||||
|
||||
context.replay(groupMetadataRecord);
|
||||
ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup("group-id", false);
|
||||
|
|
@ -4847,7 +4847,7 @@ public class GroupMetadataManagerTest {
|
|||
.setProtocolType("consumer")
|
||||
.setProtocol("range")
|
||||
.setCurrentStateTimestamp(context.time.milliseconds()),
|
||||
MetadataVersion.latest());
|
||||
MetadataVersion.latestTesting());
|
||||
|
||||
context.replay(groupMetadataRecord);
|
||||
context.groupMetadataManager.onLoaded();
|
||||
|
|
@ -4887,7 +4887,7 @@ public class GroupMetadataManagerTest {
|
|||
.setProtocolType("consumer")
|
||||
.setProtocol("range")
|
||||
.setCurrentStateTimestamp(context.time.milliseconds()),
|
||||
MetadataVersion.latest());
|
||||
MetadataVersion.latestTesting());
|
||||
|
||||
context.replay(groupMetadataRecord);
|
||||
context.groupMetadataManager.onLoaded();
|
||||
|
|
@ -5826,7 +5826,7 @@ public class GroupMetadataManagerTest {
|
|||
timeouts.forEach(timeout -> {
|
||||
assertEquals(classicGroupHeartbeatKey("group-id", memberId), timeout.key);
|
||||
assertEquals(Collections.singletonList(
|
||||
newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
|
||||
newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())),
|
||||
timeout.result.records());
|
||||
});
|
||||
|
||||
|
|
@ -6305,7 +6305,7 @@ public class GroupMetadataManagerTest {
|
|||
);
|
||||
|
||||
assertEquals(
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())),
|
||||
joinResult.records
|
||||
);
|
||||
assertFalse(joinResult.joinFuture.isDone());
|
||||
|
|
@ -6432,7 +6432,7 @@ public class GroupMetadataManagerTest {
|
|||
);
|
||||
|
||||
assertEquals(
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())),
|
||||
joinResult.records
|
||||
);
|
||||
assertFalse(joinResult.joinFuture.isDone());
|
||||
|
|
@ -6508,7 +6508,7 @@ public class GroupMetadataManagerTest {
|
|||
supportSkippingAssignment);
|
||||
|
||||
assertEquals(
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())),
|
||||
joinResult.records
|
||||
);
|
||||
assertFalse(joinResult.joinFuture.isDone());
|
||||
|
|
@ -6646,7 +6646,7 @@ public class GroupMetadataManagerTest {
|
|||
.setProtocolType("consumer")
|
||||
.setProtocol(null)
|
||||
.setCurrentStateTimestamp(context.time.milliseconds()),
|
||||
MetadataVersion.latest())
|
||||
MetadataVersion.latestTesting())
|
||||
);
|
||||
|
||||
assertEquals(1, timeouts.size());
|
||||
|
|
@ -7076,7 +7076,7 @@ public class GroupMetadataManagerTest {
|
|||
);
|
||||
|
||||
assertEquals(
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())),
|
||||
joinResult.records
|
||||
);
|
||||
// Simulate a successful write to the log.
|
||||
|
|
@ -7389,7 +7389,7 @@ public class GroupMetadataManagerTest {
|
|||
);
|
||||
|
||||
assertEquals(
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())),
|
||||
followerJoinResult.records
|
||||
);
|
||||
// Simulate a failed write to the log.
|
||||
|
|
@ -7446,7 +7446,7 @@ public class GroupMetadataManagerTest {
|
|||
leaderSyncResult.appendFuture.complete(null);
|
||||
|
||||
assertEquals(
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())),
|
||||
leaderSyncResult.records
|
||||
);
|
||||
|
||||
|
|
@ -7496,7 +7496,7 @@ public class GroupMetadataManagerTest {
|
|||
);
|
||||
|
||||
assertEquals(
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())),
|
||||
followerJoinResult.records
|
||||
);
|
||||
|
||||
|
|
@ -7708,7 +7708,7 @@ public class GroupMetadataManagerTest {
|
|||
);
|
||||
|
||||
assertEquals(
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())),
|
||||
followerJoinResult.records
|
||||
);
|
||||
// Simulate a successful write to log.
|
||||
|
|
@ -7915,7 +7915,7 @@ public class GroupMetadataManagerTest {
|
|||
);
|
||||
|
||||
assertEquals(
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())),
|
||||
leaderJoinResult.records
|
||||
);
|
||||
// Simulate a successful write to log.
|
||||
|
|
@ -8696,7 +8696,7 @@ public class GroupMetadataManagerTest {
|
|||
|
||||
assertEquals(
|
||||
Collections.singletonList(
|
||||
RecordHelpers.newGroupMetadataRecord(group, updatedAssignment, MetadataVersion.latest())),
|
||||
RecordHelpers.newGroupMetadataRecord(group, updatedAssignment, MetadataVersion.latestTesting())),
|
||||
syncResult.records
|
||||
);
|
||||
|
||||
|
|
@ -9357,7 +9357,7 @@ public class GroupMetadataManagerTest {
|
|||
ExpiredTimeout<Void, Record> timeout = timeouts.get(0);
|
||||
assertEquals(classicGroupSyncKey("group-id"), timeout.key);
|
||||
assertEquals(
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())),
|
||||
timeout.result.records()
|
||||
);
|
||||
|
||||
|
|
@ -9513,7 +9513,7 @@ public class GroupMetadataManagerTest {
|
|||
|
||||
if (response.memberId().equals(leaderId)) {
|
||||
assertEquals(
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())),
|
||||
syncResult.records
|
||||
);
|
||||
|
||||
|
|
@ -9607,7 +9607,7 @@ public class GroupMetadataManagerTest {
|
|||
.setProtocolType(classicGroupType)
|
||||
.setProtocol("range")
|
||||
.setCurrentStateTimestamp(context.time.milliseconds()),
|
||||
MetadataVersion.latest()));
|
||||
MetadataVersion.latestTesting()));
|
||||
context.commit();
|
||||
ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false);
|
||||
context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, new ConsumerGroupMember.Builder(memberId1)
|
||||
|
|
@ -9794,7 +9794,7 @@ public class GroupMetadataManagerTest {
|
|||
context.replay(newGroupMetadataRecord(
|
||||
"group-id",
|
||||
groupMetadataValue,
|
||||
MetadataVersion.latest()
|
||||
MetadataVersion.latestTesting()
|
||||
));
|
||||
context.verifyDescribeGroupsReturnsDeadGroup("group-id");
|
||||
context.commit();
|
||||
|
|
@ -9843,7 +9843,7 @@ public class GroupMetadataManagerTest {
|
|||
context.replay(newGroupMetadataRecord(
|
||||
"group-id",
|
||||
groupMetadataValue,
|
||||
MetadataVersion.latest()
|
||||
MetadataVersion.latestTesting()
|
||||
));
|
||||
ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup("group-id", false);
|
||||
context.groupMetadataManager.prepareRebalance(group, "trigger rebalance");
|
||||
|
|
@ -10264,7 +10264,7 @@ public class GroupMetadataManagerTest {
|
|||
))
|
||||
);
|
||||
assertEquals(
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
|
||||
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latestTesting())),
|
||||
leaveResult.records()
|
||||
);
|
||||
// Simulate a successful write to the log.
|
||||
|
|
|
|||
|
|
@ -380,7 +380,7 @@ public class OffsetMetadataManagerTest {
|
|||
commitTimestamp,
|
||||
OptionalLong.empty()
|
||||
),
|
||||
MetadataVersion.latest()
|
||||
MetadataVersion.latestTesting()
|
||||
));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -149,7 +149,7 @@ public class ReplicaFetcherThreadBenchmark {
|
|||
setFlushStartOffsetCheckpointMs(10000L).
|
||||
setRetentionCheckMs(1000L).
|
||||
setProducerStateManagerConfig(60000, false).
|
||||
setInterBrokerProtocolVersion(MetadataVersion.latest()).
|
||||
setInterBrokerProtocolVersion(MetadataVersion.latestTesting()).
|
||||
setScheduler(scheduler).
|
||||
setBrokerTopicStats(brokerTopicStats).
|
||||
setLogDirFailureChannel(logDirFailureChannel).
|
||||
|
|
@ -178,7 +178,7 @@ public class ReplicaFetcherThreadBenchmark {
|
|||
OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
|
||||
Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L));
|
||||
AlterPartitionManager isrChannelManager = Mockito.mock(AlterPartitionManager.class);
|
||||
Partition partition = new Partition(tp, 100, MetadataVersion.latest(),
|
||||
Partition partition = new Partition(tp, 100, MetadataVersion.latestTesting(),
|
||||
0, () -> -1, Time.SYSTEM, alterPartitionListener, new DelayedOperationsMock(tp),
|
||||
Mockito.mock(MetadataCache.class), logManager, isrChannelManager);
|
||||
|
||||
|
|
|
|||
|
|
@ -203,7 +203,7 @@ public class KRaftMetadataRequestBenchmark {
|
|||
ApiMessageType.ListenerType.BROKER,
|
||||
false,
|
||||
false,
|
||||
() -> Features.fromKRaftVersion(MetadataVersion.latest()))).
|
||||
() -> Features.fromKRaftVersion(MetadataVersion.latestTesting()))).
|
||||
build();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -113,7 +113,7 @@ public class MetadataRequestBenchmark {
|
|||
private Metrics metrics = new Metrics();
|
||||
private int brokerId = 1;
|
||||
private ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId,
|
||||
MetadataVersion.latest(), BrokerFeatures.createEmpty(), null, false);
|
||||
MetadataVersion.latestTesting(), BrokerFeatures.createEmpty(), null, false);
|
||||
private ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class);
|
||||
private ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class);
|
||||
private ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class);
|
||||
|
|
@ -204,7 +204,7 @@ public class MetadataRequestBenchmark {
|
|||
ApiMessageType.ListenerType.ZK_BROKER,
|
||||
false,
|
||||
false,
|
||||
() -> Features.fromKRaftVersion(MetadataVersion.latest()))).
|
||||
() -> Features.fromKRaftVersion(MetadataVersion.latestTesting()))).
|
||||
build();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -108,7 +108,7 @@ public class PartitionMakeFollowerBenchmark {
|
|||
setFlushStartOffsetCheckpointMs(10000L).
|
||||
setRetentionCheckMs(1000L).
|
||||
setProducerStateManagerConfig(60000, false).
|
||||
setInterBrokerProtocolVersion(MetadataVersion.latest()).
|
||||
setInterBrokerProtocolVersion(MetadataVersion.latestTesting()).
|
||||
setScheduler(scheduler).
|
||||
setBrokerTopicStats(brokerTopicStats).
|
||||
setLogDirFailureChannel(logDirFailureChannel).
|
||||
|
|
@ -122,7 +122,7 @@ public class PartitionMakeFollowerBenchmark {
|
|||
AlterPartitionListener alterPartitionListener = Mockito.mock(AlterPartitionListener.class);
|
||||
AlterPartitionManager alterPartitionManager = Mockito.mock(AlterPartitionManager.class);
|
||||
partition = new Partition(tp, 100,
|
||||
MetadataVersion.latest(), 0, () -> -1, Time.SYSTEM,
|
||||
MetadataVersion.latestTesting(), 0, () -> -1, Time.SYSTEM,
|
||||
alterPartitionListener, delayedOperations,
|
||||
Mockito.mock(MetadataCache.class), logManager, alterPartitionManager);
|
||||
partition.createLogIfNotExists(true, false, offsetCheckpoints, topicId, Option.empty());
|
||||
|
|
|
|||
|
|
@ -98,7 +98,7 @@ public class UpdateFollowerFetchStateBenchmark {
|
|||
setFlushStartOffsetCheckpointMs(10000L).
|
||||
setRetentionCheckMs(1000L).
|
||||
setProducerStateManagerConfig(60000, false).
|
||||
setInterBrokerProtocolVersion(MetadataVersion.latest()).
|
||||
setInterBrokerProtocolVersion(MetadataVersion.latestTesting()).
|
||||
setScheduler(scheduler).
|
||||
setBrokerTopicStats(brokerTopicStats).
|
||||
setLogDirFailureChannel(logDirFailureChannel).
|
||||
|
|
@ -125,7 +125,7 @@ public class UpdateFollowerFetchStateBenchmark {
|
|||
AlterPartitionListener alterPartitionListener = Mockito.mock(AlterPartitionListener.class);
|
||||
AlterPartitionManager alterPartitionManager = Mockito.mock(AlterPartitionManager.class);
|
||||
partition = new Partition(topicPartition, 100,
|
||||
MetadataVersion.latest(), 0, () -> -1, Time.SYSTEM,
|
||||
MetadataVersion.latestTesting(), 0, () -> -1, Time.SYSTEM,
|
||||
alterPartitionListener, delayedOperations,
|
||||
Mockito.mock(MetadataCache.class), logManager, alterPartitionManager);
|
||||
partition.makeLeader(partitionState, offsetCheckpoints, topicId, Option.empty());
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ public class CompressedRecordBatchValidationBenchmark extends BaseRecordBatchBen
|
|||
new LogValidator(records, new TopicPartition("a", 0),
|
||||
Time.SYSTEM, compressionType, compressionType, false, messageVersion,
|
||||
TimestampType.CREATE_TIME, Long.MAX_VALUE, Long.MAX_VALUE, 0, AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest()
|
||||
MetadataVersion.latestTesting()
|
||||
).validateMessagesAndAssignOffsetsCompressed(PrimitiveRef.ofLong(startingOffset),
|
||||
validatorMetricsRecorder, requestLocal.bufferSupplier());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ public class UncompressedRecordBatchValidationBenchmark extends BaseRecordBatchB
|
|||
new LogValidator(records, new TopicPartition("a", 0),
|
||||
Time.SYSTEM, CompressionType.NONE, CompressionType.NONE, false,
|
||||
messageVersion, TimestampType.CREATE_TIME, Long.MAX_VALUE, Long.MAX_VALUE, 0, AppendOrigin.CLIENT,
|
||||
MetadataVersion.latest()
|
||||
MetadataVersion.latestTesting()
|
||||
).assignOffsetsNonCompressed(PrimitiveRef.ofLong(startingOffset), validatorMetricsRecorder);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -109,7 +109,7 @@ public class CheckpointBench {
|
|||
JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
|
||||
this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files),
|
||||
new LogConfig(new Properties()), new MockConfigRepository(), new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d,
|
||||
1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latest(), 4, false, Option.empty(), false);
|
||||
1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latestTesting(), 4, false, Option.empty(), false);
|
||||
scheduler.startup();
|
||||
final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
|
||||
final MetadataCache metadataCache =
|
||||
|
|
|
|||
|
|
@ -135,7 +135,7 @@ public class PartitionCreationBench {
|
|||
setFlushStartOffsetCheckpointMs(10000L).
|
||||
setRetentionCheckMs(1000L).
|
||||
setProducerStateManagerConfig(60000, false).
|
||||
setInterBrokerProtocolVersion(MetadataVersion.latest()).
|
||||
setInterBrokerProtocolVersion(MetadataVersion.latestTesting()).
|
||||
setScheduler(scheduler).
|
||||
setBrokerTopicStats(brokerTopicStats).
|
||||
setLogDirFailureChannel(failureChannel).
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ public class FeatureControlManager {
|
|||
private LogContext logContext = null;
|
||||
private SnapshotRegistry snapshotRegistry = null;
|
||||
private QuorumFeatures quorumFeatures = null;
|
||||
private MetadataVersion metadataVersion = MetadataVersion.latest();
|
||||
private MetadataVersion metadataVersion = MetadataVersion.latestProduction();
|
||||
private MetadataVersion minimumBootstrapVersion = MetadataVersion.MINIMUM_BOOTSTRAP_VERSION;
|
||||
private ClusterFeatureSupportDescriber clusterSupportDescriber = new ClusterFeatureSupportDescriber() {
|
||||
@Override
|
||||
|
|
@ -105,7 +105,7 @@ public class FeatureControlManager {
|
|||
Map<String, VersionRange> localSupportedFeatures = new HashMap<>();
|
||||
localSupportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
|
||||
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
|
||||
MetadataVersion.latest().featureLevel()));
|
||||
MetadataVersion.latestProduction().featureLevel()));
|
||||
quorumFeatures = new QuorumFeatures(0, localSupportedFeatures, Collections.singletonList(0));
|
||||
}
|
||||
return new FeatureControlManager(
|
||||
|
|
|
|||
|
|
@ -59,8 +59,8 @@ public final class QuorumFeatures {
|
|||
features.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
|
||||
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
|
||||
enableUnstable ?
|
||||
MetadataVersion.latest().featureLevel() :
|
||||
MetadataVersion.LATEST_PRODUCTION.featureLevel()));
|
||||
MetadataVersion.latestTesting().featureLevel() :
|
||||
MetadataVersion.latestProduction().featureLevel()));
|
||||
return features;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ public final class ImageWriterOptions {
|
|||
};
|
||||
|
||||
public Builder() {
|
||||
this.metadataVersion = MetadataVersion.latest();
|
||||
this.metadataVersion = MetadataVersion.latestProduction();
|
||||
}
|
||||
|
||||
public Builder(MetadataImage image) {
|
||||
|
|
|
|||
|
|
@ -112,7 +112,7 @@ public class ControllerRegistration {
|
|||
supportedFeatures = new HashMap<>();
|
||||
supportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
|
||||
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
|
||||
MetadataVersion.latest().featureLevel()));
|
||||
MetadataVersion.latestProduction().featureLevel()));
|
||||
}
|
||||
return new ControllerRegistration(id,
|
||||
incarnationId,
|
||||
|
|
|
|||
|
|
@ -82,7 +82,7 @@ public class BootstrapDirectory {
|
|||
|
||||
BootstrapMetadata readFromConfiguration() {
|
||||
if (!ibp.isPresent()) {
|
||||
return BootstrapMetadata.fromVersion(MetadataVersion.latest(), "the default bootstrap");
|
||||
return BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(), "the default bootstrap");
|
||||
}
|
||||
MetadataVersion version = MetadataVersion.fromVersionString(ibp.get());
|
||||
if (version.isLessThan(MINIMUM_BOOTSTRAP_VERSION)) {
|
||||
|
|
|
|||
|
|
@ -280,7 +280,7 @@ public class ClusterControlManagerTest {
|
|||
MetadataVersion.IBP_3_3_IV2,
|
||||
MetadataVersion.IBP_3_3_IV3,
|
||||
MetadataVersion.IBP_3_7_IV2, // introduces directory assignment
|
||||
MetadataVersion.latest()
|
||||
MetadataVersion.latestTesting()
|
||||
).map(Arguments::of);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -384,7 +384,7 @@ public class FeatureControlManagerTest {
|
|||
public void testCreateFeatureLevelRecords() {
|
||||
Map<String, VersionRange> localSupportedFeatures = new HashMap<>();
|
||||
localSupportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
|
||||
MetadataVersion.IBP_3_0_IV1.featureLevel(), MetadataVersion.latest().featureLevel()));
|
||||
MetadataVersion.IBP_3_0_IV1.featureLevel(), MetadataVersion.latestTesting().featureLevel()));
|
||||
localSupportedFeatures.put("foo", VersionRange.of(0, 2));
|
||||
FeatureControlManager manager = new FeatureControlManager.Builder().
|
||||
setQuorumFeatures(new QuorumFeatures(0, localSupportedFeatures, emptyList())).
|
||||
|
|
|
|||
|
|
@ -1057,7 +1057,7 @@ public class PartitionChangeBuilderTest {
|
|||
setPartitionEpoch(200).
|
||||
build();
|
||||
Optional<ApiMessageAndVersion> built = new PartitionChangeBuilder(registration, FOO_ID,
|
||||
0, r -> true, MetadataVersion.latest(), 2).
|
||||
0, r -> true, MetadataVersion.latestTesting(), 2).
|
||||
setDirectory(3, Uuid.fromString("pN1VKs9zRzK4APflpegAVg")).
|
||||
setDirectory(1, DirectoryId.LOST).
|
||||
setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ public class QuorumControllerIntegrationTestUtils {
|
|||
private final static Logger log = LoggerFactory.getLogger(QuorumControllerIntegrationTestUtils.class);
|
||||
|
||||
BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
|
||||
return brokerFeatures(MetadataVersion.MINIMUM_KRAFT_VERSION, MetadataVersion.latest());
|
||||
return brokerFeatures(MetadataVersion.MINIMUM_KRAFT_VERSION, MetadataVersion.latestTesting());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -94,7 +94,7 @@ public class QuorumControllerIntegrationTestUtils {
|
|||
.setBrokerId(brokerId)
|
||||
.setRack(null)
|
||||
.setClusterId(controller.clusterId())
|
||||
.setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latest()))
|
||||
.setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting()))
|
||||
.setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
|
||||
.setLogDirs(Collections.singletonList(
|
||||
Uuid.fromString("TESTBROKER" + Integer.toString(100000 + brokerId).substring(1) + "DIRAAAA")
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
|
|||
private OptionalLong sessionTimeoutMillis = OptionalLong.empty();
|
||||
private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty();
|
||||
private BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
|
||||
fromVersion(MetadataVersion.latest(), "test-provided version");
|
||||
fromVersion(MetadataVersion.latestTesting(), "test-provided version");
|
||||
|
||||
public Builder(LocalLogManagerTestEnv logEnv) {
|
||||
this.logEnv = logEnv;
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ public class QuorumFeaturesTest {
|
|||
Map<String, VersionRange> expectedFeatures = new HashMap<>(1);
|
||||
expectedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
|
||||
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
|
||||
MetadataVersion.latest().featureLevel()));
|
||||
MetadataVersion.latestTesting().featureLevel()));
|
||||
assertEquals(expectedFeatures, QuorumFeatures.defaultFeatureMap(true));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -166,7 +166,7 @@ public class ReplicationControlManagerTest {
|
|||
private static class ReplicationControlTestContext {
|
||||
private static class Builder {
|
||||
private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
|
||||
private MetadataVersion metadataVersion = MetadataVersion.latest();
|
||||
private MetadataVersion metadataVersion = MetadataVersion.latestTesting();
|
||||
private MockTime mockTime = new MockTime();
|
||||
private boolean isElrEnabled = false;
|
||||
|
||||
|
|
@ -1623,7 +1623,7 @@ public class ReplicationControlManagerTest {
|
|||
@ParameterizedTest
|
||||
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
|
||||
public void testReassignPartitions(short version) throws Exception {
|
||||
MetadataVersion metadataVersion = MetadataVersion.latest();
|
||||
MetadataVersion metadataVersion = MetadataVersion.latestTesting();
|
||||
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
|
||||
.setMetadataVersion(metadataVersion)
|
||||
.build();
|
||||
|
|
@ -1699,7 +1699,7 @@ public class ReplicationControlManagerTest {
|
|||
)).
|
||||
setLeader(3).
|
||||
setRemovingReplicas(Collections.emptyList()).
|
||||
setAddingReplicas(Collections.emptyList()), MetadataVersion.latest().partitionChangeRecordVersion())),
|
||||
setAddingReplicas(Collections.emptyList()), MetadataVersion.latestTesting().partitionChangeRecordVersion())),
|
||||
new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
|
||||
new ReassignableTopicResponse().setName("foo").setPartitions(asList(
|
||||
new ReassignablePartitionResponse().setPartitionIndex(0).
|
||||
|
|
@ -1960,7 +1960,7 @@ public class ReplicationControlManagerTest {
|
|||
|
||||
@Test
|
||||
public void testCancelReassignPartitions() throws Exception {
|
||||
MetadataVersion metadataVersion = MetadataVersion.latest();
|
||||
MetadataVersion metadataVersion = MetadataVersion.latestTesting();
|
||||
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
|
||||
.setMetadataVersion(metadataVersion)
|
||||
.build();
|
||||
|
|
@ -2094,7 +2094,7 @@ public class ReplicationControlManagerTest {
|
|||
Uuid.fromString("TESTBROKER00004DIRAAAA")
|
||||
)).
|
||||
setRemovingReplicas(null).
|
||||
setAddingReplicas(Collections.emptyList()), MetadataVersion.latest().partitionChangeRecordVersion())),
|
||||
setAddingReplicas(Collections.emptyList()), MetadataVersion.latestTesting().partitionChangeRecordVersion())),
|
||||
new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
|
||||
new ReassignableTopicResponse().setName("foo").setPartitions(asList(
|
||||
new ReassignablePartitionResponse().setPartitionIndex(0).
|
||||
|
|
@ -2435,13 +2435,13 @@ public class ReplicationControlManagerTest {
|
|||
setPartitionId(0).
|
||||
setTopicId(fooId).
|
||||
setLeader(1),
|
||||
MetadataVersion.latest().partitionChangeRecordVersion()),
|
||||
MetadataVersion.latestTesting().partitionChangeRecordVersion()),
|
||||
new ApiMessageAndVersion(
|
||||
new PartitionChangeRecord().
|
||||
setPartitionId(2).
|
||||
setTopicId(fooId).
|
||||
setLeader(0),
|
||||
MetadataVersion.latest().partitionChangeRecordVersion())),
|
||||
MetadataVersion.latestTesting().partitionChangeRecordVersion())),
|
||||
election2Result.records());
|
||||
}
|
||||
|
||||
|
|
@ -2484,7 +2484,7 @@ public class ReplicationControlManagerTest {
|
|||
.setPartitionId(0)
|
||||
.setTopicId(fooId)
|
||||
.setLeader(1);
|
||||
assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latest().partitionChangeRecordVersion())), balanceResult.records());
|
||||
assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latestTesting().partitionChangeRecordVersion())), balanceResult.records());
|
||||
assertTrue(replication.arePartitionLeadersImbalanced());
|
||||
assertFalse(balanceResult.response());
|
||||
|
||||
|
|
@ -2516,7 +2516,7 @@ public class ReplicationControlManagerTest {
|
|||
.setPartitionId(2)
|
||||
.setTopicId(fooId)
|
||||
.setLeader(0);
|
||||
assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latest().partitionChangeRecordVersion())), balanceResult.records());
|
||||
assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latestTesting().partitionChangeRecordVersion())), balanceResult.records());
|
||||
assertFalse(replication.arePartitionLeadersImbalanced());
|
||||
assertFalse(balanceResult.response());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ public class FeaturesImageTest {
|
|||
map1.put("foo", (short) 2);
|
||||
map1.put("bar", (short) 1);
|
||||
map1.put("baz", (short) 8);
|
||||
IMAGE1 = new FeaturesImage(map1, MetadataVersion.latest(), ZkMigrationState.NONE);
|
||||
IMAGE1 = new FeaturesImage(map1, MetadataVersion.latestTesting(), ZkMigrationState.NONE);
|
||||
|
||||
DELTA1_RECORDS = new ArrayList<>();
|
||||
DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
|
||||
|
|
@ -69,7 +69,7 @@ public class FeaturesImageTest {
|
|||
|
||||
Map<String, Short> map2 = new HashMap<>();
|
||||
map2.put("foo", (short) 3);
|
||||
IMAGE2 = new FeaturesImage(map2, MetadataVersion.latest(), ZkMigrationState.NONE);
|
||||
IMAGE2 = new FeaturesImage(map2, MetadataVersion.latestTesting(), ZkMigrationState.NONE);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ public class BootstrapDirectoryTest {
|
|||
@Test
|
||||
public void testReadFromEmptyConfiguration() throws Exception {
|
||||
try (BootstrapTestDirectory testDirectory = new BootstrapTestDirectory().createDirectory()) {
|
||||
assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.latest(),
|
||||
assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(),
|
||||
"the default bootstrap"),
|
||||
new BootstrapDirectory(testDirectory.path(), Optional.empty()).read());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -582,10 +582,15 @@ public enum MetadataVersion {
|
|||
}
|
||||
}
|
||||
|
||||
public static MetadataVersion latest() {
|
||||
// Testing only
|
||||
public static MetadataVersion latestTesting() {
|
||||
return VERSIONS[VERSIONS.length - 1];
|
||||
}
|
||||
|
||||
public static MetadataVersion latestProduction() {
|
||||
return LATEST_PRODUCTION;
|
||||
}
|
||||
|
||||
public static boolean checkIfMetadataChanged(MetadataVersion sourceVersion, MetadataVersion targetVersion) {
|
||||
if (sourceVersion == targetVersion) {
|
||||
return false;
|
||||
|
|
|
|||
|
|
@ -435,9 +435,9 @@ class MetadataVersionTest {
|
|||
|
||||
@Test
|
||||
public void assertLatestProductionIsLessThanLatest() {
|
||||
assertTrue(LATEST_PRODUCTION.ordinal() < MetadataVersion.latest().ordinal(),
|
||||
assertTrue(LATEST_PRODUCTION.ordinal() < MetadataVersion.latestTesting().ordinal(),
|
||||
"Expected LATEST_PRODUCTION " + LATEST_PRODUCTION +
|
||||
" to be less than the latest of " + MetadataVersion.latest());
|
||||
" to be less than the latest of " + MetadataVersion.latestTesting());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -447,6 +447,6 @@ class MetadataVersionTest {
|
|||
|
||||
@Test
|
||||
public void assertLatestIsNotProduction() {
|
||||
assertFalse(MetadataVersion.latest().isProduction());
|
||||
assertFalse(MetadataVersion.latestTesting().isProduction());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -244,7 +244,7 @@ public class FeatureCommand {
|
|||
} catch (Throwable e) {
|
||||
throw new TerseException("Unsupported metadata version " + metadata +
|
||||
". Supported metadata versions are " + metadataVersionsToString(
|
||||
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, MetadataVersion.latest()));
|
||||
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, MetadataVersion.latestProduction()));
|
||||
}
|
||||
updates.put(MetadataVersion.FEATURE_NAME, new FeatureUpdate(version.featureLevel(), upgradeType));
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue