From 6235a73622d58a9c70b4f581aa2b7d48ace446b3 Mon Sep 17 00:00:00 2001 From: Calvin Liu <83986057+CalvinConfluent@users.noreply.github.com> Date: Fri, 24 Jan 2025 10:57:33 -0800 Subject: [PATCH] KAFKA-16540: Clear ELRs when min.insync.replicas is changed. (#18148) In order to maintain the integrity of replication, we need to clear the ELRs of affected partitions when min.insync.replicas is changed. This could happen at the topic level, or at a global level if the cluster level default is changed. Reviewers: Colin P. McCabe --- .../ConfigurationControlManager.java | 36 ++++- .../kafka/controller/QuorumController.java | 4 + .../controller/ReplicationControlManager.java | 52 +++++++ .../common/metadata/ClearElrRecord.json | 26 ++++ .../ConfigurationControlManagerTest.java | 6 +- .../controller/QuorumControllerTest.java | 135 +++++++++++++++++- .../controller/QuorumControllerTestEnv.java | 11 +- .../ReplicationControlManagerTest.java | 46 +++++- .../common/EligibleLeaderReplicasVersion.java | 2 +- 9 files changed, 301 insertions(+), 17 deletions(-) create mode 100644 metadata/src/main/resources/common/metadata/ClearElrRecord.json diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index 15b5bbbf9df..89c6c5c9fe8 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.ConfigResource.Type; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.metadata.ClearElrRecord; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; @@ -54,8 +55,6 @@ import java.util.Optional; import java.util.function.Consumer; import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND; -import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.DELETE; -import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; import static org.apache.kafka.common.config.ConfigResource.Type.BROKER; import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG; import static org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG; @@ -165,7 +164,8 @@ public class ConfigurationControlManager { ConfigurationValidator validator, Map staticConfig, int nodeId, - FeatureControlManager featureControl) { + FeatureControlManager featureControl + ) { this.log = logContext.logger(ConfigurationControlManager.class); this.snapshotRegistry = snapshotRegistry; this.configSchema = configSchema; @@ -211,9 +211,32 @@ public class ConfigurationControlManager { outputRecords); outputResults.put(resourceEntry.getKey(), apiError); } + outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords)); return ControllerResult.atomicOf(outputRecords, outputResults); } + List createClearElrRecordsAsNeeded(List input) { + if (!featureControl.isElrFeatureEnabled()) { + return Collections.emptyList(); + } + List output = new ArrayList<>(); + for (ApiMessageAndVersion messageAndVersion : input) { + if (messageAndVersion.message().apiKey() == CONFIG_RECORD.id()) { + ConfigRecord record = (ConfigRecord) messageAndVersion.message(); + if (record.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG)) { + if (Type.forId(record.resourceType()) == Type.TOPIC) { + output.add(new ApiMessageAndVersion( + new ClearElrRecord(). + setTopicName(record.resourceName()), (short) 0)); + } else { + output.add(new ApiMessageAndVersion(new ClearElrRecord(), (short) 0)); + } + } + } + } + return output; + } + ControllerResult incrementalAlterConfig( ConfigResource configResource, Map> keyToOps, @@ -225,6 +248,8 @@ public class ConfigurationControlManager { keyToOps, newlyCreatedResource, outputRecords); + + outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords)); return ControllerResult.atomicOf(outputRecords, apiError); } @@ -357,8 +382,8 @@ public class ConfigurationControlManager { " cannot be altered while ELR is enabled."); private static final ApiError DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR = - new ApiError(INVALID_CONFIG, "Cluster-level " + MIN_IN_SYNC_REPLICAS_CONFIG + - " cannot be removed while ELR is enabled."); + new ApiError(INVALID_CONFIG, "Cluster-level " + MIN_IN_SYNC_REPLICAS_CONFIG + + " cannot be removed while ELR is enabled."); boolean isDisallowedBrokerMinIsrTransition(ConfigRecord configRecord) { if (configRecord.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG) && @@ -407,6 +432,7 @@ public class ConfigurationControlManager { outputRecords, outputResults); } + outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords)); return ControllerResult.atomicOf(outputRecords, outputResults); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 1da98d632a3..2e0fbfd8917 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -61,6 +61,7 @@ import org.apache.kafka.common.metadata.AbortTransactionRecord; import org.apache.kafka.common.metadata.AccessControlEntryRecord; import org.apache.kafka.common.metadata.BeginTransactionRecord; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; +import org.apache.kafka.common.metadata.ClearElrRecord; import org.apache.kafka.common.metadata.ClientQuotaRecord; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.metadata.DelegationTokenRecord; @@ -1294,6 +1295,9 @@ public final class QuorumController implements Controller { case REGISTER_CONTROLLER_RECORD: clusterControl.replay((RegisterControllerRecord) message); break; + case CLEAR_ELR_RECORD: + replicationControl.replay((ClearElrRecord) message); + break; default: throw new RuntimeException("Unhandled record type " + type); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index b2e232cfdab..d6eaf089f1e 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -69,6 +69,7 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; +import org.apache.kafka.common.metadata.ClearElrRecord; import org.apache.kafka.common.metadata.FenceBrokerRecord; import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionRecord; @@ -571,6 +572,57 @@ public class ReplicationControlManager { log.info("Replayed RemoveTopicRecord for topic {} with ID {}.", topic.name, record.topicId()); } + public void replay(ClearElrRecord record) { + if (record.topicName().isEmpty()) { + replayClearAllElrs(); + } else { + replayClearTopicElrs(record.topicName()); + } + + } + + void replayClearAllElrs() { + long numRemoved = 0; + for (TopicControlInfo topic : topics.values()) { + numRemoved += removeTopicElrs(topic); + } + log.info("Removed ELRs from {} partitions in all topics.", numRemoved); + } + + void replayClearTopicElrs(String topicName) { + Uuid topicId = topicsByName.get(topicName); + if (topicId == null) { + throw new RuntimeException("Unable to find a topic named " + topicName + + " in order to clear its ELRs."); + } + TopicControlInfo topic = topics.get(topicId); + if (topic == null) { + throw new RuntimeException("Unable to find a topic with ID " + topicId + + " in order to clear its ELRs."); + } + int numRemoved = removeTopicElrs(topic); + log.info("Removed ELRs from {} partitions of topic {}.", numRemoved, topicName); + } + + int removeTopicElrs(TopicControlInfo topic) { + int numRemoved = 0; + List partitionIds = new ArrayList<>(topic.parts.keySet()); + for (int partitionId : partitionIds) { + PartitionRegistration partition = topic.parts.get(partitionId); + PartitionRegistration nextPartition = partition.merge( + new PartitionChangeRecord(). + setPartitionId(partitionId). + setTopicId(topic.id). + setEligibleLeaderReplicas(Collections.emptyList()). + setLastKnownElr(Collections.emptyList())); + if (!nextPartition.equals(partition)) { + topic.parts.put(partitionId, nextPartition); + numRemoved++; + } + } + return numRemoved; + } + ControllerResult createTopics( ControllerRequestContext context, CreateTopicsRequestData request, diff --git a/metadata/src/main/resources/common/metadata/ClearElrRecord.json b/metadata/src/main/resources/common/metadata/ClearElrRecord.json new file mode 100644 index 00000000000..510f616f346 --- /dev/null +++ b/metadata/src/main/resources/common/metadata/ClearElrRecord.json @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 28, + "type": "metadata", + "name": "ClearElrRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "TopicName", "type": "string", "versions": "0+", + "entityType": "topicName", "about": "The name of the topic to clear ELRs for, or empty if all ELRs should be cleared." } + ] +} diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java index 11297845492..fc8e1706458 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java @@ -84,7 +84,8 @@ public class ConfigurationControlManagerTest { define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc"). define("def", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "def"). define("ghi", ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.HIGH, "ghi"). - define("quuux", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "quux")); + define("quuux", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "quux"). + define(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, ConfigDef.Type.INT, ConfigDef.Importance.HIGH, "")); } public static final Map> SYNONYMS = new HashMap<>(); @@ -95,6 +96,7 @@ public class ConfigurationControlManagerTest { SYNONYMS.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, Collections.singletonList(new ConfigSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG))); SYNONYMS.put("quuux", Collections.singletonList(new ConfigSynonym("quux", HOURS_TO_MILLISECONDS))); + SYNONYMS.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, Collections.singletonList(new ConfigSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG))); } static final KafkaConfigSchema SCHEMA = new KafkaConfigSchema(CONFIGS, SYNONYMS); @@ -114,7 +116,7 @@ public class ConfigurationControlManagerTest { } @SuppressWarnings("unchecked") - private static Map toMap(Entry... entries) { + static Map toMap(Entry... entries) { Map map = new LinkedHashMap<>(); for (Entry entry : entries) { map.put(entry.getKey(), entry.getValue()); diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index c9e0d7b0742..f080b0c86de 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -154,6 +154,7 @@ import static org.apache.kafka.common.config.ConfigResource.Type.BROKER; import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; import static org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER0; import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry; +import static org.apache.kafka.controller.ConfigurationControlManagerTest.toMap; import static org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT; import static org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextFor; import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeatures; @@ -256,7 +257,7 @@ public class QuorumControllerTest { new ResultOrError<>(Collections.emptyMap())), controller.describeConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap( BROKER0, Collections.emptyList())).get()); - logEnv.logManagers().forEach(m -> m.setMaxReadOffset(6L)); + logEnv.logManagers().forEach(m -> m.setMaxReadOffset(8L)); assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get()); } @@ -378,6 +379,7 @@ public class QuorumControllerTest { } } + @Test public void testUncleanShutdownBrokerElrEnabled() throws Throwable { List allBrokers = Arrays.asList(1, 2, 3); short replicationFactor = (short) allBrokers.size(); @@ -600,6 +602,133 @@ public class QuorumControllerTest { } } + @Test + public void testMinIsrUpdateWithElr() throws Throwable { + List allBrokers = Arrays.asList(1, 2, 3); + List brokersToKeepUnfenced = Arrays.asList(1); + List brokersToFence = Arrays.asList(2, 3); + short replicationFactor = (short) allBrokers.size(); + long sessionTimeoutMillis = 300; + + try ( + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)). + setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_4_0_IV1, "test-provided bootstrap ELR enabled")). + build() + ) { + ListenerCollection listeners = new ListenerCollection(); + listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092)); + QuorumController active = controlEnv.activeController(); + Map brokerEpochs = new HashMap<>(); + + for (Integer brokerId : allBrokers) { + CompletableFuture reply = active.registerBroker( + anonymousContextFor(ApiKeys.BROKER_REGISTRATION), + new BrokerRegistrationRequestData(). + setBrokerId(brokerId). + setClusterId(active.clusterId()). + setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting(), + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))). + setIncarnationId(Uuid.randomUuid()). + setLogDirs(Collections.singletonList(Uuid.randomUuid())). + setListeners(listeners)); + brokerEpochs.put(brokerId, reply.get().epoch()); + } + + // Brokers are only registered and should still be fenced + allBrokers.forEach(brokerId -> { + assertFalse(active.clusterControl().isUnfenced(brokerId), + "Broker " + brokerId + " should have been fenced"); + }); + + // Unfence all brokers and create a topic foo (min ISR 2) + sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs); + CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics( + new CreatableTopicCollection(Arrays.asList( + new CreatableTopic().setName("foo").setNumPartitions(1). + setReplicationFactor(replicationFactor), + new CreatableTopic().setName("bar").setNumPartitions(1). + setReplicationFactor(replicationFactor) + ).iterator())); + CreateTopicsResponseData createTopicsResponseData = active.createTopics( + ANONYMOUS_CONTEXT, createTopicsRequestData, + new HashSet<>(Arrays.asList("foo", "bar"))).get(); + assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode())); + assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode())); + Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId(); + Uuid topicIdBar = createTopicsResponseData.topics().find("bar").topicId(); + ConfigRecord configRecord = new ConfigRecord() + .setResourceType(BROKER.id()) + .setResourceName("") + .setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) + .setValue("2"); + RecordTestUtils.replayAll(active.configurationControl(), singletonList(new ApiMessageAndVersion(configRecord, (short) 0))); + + // Fence brokers + TestUtils.waitForCondition(() -> { + sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs); + for (Integer brokerId : brokersToFence) { + if (active.clusterControl().isUnfenced(brokerId)) { + return false; + } + } + return true; + }, sessionTimeoutMillis * 30, + "Fencing of brokers did not process within expected time" + ); + + // Send another heartbeat to the brokers we want to keep alive + sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs); + + // At this point only the brokers we want to fence (broker 2, 3) should be fenced. + brokersToKeepUnfenced.forEach(brokerId -> { + assertTrue(active.clusterControl().isUnfenced(brokerId), + "Broker " + brokerId + " should have been unfenced"); + }); + brokersToFence.forEach(brokerId -> { + assertFalse(active.clusterControl().isUnfenced(brokerId), + "Broker " + brokerId + " should have been fenced"); + }); + sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs); + + // Verify the isr and elr for the topic partition + PartitionRegistration partition = active.replicationControl().getPartition(topicIdFoo, 0); + assertArrayEquals(new int[]{1}, partition.isr, partition.toString()); + + // The ELR set is not determined but the size is 1. + assertEquals(1, partition.elr.length, partition.toString()); + + // First, decrease the min ISR config to 1. This should clear the ELR fields. + ControllerResult> result = active.configurationControl().incrementalAlterConfigs(toMap( + entry(new ConfigResource(TOPIC, "foo"), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))), + true); + assertEquals(2, result.records().size(), result.records().toString()); + RecordTestUtils.replayAll(active.configurationControl(), singletonList(result.records().get(0))); + RecordTestUtils.replayAll(active.replicationControl(), singletonList(result.records().get(1))); + + partition = active.replicationControl().getPartition(topicIdFoo, 0); + assertEquals(0, partition.elr.length, partition.toString()); + assertArrayEquals(new int[]{1}, partition.isr, partition.toString()); + + // Second, let's try update config on cluster level with the other topic. + partition = active.replicationControl().getPartition(topicIdBar, 0); + assertArrayEquals(new int[]{1}, partition.isr, partition.toString()); + assertEquals(1, partition.elr.length, partition.toString()); + + result = active.configurationControl().incrementalAlterConfigs(toMap( + entry(new ConfigResource(BROKER, ""), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))), + true); + assertEquals(2, result.records().size(), result.records().toString()); + RecordTestUtils.replayAll(active.configurationControl(), singletonList(result.records().get(0))); + RecordTestUtils.replayAll(active.replicationControl(), singletonList(result.records().get(1))); + + partition = active.replicationControl().getPartition(topicIdBar, 0); + assertEquals(0, partition.elr.length, partition.toString()); + assertArrayEquals(new int[]{1}, partition.isr, partition.toString()); + } + } + @Test public void testBalancePartitionLeaders() throws Throwable { List allBrokers = Arrays.asList(1, 2, 3); @@ -865,7 +994,7 @@ public class QuorumControllerTest { Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))). setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))). setListeners(listeners)); - assertEquals(4L, reply.get().epoch()); + assertEquals(6L, reply.get().epoch()); CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics( new CreatableTopicCollection(Collections.singleton( @@ -881,7 +1010,7 @@ public class QuorumControllerTest { get().topics().find("foo").errorMessage()); assertEquals(new BrokerHeartbeatReply(true, false, false, false), active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData(). - setWantFence(false).setBrokerEpoch(4L).setBrokerId(0). + setWantFence(false).setBrokerEpoch(6L).setBrokerId(0). setCurrentMetadataOffset(100000L)).get()); assertEquals(Errors.NONE.code(), active.createTopics(ANONYMOUS_CONTEXT, createTopicsRequestData, Collections.singleton("foo")). diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java index a788dd22e65..925b41db79b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java @@ -105,6 +105,12 @@ public class QuorumControllerTestEnv implements AutoCloseable { for (int nodeId = 0; nodeId < numControllers; nodeId++) { QuorumController.Builder builder = new QuorumController.Builder(nodeId, logEnv.clusterId()); builder.setRaftClient(logEnv.logManagers().get(nodeId)); + if (eligibleLeaderReplicasEnabled) { + bootstrapMetadata = bootstrapMetadata.copyWithFeatureRecord( + EligibleLeaderReplicasVersion.FEATURE_NAME, + EligibleLeaderReplicasVersion.ELRV_1.featureLevel() + ); + } builder.setBootstrapMetadata(bootstrapMetadata); builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs); builder.setQuorumFeatures(new QuorumFeatures(nodeId, QuorumFeatures.defaultSupportedFeatureMap(true), nodeIds)); @@ -120,11 +126,6 @@ public class QuorumControllerTestEnv implements AutoCloseable { nonFatalFaultHandlers.put(nodeId, fatalFaultHandler); controllerBuilderInitializer.accept(builder); QuorumController controller = builder.build(); - if (eligibleLeaderReplicasEnabled) { - bootstrapMetadata = bootstrapMetadata.copyWithFeatureRecord( - EligibleLeaderReplicasVersion.FEATURE_NAME, - EligibleLeaderReplicasVersion.ELRV_1.featureLevel()); - } this.controllers.add(controller); } } catch (Exception e) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 0012046080e..bd4cad63fda 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -103,6 +103,7 @@ import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; @@ -188,7 +189,7 @@ public class ReplicationControlManagerTest { return this; } - Builder setIsElrEnabled(Boolean isElrEnabled) { + Builder setIsElrEnabled(boolean isElrEnabled) { this.isElrEnabled = isElrEnabled; return this; } @@ -3243,4 +3244,47 @@ public class ReplicationControlManagerTest { private static List filter(List records, Class clazz) { return records.stream().filter(r -> clazz.equals(r.message().getClass())).collect(Collectors.toList()); } + + @ParameterizedTest + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + void testElrsRemovedOnMinIsrUpdate(boolean clusterLevel, boolean useLegacyAlterConfigs) { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder(). + setIsElrEnabled(true). + setStaticConfig(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"). + build(); + ctx.registerBrokers(1, 2, 3, 4); + ctx.unfenceBrokers(1, 2, 3, 4); + Uuid fooId = ctx.createTestTopic("foo", new int[][]{ + new int[]{1, 2, 4}, new int[]{1, 3, 4}}).topicId(); + Uuid barId = ctx.createTestTopic("bar", new int[][]{ + new int[]{1, 2, 4}, new int[]{1, 3, 4}}).topicId(); + ctx.fenceBrokers(4); + ctx.fenceBrokers(1); + assertArrayEquals(new int[]{1}, ctx.replicationControl.getPartition(fooId, 0).elr); + assertArrayEquals(new int[]{1}, ctx.replicationControl.getPartition(barId, 0).elr); + ConfigResource configResource; + if (clusterLevel) { + configResource = new ConfigResource(ConfigResource.Type.BROKER, ""); + } else { + configResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo"); + } + if (useLegacyAlterConfigs) { + ctx.replay(ctx.configurationControl.legacyAlterConfigs( + Collections.singletonMap(configResource, + Collections.singletonMap(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")), + false).records()); + } else { + ctx.replay(ctx.configurationControl.incrementalAlterConfigs( + Collections.singletonMap(configResource, + Collections.singletonMap(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, + new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "1"))), + false).records()); + } + assertArrayEquals(new int[]{}, ctx.replicationControl.getPartition(fooId, 0).elr); + if (clusterLevel) { + assertArrayEquals(new int[]{}, ctx.replicationControl.getPartition(barId, 0).elr); + } else { + assertArrayEquals(new int[]{1}, ctx.replicationControl.getPartition(barId, 0).elr); + } + } } diff --git a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java index 68dabd2594a..84a80c82d45 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java @@ -29,7 +29,7 @@ public enum EligibleLeaderReplicasVersion implements FeatureVersion { public static final String FEATURE_NAME = "eligible.leader.replicas.version"; - public static final EligibleLeaderReplicasVersion LATEST_PRODUCTION = ELRV_0; + public static final EligibleLeaderReplicasVersion LATEST_PRODUCTION = ELRV_1; private final short featureLevel; private final MetadataVersion bootstrapMetadataVersion;