MINOR: Bump LATEST_PRODUCTION to 4.1IV1 and Use MV to enable ELR (#20174)
CI / build (push) Waiting to run Details

Removing the isEligibleLeaderReplicasV1Enabled to let ELR be enabled if
MV is at least 4.1IV1. Also bump the Latest Prod MV to 4.1IV1

Reviewers: Jun Rao <junrao@gmail.com>
This commit is contained in:
Calvin Liu 2025-07-15 20:23:53 -07:00 committed by GitHub
parent f35f94b3e6
commit 98cb8df7a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 49 additions and 25 deletions

View File

@ -52,6 +52,7 @@ import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVersion}
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.logger.LoggingController
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils}
@ -60,7 +61,7 @@ import org.apache.logging.log4j.core.config.Configurator
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{MethodSource}
import org.junit.jupiter.params.provider.MethodSource
import org.slf4j.LoggerFactory
import java.util.AbstractMap.SimpleImmutableEntry
@ -3002,6 +3003,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
def testElectUncleanLeadersForOnePartition(): Unit = {
// Case: unclean leader election with one topic partition
client = createAdminClient
disableEligibleLeaderReplicas(client)
val broker1 = 1
val broker2 = 2
@ -3029,6 +3031,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
def testElectUncleanLeadersForManyPartitions(): Unit = {
// Case: unclean leader election with many topic partitions
client = createAdminClient
disableEligibleLeaderReplicas(client)
val broker1 = 1
val broker2 = 2
@ -3068,6 +3071,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
def testElectUncleanLeadersForAllPartitions(): Unit = {
// Case: noop unclean leader election and valid unclean leader election for all partitions
client = createAdminClient
disableEligibleLeaderReplicas(client)
val broker1 = 1
val broker2 = 2
@ -3107,6 +3111,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
def testElectUncleanLeadersForUnknownPartitions(): Unit = {
// Case: unclean leader election for unknown topic
client = createAdminClient
disableEligibleLeaderReplicas(client)
val broker1 = 1
val broker2 = 2
@ -3132,6 +3137,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
def testElectUncleanLeadersWhenNoLiveBrokers(): Unit = {
// Case: unclean leader election with no live brokers
client = createAdminClient
disableEligibleLeaderReplicas(client)
val broker1 = 1
val broker2 = 2
@ -3160,6 +3166,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
def testElectUncleanLeadersNoop(): Unit = {
// Case: noop unclean leader election with explicit topic partitions
client = createAdminClient
disableEligibleLeaderReplicas(client)
val broker1 = 1
val broker2 = 2
@ -3187,6 +3194,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
def testElectUncleanLeadersAndNoop(): Unit = {
// Case: one noop unclean leader election and one valid unclean leader election
client = createAdminClient
disableEligibleLeaderReplicas(client)
val broker1 = 1
val broker2 = 2
@ -3878,6 +3886,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
testAppendConfig(props, "0:0", "1:1,0:0")
}
private def disableEligibleLeaderReplicas(admin: Admin): Unit = {
if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_1_IV0)) {
admin.updateFeatures(
util.Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, new FeatureUpdate(0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)),
new UpdateFeaturesOptions()).all().get()
}
}
private def testAppendConfig(props: Properties, append: String, expected: String): Unit = {
client = createAdminClient
createTopic(topic, topicConfig = props)

View File

@ -40,7 +40,7 @@ import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.queue.KafkaEventQueue
import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.server.{ClientMetricsManager, ServerSocketFactory}
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVersion, TransactionVersion}
import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
import org.apache.kafka.server.util.timer.SystemTimer
@ -284,12 +284,6 @@ abstract class QuorumTestHarness extends Logging {
} else TransactionVersion.TV_1.featureLevel()
formatter.setFeatureLevel(TransactionVersion.FEATURE_NAME, transactionVersion)
val elrVersion =
if (TestInfoUtils.isEligibleLeaderReplicasV1Enabled(testInfo)) {
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()
} else EligibleLeaderReplicasVersion.ELRV_0.featureLevel()
formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME, elrVersion)
addFormatterSettings(formatter)
formatter.run()
val bootstrapMetadata = formatter.bootstrapMetadata()

View File

@ -50,12 +50,4 @@ object TestInfoUtils {
def isTransactionV2Enabled(testInfo: TestInfo): Boolean = {
!testInfo.getDisplayName.contains("isTV2Enabled=false")
}
/**
* Returns whether eligible leader replicas version 1 is enabled.
* When no parameter is provided, the default returned is false.
*/
def isEligibleLeaderReplicasV1Enabled(testInfo: TestInfo): Boolean = {
testInfo.getDisplayName.contains("isELRV1Enabled=true")
}
}

View File

@ -31,7 +31,7 @@ import org.apache.kafka.common.errors.{InvalidConfigurationException, TimeoutExc
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, AlterConfigsResult, ConfigEntry}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, AlterConfigsResult, ConfigEntry, FeatureUpdate, UpdateFeaturesOptions}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.metrics.KafkaYammerMetrics
@ -42,6 +42,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import com.yammer.metrics.core.Meter
import org.apache.kafka.metadata.LeaderConstants
import org.apache.kafka.server.common.MetadataVersion
import org.apache.logging.log4j.core.config.Configurator
class UncleanLeaderElectionTest extends QuorumTestHarness {
@ -119,6 +120,14 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
admin = TestUtils.createAdminClient(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), adminConfigs)
}
private def disableEligibleLeaderReplicas(): Unit = {
if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_1_IV0)) {
admin.updateFeatures(
java.util.Map.of("eligible.leader.replicas.version", new FeatureUpdate(0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)),
new UpdateFeaturesOptions()).all().get()
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testUncleanLeaderElectionEnabled(groupProtocol: String): Unit = {
@ -126,6 +135,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
configProps1.put("unclean.leader.election.enable", "true")
configProps2.put("unclean.leader.election.enable", "true")
startBrokers(Seq(configProps1, configProps2))
disableEligibleLeaderReplicas()
// create topic with 1 partition, 2 replicas, one on each broker
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)))
@ -137,6 +147,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
def testUncleanLeaderElectionDisabled(groupProtocol: String): Unit = {
// unclean leader election is disabled by default
startBrokers(Seq(configProps1, configProps2))
disableEligibleLeaderReplicas()
// create topic with 1 partition, 2 replicas, one on each broker
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)))
@ -151,6 +162,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
configProps1.put("unclean.leader.election.enable", "false")
configProps2.put("unclean.leader.election.enable", "false")
startBrokers(Seq(configProps1, configProps2))
disableEligibleLeaderReplicas()
// create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled
val topicProps = new Properties()
@ -167,6 +179,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
configProps1.put("unclean.leader.election.enable", "true")
configProps2.put("unclean.leader.election.enable", "true")
startBrokers(Seq(configProps1, configProps2))
disableEligibleLeaderReplicas()
// create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled
val topicProps = new Properties()
@ -180,6 +193,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testUncleanLeaderElectionInvalidTopicOverride(groupProtocol: String): Unit = {
startBrokers(Seq(configProps1))
disableEligibleLeaderReplicas()
// create topic with an invalid value for unclean leader election
val topicProps = new Properties()
@ -328,6 +342,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
def testTopicUncleanLeaderElectionEnableWithAlterTopicConfigs(groupProtocol: String): Unit = {
// unclean leader election is disabled by default
startBrokers(Seq(configProps1, configProps2))
disableEligibleLeaderReplicas()
// create topic with 1 partition, 2 replicas, one on each broker
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)))

View File

@ -378,6 +378,9 @@ public class FormatterTest {
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(MetadataVersion.latestProduction().featureLevel()),
(short) 0));
expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(EligibleLeaderReplicasVersion.FEATURE_NAME).
setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel()), (short) 0));
expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(GroupVersion.FEATURE_NAME).
setFeatureLevel(GroupVersion.GV_1.featureLevel()), (short) 0));

View File

@ -105,12 +105,6 @@ public enum MetadataVersion {
// Enables async remote LIST_OFFSETS support (KIP-1075)
IBP_4_0_IV3(25, "4.0", "IV3", false),
//
// NOTE: MetadataVersions after this point are unstable and may be changed.
// If users attempt to use an unstable MetadataVersion, they will get an error.
// Please move this comment when updating the LATEST_PRODUCTION constant.
//
// Enables ELR by default for new clusters (KIP-966).
// Share groups are preview in 4.1 (KIP-932).
// Streams groups are early access in 4.1 (KIP-1071).
@ -119,6 +113,12 @@ public enum MetadataVersion {
// Send FETCH version 18 in the replica fetcher (KIP-1166)
IBP_4_1_IV1(27, "4.1", "IV1", false),
//
// NOTE: MetadataVersions after this point are unstable and may be changed.
// If users attempt to use an unstable MetadataVersion, they will get an error.
// Please move this comment when updating the LATEST_PRODUCTION constant.
//
// Insert any additional IBP_4_1_IVx versions above this comment, and bump the feature level of
// IBP_4_2_IVx accordingly. When 4.2 development begins, IBP_4_2_IV0 will cease to be
// a placeholder.
@ -157,7 +157,7 @@ public enum MetadataVersion {
* <strong>Think carefully before you update this value. ONCE A METADATA VERSION IS PRODUCTION,
* IT CANNOT BE CHANGED.</strong>
*/
public static final MetadataVersion LATEST_PRODUCTION = IBP_4_0_IV3;
public static final MetadataVersion LATEST_PRODUCTION = IBP_4_1_IV1;
// If you change the value above please also update
// LATEST_STABLE_METADATA_VERSION version in tests/kafkatest/version.py

View File

@ -78,12 +78,16 @@ class MetadataVersionTest {
assertEquals(IBP_3_9_IV0, MetadataVersion.fromVersionString("3.9-IV0"));
// 4.0-IV3 is the latest production version in the 4.0 line
assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0"));
assertEquals(IBP_4_0_IV0, MetadataVersion.fromVersionString("4.0-IV0"));
assertEquals(IBP_4_0_IV1, MetadataVersion.fromVersionString("4.0-IV1"));
assertEquals(IBP_4_0_IV2, MetadataVersion.fromVersionString("4.0-IV2"));
assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0-IV3"));
// 4.1-IV1 is the latest production version in the 4.1 line
assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1"));
assertEquals(IBP_4_1_IV0, MetadataVersion.fromVersionString("4.1-IV0"));
assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1-IV1"));
}
@Test

View File

@ -114,7 +114,7 @@ DEV_VERSION = KafkaVersion("4.1.0-SNAPSHOT")
LATEST_STABLE_TRANSACTION_VERSION = 2
# This should match the LATEST_PRODUCTION version defined in MetadataVersion.java
LATEST_STABLE_METADATA_VERSION = "4.0-IV3"
LATEST_STABLE_METADATA_VERSION = "4.1-IV1"
# 2.1.x versions
V_2_1_0 = KafkaVersion("2.1.0")