diff --git a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java new file mode 100644 index 00000000000..90503e66b1a --- /dev/null +++ b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server.integration; +import kafka.integration.KafkaServerTestHarness; +import kafka.server.KafkaBroker; +import kafka.server.KafkaConfig; +import kafka.utils.Logging; +import kafka.utils.TestUtils; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.FeatureUpdate; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.admin.UpdateFeaturesOptions; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; +import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.File; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import 
java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +import scala.collection.JavaConverters; +import scala.collection.Seq; +import scala.collection.mutable.HashMap; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarness implements Logging { + private String bootstrapServer; + private String testTopicName; + private Admin adminClient; + @Override + public Seq generateConfigs() { + List brokerConfigs = new ArrayList<>(); + brokerConfigs.addAll(scala.collection.JavaConverters.seqAsJavaList(TestUtils.createBrokerConfigs( + 5, // The tests require 4 brokers to host the partition. However, we need the 5th broker to handle the admin client requests. 
+ true, + true, + scala.Option.empty(), + scala.Option.empty(), + scala.Option.empty(), + true, + false, + false, + false, + new HashMap<>(), + 1, + false, + 1, + (short) 4, + 0, + false + ))); + List configs = new ArrayList<>(); + for (Properties props : brokerConfigs) { + configs.add(KafkaConfig.fromProps(props)); + } + return JavaConverters.asScalaBuffer(configs).toSeq(); + } + + @BeforeEach + @Override + public void setUp(TestInfo info) { + super.setUp(info); + // Create the admin client, then request an upgrade of the ELR feature to ELRV_1 (NOTE(review): the update's result is not awaited here). + Properties props = new Properties(); + bootstrapServer = bootstrapServers(listenerName()); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + adminClient = Admin.create(props); + adminClient.updateFeatures( + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), + new UpdateFeaturesOptions() + ); + testTopicName = String.format("%s-%s", info.getTestMethod().get().getName(), "ELR-test"); + } + + @AfterEach + public void close() throws Exception { + if (adminClient != null) adminClient.close(); + } + + @ParameterizedTest + @ValueSource(strings = {"kraft"}) + public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr(String quorum) throws ExecutionException, InterruptedException { + adminClient.createTopics( + Collections.singletonList(new NewTopic(testTopicName, 1, (short) 4))).all().get(); + TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + Collection ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); + Map> configOps = Collections.singletonMap(configResource, ops); + // Set the test topic's min in-sync replicas config to 3 and wait for the change to complete. + adminClient.incrementalAlterConfigs(configOps).all().get(); + Producer producer = null; + Consumer consumer = null; + try { + 
TopicDescription testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName); + TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0); + List initialReplicas = topicPartitionInfo.replicas(); + assertEquals(4, topicPartitionInfo.isr().size()); + assertEquals(0, topicPartitionInfo.elr().size()); + assertEquals(0, topicPartitionInfo.lastKnownElr().size()); + + Properties producerProps = new Properties(); + producerProps.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProps.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + // Use acks=1 for the producer (presumably so sends still complete while the partition is under min ISR; confirm). + producerProps.put(ProducerConfig.ACKS_CONFIG, "1"); + producer = new KafkaProducer(producerProps); + + Properties consumerProps = new Properties(); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); + consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10"); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProps.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProps.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(Collections.singleton(testTopicName)); + + producer.send(new ProducerRecord<>(testTopicName, "0", "0")).get(); + waitUntilOneMessageIsConsumed(consumer); + + killBroker(initialReplicas.get(0).id()); + killBroker(initialReplicas.get(1).id()); + + waitForIsrAndElr((isrSize, elrSize) -> { + return isrSize == 2 && elrSize == 1; + }); + + // Now the partition is under min ISR (2 in-sync replicas, minimum 3). The HWM should not advance. 
+ producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get(); + Thread.sleep(100); + assertEquals(0, consumer.poll(Duration.ofSeconds(1L)).count()); + + // Restart the killed brokers to restore the min ISR; the previously produced record should become visible. + startBroker(initialReplicas.get(1).id()); + startBroker(initialReplicas.get(0).id()); + waitForIsrAndElr((isrSize, elrSize) -> { + return isrSize == 4 && elrSize == 0; + }); + + waitUntilOneMessageIsConsumed(consumer); + } finally { + restartDeadBrokers(false); + if (consumer != null) consumer.close(); + if (producer != null) producer.close(); + } + } + + void waitUntilOneMessageIsConsumed(Consumer consumer) { + kafka.utils.TestUtils.waitUntilTrue( + () -> { + try { + ConsumerRecords record = consumer.poll(Duration.ofMillis(100L)); + return record.count() >= 1; + } catch (Exception e) { + return false; + } + }, + () -> "fail to consume messages", + org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L + ); + } + + @ParameterizedTest + @ValueSource(strings = {"kraft"}) + public void testElrMemberCanBeElected(String quorum) throws ExecutionException, InterruptedException { + adminClient.createTopics( + Collections.singletonList(new NewTopic(testTopicName, 1, (short) 4))).all().get(); + TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + Collection ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); + Map> configOps = Collections.singletonMap(configResource, ops); + // Set the test topic's min in-sync replicas config to 3 and wait for the change to complete. + adminClient.incrementalAlterConfigs(configOps).all().get(); + + try { + TopicDescription testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName); + TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0); + List initialReplicas = 
topicPartitionInfo.replicas(); + assertEquals(4, topicPartitionInfo.isr().size()); + assertEquals(0, topicPartitionInfo.elr().size()); + assertEquals(0, topicPartitionInfo.lastKnownElr().size()); + + killBroker(initialReplicas.get(0).id()); + killBroker(initialReplicas.get(1).id()); + killBroker(initialReplicas.get(2).id()); + + waitForIsrAndElr((isrSize, elrSize) -> { + return isrSize == 1 && elrSize == 2; + }); + + killBroker(initialReplicas.get(3).id()); + + waitForIsrAndElr((isrSize, elrSize) -> { + return isrSize == 0 && elrSize == 3; + }); + + topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().get(0); + assertEquals(1, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString()); + int expectLastKnownLeader = initialReplicas.get(3).id(); + assertEquals(expectLastKnownLeader, topicPartitionInfo.lastKnownElr().get(0).id(), topicPartitionInfo.toString()); + + // At this point, all the replicas have failed; the last known leader is the replica at index 3 (the last one killed) and there are 3 members in the ELR. + // Restart one ELR member other than the last known leader; it should become the leader. + + int expectLeader = topicPartitionInfo.elr().stream() + .filter(node -> node.id() != expectLastKnownLeader).collect(Collectors.toList()).get(0).id(); + + startBroker(expectLeader); + waitForIsrAndElr((isrSize, elrSize) -> { + return isrSize == 1 && elrSize == 2; + }); + + topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().get(0); + assertEquals(0, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString()); + assertEquals(expectLeader, topicPartitionInfo.leader().id(), topicPartitionInfo.toString()); + + // Start another 2 brokers; the ELR fields should then be cleared. 
+ topicPartitionInfo.replicas().stream().filter(node -> node.id() != expectLeader).limit(2) + .forEach(node -> startBroker(node.id())); + + waitForIsrAndElr((isrSize, elrSize) -> { + return isrSize == 3 && elrSize == 0; + }); + + topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().get(0); + assertEquals(0, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString()); + assertEquals(expectLeader, topicPartitionInfo.leader().id(), topicPartitionInfo.toString()); + } finally { + restartDeadBrokers(false); + } + } + + @ParameterizedTest + @ValueSource(strings = {"kraft"}) + public void testElrMemberShouldBeKickOutWhenUncleanShutdown(String quorum) throws ExecutionException, InterruptedException { + adminClient.createTopics( + Collections.singletonList(new NewTopic(testTopicName, 1, (short) 4))).all().get(); + TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + Collection ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); + Map> configOps = Collections.singletonMap(configResource, ops); + // Set the test topic's min in-sync replicas config to 3 and wait for the change to complete. + adminClient.incrementalAlterConfigs(configOps).all().get(); + + try { + TopicDescription testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName); + TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0); + List initialReplicas = topicPartitionInfo.replicas(); + assertEquals(4, topicPartitionInfo.isr().size()); + assertEquals(0, topicPartitionInfo.elr().size()); + assertEquals(0, topicPartitionInfo.lastKnownElr().size()); + + killBroker(initialReplicas.get(0).id()); + killBroker(initialReplicas.get(1).id()); + 
killBroker(initialReplicas.get(2).id()); + killBroker(initialReplicas.get(3).id()); + + waitForIsrAndElr((isrSize, elrSize) -> { + return isrSize == 0 && elrSize == 3; + }); + topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().get(0); + + int brokerToBeUncleanShutdown = topicPartitionInfo.elr().get(0).id(); + KafkaBroker broker = brokers().find(b -> { + return b.config().brokerId() == brokerToBeUncleanShutdown; + }).get(); + Seq dirs = broker.logManager().liveLogDirs(); + assertEquals(1, dirs.size()); + CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); + assertTrue(handler.exists()); + assertDoesNotThrow(() -> handler.delete()); + + // After removing the clean shutdown file, the broker should report an unclean shutdown during restart; the ELR is expected to shrink from 3 to 2. + startBroker(brokerToBeUncleanShutdown); + waitForIsrAndElr((isrSize, elrSize) -> { + return isrSize == 0 && elrSize == 2; + }); + topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().get(0); + assertTrue(topicPartitionInfo.leader() == null); + assertEquals(1, topicPartitionInfo.lastKnownElr().size()); + } finally { + restartDeadBrokers(false); + } + } + + /* + This test is only valid for KIP-966 part 1. When the unclean recovery is implemented, it should be removed. 
+ */ + @ParameterizedTest + @ValueSource(strings = {"kraft"}) + public void testLastKnownLeaderShouldBeElectedIfEmptyElr(String quorum) throws ExecutionException, InterruptedException { + adminClient.createTopics( + Collections.singletonList(new NewTopic(testTopicName, 1, (short) 4))).all().get(); + TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + Collection ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); + Map> configOps = Collections.singletonMap(configResource, ops); + // Set the test topic's min in-sync replicas config to 3 and wait for the change to complete. + adminClient.incrementalAlterConfigs(configOps).all().get(); + + try { + TopicDescription testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName); + TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0); + List initialReplicas = topicPartitionInfo.replicas(); + assertEquals(4, topicPartitionInfo.isr().size()); + assertEquals(0, topicPartitionInfo.elr().size()); + assertEquals(0, topicPartitionInfo.lastKnownElr().size()); + + killBroker(initialReplicas.get(0).id()); + killBroker(initialReplicas.get(1).id()); + killBroker(initialReplicas.get(2).id()); + killBroker(initialReplicas.get(3).id()); + + waitForIsrAndElr((isrSize, elrSize) -> { + return isrSize == 0 && elrSize == 3; + }); + topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().get(0); + int lastKnownLeader = topicPartitionInfo.lastKnownElr().get(0).id(); + + Set initialReplicaSet = initialReplicas.stream().map(node -> node.id()).collect(Collectors.toSet()); + brokers().foreach(broker -> { + if (initialReplicaSet.contains(broker.config().brokerId())) { + Seq dirs = 
broker.logManager().liveLogDirs(); + assertEquals(1, dirs.size()); + CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); + assertDoesNotThrow(() -> handler.delete()); + } + return true; + }); + + + // After removing the clean shutdown files, the brokers should report an unclean shutdown during restart. Start every replica except the last known leader. + topicPartitionInfo.replicas().stream().forEach(replica -> { + if (replica.id() != lastKnownLeader) startBroker(replica.id()); + }); + waitForIsrAndElr((isrSize, elrSize) -> { + return isrSize == 0 && elrSize == 1; + }); + topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().get(0); + assertTrue(topicPartitionInfo.leader() == null); + assertEquals(1, topicPartitionInfo.lastKnownElr().size()); + + // Even though the last known leader also went through an unclean shutdown, it should still be elected once it is restarted. + startBroker(lastKnownLeader); + waitForIsrAndElr((isrSize, elrSize) -> { + return isrSize > 0 && elrSize == 0; + }); + topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().get(0); + assertEquals(0, topicPartitionInfo.lastKnownElr().size()); + assertEquals(0, topicPartitionInfo.elr().size()); + assertEquals(lastKnownLeader, topicPartitionInfo.leader().id()); + } finally { + restartDeadBrokers(false); + } + } + + void waitForIsrAndElr(BiFunction isIsrAndElrSizeSatisfied) { + kafka.utils.TestUtils.waitUntilTrue( + () -> { + try { + TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName); + TopicPartitionInfo partition = topicDescription.partitions().get(0); + if (!isIsrAndElrSizeSatisfied.apply(partition.isr().size(), partition.elr().size())) return false; + } catch (Exception e) { + return false; + } + return true; + }, + () -> String.format("Partition metadata for %s is not 
propagated", testTopicName), + org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index 43edafb77f4..087d47c570d 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -90,7 +90,7 @@ public class ClusterControlManager { private long sessionTimeoutNs = DEFAULT_SESSION_TIMEOUT_NS; private ReplicaPlacer replicaPlacer = null; private FeatureControlManager featureControl = null; - private BrokerUncleanShutdownHandler brokerUncleanShutdownHandler = null; + private BrokerShutdownHandler brokerShutdownHandler = null; private String interBrokerListenerName = "PLAINTEXT"; Builder setLogContext(LogContext logContext) { @@ -128,8 +128,8 @@ public class ClusterControlManager { return this; } - Builder setBrokerUncleanShutdownHandler(BrokerUncleanShutdownHandler brokerUncleanShutdownHandler) { - this.brokerUncleanShutdownHandler = brokerUncleanShutdownHandler; + Builder setBrokerShutdownHandler(BrokerShutdownHandler brokerShutdownHandler) { + this.brokerShutdownHandler = brokerShutdownHandler; return this; } @@ -154,8 +154,8 @@ public class ClusterControlManager { if (featureControl == null) { throw new RuntimeException("You must specify FeatureControlManager"); } - if (brokerUncleanShutdownHandler == null) { - throw new RuntimeException("You must specify BrokerUncleanShutdownHandler"); + if (brokerShutdownHandler == null) { + throw new RuntimeException("You must specify BrokerShutdownHandler"); } return new ClusterControlManager(logContext, clusterId, @@ -164,7 +164,7 @@ public class ClusterControlManager { sessionTimeoutNs, replicaPlacer, featureControl, - brokerUncleanShutdownHandler, + brokerShutdownHandler, interBrokerListenerName ); } @@ -252,7 +252,7 @@ public class 
ClusterControlManager { */ private final FeatureControlManager featureControl; - private final BrokerUncleanShutdownHandler brokerUncleanShutdownHandler; + private final BrokerShutdownHandler brokerShutdownHandler; /** * The statically configured inter-broker listener name. @@ -277,7 +277,7 @@ public class ClusterControlManager { long sessionTimeoutNs, ReplicaPlacer replicaPlacer, FeatureControlManager featureControl, - BrokerUncleanShutdownHandler brokerUncleanShutdownHandler, + BrokerShutdownHandler brokerShutdownHandler, String interBrokerListenerName ) { this.logContext = logContext; @@ -293,7 +293,7 @@ public class ClusterControlManager { this.featureControl = featureControl; this.controllerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0); this.directoryToBroker = new TimelineHashMap<>(snapshotRegistry, 0); - this.brokerUncleanShutdownHandler = brokerUncleanShutdownHandler; + this.brokerShutdownHandler = brokerShutdownHandler; this.interBrokerListenerName = interBrokerListenerName; } @@ -335,7 +335,8 @@ public class ClusterControlManager { public ControllerResult registerBroker( BrokerRegistrationRequestData request, long newBrokerEpoch, - FinalizedControllerFeatures finalizedFeatures + FinalizedControllerFeatures finalizedFeatures, + boolean cleanShutdownDetectionEnabled ) { if (heartbeatManager == null) { throw new RuntimeException("ClusterControlManager is not active."); @@ -348,8 +349,10 @@ public class ClusterControlManager { List records = new ArrayList<>(); BrokerRegistration existing = brokerRegistrations.get(brokerId); Uuid prevIncarnationId = null; + long storedBrokerEpoch = -2; // BrokerRegistration.previousBrokerEpoch default value is -1 if (existing != null) { prevIncarnationId = existing.incarnationId(); + storedBrokerEpoch = existing.epoch(); if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) { if (!request.incarnationId().equals(prevIncarnationId)) { throw new DuplicateBrokerRegistrationException("Another broker is " + 
@@ -424,7 +427,9 @@ public class ClusterControlManager { if (!request.incarnationId().equals(prevIncarnationId)) { int prevNumRecords = records.size(); - brokerUncleanShutdownHandler.addRecordsForShutdown(request.brokerId(), records); + boolean isCleanShutdown = cleanShutdownDetectionEnabled ? + storedBrokerEpoch == request.previousBrokerEpoch() : false; + brokerShutdownHandler.addRecordsForShutdown(request.brokerId(), isCleanShutdown, records); int numRecordsAdded = records.size() - prevNumRecords; if (existing == null) { log.info("No previous registration found for broker {}. New incarnation ID is " + @@ -847,7 +852,7 @@ public class ClusterControlManager { } @FunctionalInterface - interface BrokerUncleanShutdownHandler { - void addRecordsForShutdown(int brokerId, List records); + interface BrokerShutdownHandler { + void addRecordsForShutdown(int brokerId, boolean isCleanShutdown, List records); } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 2e0fbfd8917..4ab5c340fa3 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1555,7 +1555,7 @@ public final class QuorumController implements Controller { setSessionTimeoutNs(sessionTimeoutNs). setReplicaPlacer(replicaPlacer). setFeatureControlManager(featureControl). - setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown). + setBrokerShutdownHandler(this::handleBrokerShutdown). setInterBrokerListenerName(interBrokerListenerName). build(); this.configurationControl = new ConfigurationControlManager.Builder(). @@ -2025,7 +2025,8 @@ public final class QuorumController implements Controller { return appendWriteEvent("registerBroker", context.deadlineNs(), () -> clusterControl. 
registerBroker(request, offsetControl.nextWriteOffset(), - new FinalizedControllerFeatures(controllerFeatures, Long.MAX_VALUE)), + new FinalizedControllerFeatures(controllerFeatures, Long.MAX_VALUE), + context.requestHeader().requestApiVersion() >= 3), EnumSet.noneOf(ControllerOperationFlag.class)); } @@ -2203,7 +2204,7 @@ public final class QuorumController implements Controller { return controllerMetrics; } - void handleUncleanBrokerShutdown(int brokerId, List records) { - replicationControl.handleBrokerUncleanShutdown(brokerId, records); + void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List records) { + replicationControl.handleBrokerShutdown(brokerId, isCleanShutdown, records); } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index fc4ae6b38bb..4c84f53f779 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -1461,21 +1461,22 @@ public class ReplicationControlManager { } /** - * Create partition change records to remove replicas from any ISR or ELR for brokers doing unclean shutdown. + * Create partition change records to remove a broker's replicas from any ISR or ELR when the broker's shutdown is detected. * - * @param brokerId The broker id. - * @param records The record list to append to. + * @param brokerId The id of the broker whose shutdown was detected. + * @param isCleanShutdown Whether the broker shut down cleanly. + * @param records The record list to append to. 
*/ - void handleBrokerUncleanShutdown(int brokerId, List records) { - if (featureControl.metadataVersion().isElrSupported()) { + void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List records) { + if (featureControl.metadataVersion().isElrSupported() && !isCleanShutdown) { // ELR is enabled, generate unclean shutdown partition change records generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, NO_LEADER, brokerId, records, brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, NO_LEADER, brokerId, records, brokersToElrs.partitionsWithBrokerInElr(brokerId)); } else { - // ELR is not enabled, handle the unclean shutdown as if the broker was fenced - generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", brokerId, NO_LEADER, NO_LEADER, records, + // ELR is not enabled, or the shutdown was clean: handle the shutdown as if the broker was fenced + generateLeaderAndIsrUpdates("handleBrokerShutdown", brokerId, NO_LEADER, NO_LEADER, records, brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java index e7d190339f4..28b22d5a77d 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.message.ControllerRegistrationRequestData; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint; import 
org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection; @@ -102,7 +103,7 @@ public class ClusterControlManagerTest { setSnapshotRegistry(snapshotRegistry). setSessionTimeoutNs(1000). setFeatureControlManager(featureControl). - setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). build(); clusterControl.activate(); assertFalse(clusterControl.isUnfenced(0)); @@ -164,7 +165,7 @@ public class ClusterControlManagerTest { setSnapshotRegistry(snapshotRegistry). setSessionTimeoutNs(1000). setFeatureControlManager(featureControl). - setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). build(); assertFalse(clusterControl.isUnfenced(0)); @@ -217,7 +218,7 @@ public class ClusterControlManagerTest { setSnapshotRegistry(snapshotRegistry). setSessionTimeoutNs(1000). setFeatureControlManager(featureControl). - setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). build(); assertFalse(clusterControl.isUnfenced(0)); @@ -272,7 +273,7 @@ public class ClusterControlManagerTest { setSnapshotRegistry(snapshotRegistry). setSessionTimeoutNs(1000). setFeatureControlManager(featureControl). - setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). build(); clusterControl.activate(); assertThrows(InconsistentClusterIdException.class, () -> @@ -282,7 +283,8 @@ public class ClusterControlManagerTest { setRack(null). 
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")), 123L, - new FinalizedControllerFeatures(Collections.emptyMap(), 456L))); + new FinalizedControllerFeatures(Collections.emptyMap(), 456L), + false)); } private static Stream metadataVersions() { @@ -311,7 +313,7 @@ public class ClusterControlManagerTest { setSnapshotRegistry(snapshotRegistry). setSessionTimeoutNs(1000). setFeatureControlManager(featureControl). - setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). build(); clusterControl.activate(); @@ -327,7 +329,8 @@ public class ClusterControlManagerTest { setRack(null). setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")), 123L, - new FinalizedControllerFeatures(Collections.emptyMap(), 456L)); + new FinalizedControllerFeatures(Collections.emptyMap(), 456L), + false); short expectedVersion = metadataVersion.registerBrokerRecordVersion(); @@ -372,7 +375,7 @@ public class ClusterControlManagerTest { setSnapshotRegistry(snapshotRegistry). setSessionTimeoutNs(1000). setFeatureControlManager(featureControl). - setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). build(); clusterControl.activate(); clusterControl.replay(brokerRecord, 100L); @@ -411,7 +414,7 @@ public class ClusterControlManagerTest { setSnapshotRegistry(snapshotRegistry). setSessionTimeoutNs(1000). setFeatureControlManager(featureControl). - setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). build(); clusterControl.activate(); for (int i = 0; i < numUsableBrokers; i++) { @@ -475,7 +478,7 @@ public class ClusterControlManagerTest { setSnapshotRegistry(snapshotRegistry). setSessionTimeoutNs(1000). setFeatureControlManager(featureControl). - setBrokerUncleanShutdownHandler((brokerId, records) -> { }). 
+ setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). build(); clusterControl.activate(); assertFalse(clusterControl.isUnfenced(0)); @@ -557,7 +560,7 @@ public class ClusterControlManagerTest { setTime(new MockTime(0, 0, 0)). setSnapshotRegistry(snapshotRegistry). setFeatureControlManager(featureControl). - setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). build(); clusterControl.activate(); FeatureLevelRecord testFeatureRecord = new FeatureLevelRecord(). @@ -582,7 +585,8 @@ public class ClusterControlManagerTest { setMinSupportedVersion(MetadataVersion.IBP_3_1_IV0.featureLevel()). setMaxSupportedVersion(MetadataVersion.IBP_3_7_IV0.featureLevel())).iterator())), 123L, - featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage()); + featureControl.finalizedFeatures(Long.MAX_VALUE), + false)).getMessage()); } @Test @@ -605,7 +609,7 @@ public class ClusterControlManagerTest { setTime(new MockTime(0, 0, 0)). setSnapshotRegistry(snapshotRegistry). setFeatureControlManager(featureControl). - setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). build(); clusterControl.activate(); @@ -632,7 +636,8 @@ public class ClusterControlManagerTest { setMinSupportedVersion(MetadataVersion.IBP_3_9_IV0.featureLevel()). setMaxSupportedVersion(MetadataVersion.IBP_3_9_IV0.featureLevel())).iterator())), 123L, - updatedFinalizedFeatures)).getMessage()); + updatedFinalizedFeatures, + false)).getMessage()); assertEquals("Unable to register because the broker does not support finalized version 1 of " + "kraft.version. The broker wants a version between 0 and 0, inclusive.", @@ -649,7 +654,8 @@ public class ClusterControlManagerTest { setMinSupportedVersion(KRaftVersion.KRAFT_VERSION_0.featureLevel()). 
setMaxSupportedVersion(KRaftVersion.KRAFT_VERSION_0.featureLevel())).iterator())), 123L, - updatedFinalizedFeatures)).getMessage()); + updatedFinalizedFeatures, + false)).getMessage()); clusterControl.registerBroker( baseRequest.setFeatures(new BrokerRegistrationRequestData.FeatureCollection( @@ -663,7 +669,8 @@ public class ClusterControlManagerTest { setMinSupportedVersion(KRaftVersion.KRAFT_VERSION_1.featureLevel()). setMaxSupportedVersion(KRaftVersion.KRAFT_VERSION_1.featureLevel())).iterator())), 123L, - updatedFinalizedFeatures); + updatedFinalizedFeatures, + false); } @Test @@ -683,7 +690,7 @@ public class ClusterControlManagerTest { setTime(new MockTime(0, 0, 0)). setSnapshotRegistry(snapshotRegistry). setFeatureControlManager(featureControl). - setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). build(); clusterControl.activate(); @@ -697,7 +704,8 @@ public class ClusterControlManagerTest { setRack(null). setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")), 123L, - featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage()); + featureControl.finalizedFeatures(Long.MAX_VALUE), + false)).getMessage()); assertEquals("Unable to register because the broker does not support finalized version 4 of " + "metadata.version. The broker wants a version between 7 and 7, inclusive.", @@ -714,7 +722,8 @@ public class ClusterControlManagerTest { setMaxSupportedVersion(MetadataVersion.IBP_3_3_IV3.featureLevel())).iterator())). setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")), 123L, - featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage()); + featureControl.finalizedFeatures(Long.MAX_VALUE), + false)).getMessage()); } @Test @@ -725,7 +734,7 @@ public class ClusterControlManagerTest { ClusterControlManager clusterControl = new ClusterControlManager.Builder(). setClusterId("fPZv1VBsRFmnlRvmGcOW9w"). setFeatureControlManager(featureControl). 
- setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). build(); clusterControl.activate(); assertEquals("The current MetadataVersion is too old to support controller registrations.", @@ -738,7 +747,7 @@ public class ClusterControlManagerTest { ClusterControlManager clusterControl = new ClusterControlManager.Builder(). setClusterId("QzZZEtC7SxucRM29Xdzijw"). setFeatureControlManager(new FeatureControlManager.Builder().build()). - setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). build(); RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(0).setLogDirs(asList( Uuid.fromString("yJGxmjfbQZSVFAlNM3uXZg"), @@ -772,7 +781,7 @@ public class ClusterControlManagerTest { .setIncarnationId(new Uuid(brokerId, brokerId)) .setLogDirs(dirs); FinalizedControllerFeatures finalizedFeatures = new FinalizedControllerFeatures(Collections.emptyMap(), 456L); - ControllerResult result = clusterControl.registerBroker(data, 123L, finalizedFeatures); + ControllerResult result = clusterControl.registerBroker(data, 123L, finalizedFeatures, false); RecordTestUtils.replayAll(clusterControl, result.records()); } @@ -781,7 +790,7 @@ public class ClusterControlManagerTest { ClusterControlManager clusterControl = new ClusterControlManager.Builder(). setClusterId("pjvUwj3ZTEeSVQmUiH3IJw"). setFeatureControlManager(new FeatureControlManager.Builder().build()). - setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). 
build(); clusterControl.activate(); registerNewBrokerWithDirs(clusterControl, 1, asList(Uuid.fromString("dir1SEbpRuG1dcpTRGOvJw"), Uuid.fromString("dir2xaEwR2m3JHTiy7PWwA"))); @@ -800,7 +809,7 @@ public class ClusterControlManagerTest { ClusterControlManager clusterControl = new ClusterControlManager.Builder(). setClusterId("pjvUwj3ZTEeSVQmUiH3IJw"). setFeatureControlManager(new FeatureControlManager.Builder().build()). - setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). build(); clusterControl.activate(); RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(1).setLogDirs(Collections.emptyList()); @@ -820,7 +829,7 @@ public class ClusterControlManagerTest { ClusterControlManager clusterControl = new ClusterControlManager.Builder(). setClusterId("pjvUwj3ZTEeSVQmUiH3IJw"). setFeatureControlManager(new FeatureControlManager.Builder().build()). - setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). build(); clusterControl.activate(); RecordTestUtils.replayAll(clusterControl, clusterControl.registerBroker( @@ -830,7 +839,8 @@ public class ClusterControlManagerTest { setIncarnationId(Uuid.fromString("mISEfEFwQIuaD1gKCc5tzQ")). setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))), 100, - new FinalizedControllerFeatures(Collections.emptyMap(), 100L)). + new FinalizedControllerFeatures(Collections.emptyMap(), 100L), + false). records()); RecordTestUtils.replayAll(clusterControl, clusterControl.registerBroker( new BrokerRegistrationRequestData(). @@ -840,7 +850,8 @@ public class ClusterControlManagerTest { Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww") : Uuid.fromString("mISEfEFwQIuaD1gKCc5tzQ")). setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))), 111, - new FinalizedControllerFeatures(Collections.emptyMap(), 100L)). 
+ new FinalizedControllerFeatures(Collections.emptyMap(), 100L), + false). records()); if (newIncarnationId) { assertEquals(Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww"), @@ -855,13 +866,64 @@ public class ClusterControlManagerTest { } } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testReRegistrationWithCleanShutdownDetection(boolean isCleanShutdown) { + ClusterControlManager clusterControl = new ClusterControlManager.Builder(). + setClusterId("pjvUwj3ZTEeSVQmUiH3IJw"). + setFeatureControlManager(new FeatureControlManager.Builder().build()). + setBrokerShutdownHandler((brokerId, cleanShutdown, records) -> { + if (!cleanShutdown) { + records.add(new ApiMessageAndVersion(new PartitionChangeRecord(), PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION)); + } + }). + build(); + clusterControl.activate(); + List records = clusterControl.registerBroker( + new BrokerRegistrationRequestData(). + setBrokerId(1). + setClusterId(clusterControl.clusterId()). + setIncarnationId(Uuid.fromString("mISEfEFwQIuaD1gKCc5tzQ")). + setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))), + 100, + new FinalizedControllerFeatures(Collections.emptyMap(), 100L), + true). + records(); + records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord(). + setBrokerId(1).setBrokerEpoch(100). + setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()), + (short) 1)); + RecordTestUtils.replayAll(clusterControl, records); + + records = clusterControl.registerBroker( + new BrokerRegistrationRequestData(). + setBrokerId(1). + setClusterId(clusterControl.clusterId()). + setIncarnationId(Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww")). + setPreviousBrokerEpoch(isCleanShutdown ? 100 : 10). 
+ setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))), + 111, + new FinalizedControllerFeatures(Collections.emptyMap(), 100L), + true).records(); + RecordTestUtils.replayAll(clusterControl, records); + assertEquals(Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww"), + clusterControl.brokerRegistrations().get(1).incarnationId()); + assertFalse(clusterControl.brokerRegistrations().get(1).inControlledShutdown()); + assertEquals(111, clusterControl.brokerRegistrations().get(1).epoch()); + if (isCleanShutdown) { + assertEquals(1, records.size()); + } else { + assertEquals(2, records.size()); + } + } + @Test public void testBrokerContactTimesAreUpdatedOnClusterControlActivation() { MockTime time = new MockTime(0L, 20L, 1000L); ClusterControlManager clusterControl = new ClusterControlManager.Builder(). setClusterId("pjvUwj3ZTEeSVQmUiH3IJw"). setFeatureControlManager(new FeatureControlManager.Builder().build()). - setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). setTime(time). build(); clusterControl.replay(new RegisterBrokerRecord(). diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java index 407e03f4b50..a27f249a2e2 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java @@ -55,7 +55,7 @@ public class ProducerIdControlManagerTest { setSnapshotRegistry(snapshotRegistry). setSessionTimeoutNs(1000). setFeatureControlManager(featureControl). - setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). 
build(); clusterControl.activate(); diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index f080b0c86de..31cde1eda18 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -460,16 +460,16 @@ public class QuorumControllerTest { int brokerToUncleanShutdown, brokerToBeTheLeader; // lastKnownElr stores the last known leader. + brokerToUncleanShutdown = lastKnownElr[0]; if (lastKnownElr[0] == partition.elr[0]) { - brokerToUncleanShutdown = partition.elr[0]; brokerToBeTheLeader = partition.elr[1]; } else { - brokerToUncleanShutdown = partition.elr[1]; brokerToBeTheLeader = partition.elr[0]; } - // Unclean shutdown should remove the ELR members. - active.registerBroker( + // Unclean shutdown should remove brokerToUncleanShutdown from the ELR members, but it should still be in + // the lastKnownElr. + CompletableFuture reply = active.registerBroker( anonymousContextFor(ApiKeys.BROKER_REGISTRATION), new BrokerRegistrationRequestData(). setBrokerId(brokerToUncleanShutdown). @@ -477,22 +477,23 @@ public class QuorumControllerTest { setFeatures(features). setIncarnationId(Uuid.randomUuid()). setLogDirs(Collections.singletonList(Uuid.randomUuid())). - setListeners(listeners)).get(); + setListeners(listeners)); + brokerEpochs.put(brokerToUncleanShutdown, reply.get().epoch()); partition = active.replicationControl().getPartition(topicIdFoo, 0); assertArrayEquals(new int[]{brokerToBeTheLeader}, partition.elr, partition.toString()); + assertArrayEquals(lastKnownElr, partition.lastKnownElr, partition.toString()); // Unclean shutdown should not remove the last known ELR members. active.registerBroker( anonymousContextFor(ApiKeys.BROKER_REGISTRATION), new BrokerRegistrationRequestData(). - setBrokerId(lastKnownElr[0]). 
+ setBrokerId(brokerToBeTheLeader). setClusterId(active.clusterId()). setFeatures(features). setIncarnationId(Uuid.randomUuid()). + setPreviousBrokerEpoch(brokerEpochs.get(brokerToBeTheLeader)). setLogDirs(Collections.singletonList(Uuid.randomUuid())). - setListeners(listeners)).get(); - partition = active.replicationControl().getPartition(topicIdFoo, 0); - assertArrayEquals(lastKnownElr, partition.lastKnownElr, partition.toString()); + setListeners(listeners)); // Unfence the last one in the ELR, it should be elected. sendBrokerHeartbeatToUnfenceBrokers(active, singletonList(brokerToBeTheLeader), brokerEpochs); diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index a6c81f6510e..a8b001c6389 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -259,7 +259,7 @@ public class ReplicationControlManagerTest { setSessionTimeoutNs(TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS)). setReplicaPlacer(new StripedReplicaPlacer(random)). setFeatureControlManager(featureControl). - setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown). + setBrokerShutdownHandler(this::handleBrokerShutdown). build(); this.configurationControl = new ConfigurationControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). 
@@ -282,8 +282,8 @@ public class ReplicationControlManagerTest { clusterControl.activate(); } - void handleUncleanBrokerShutdown(int brokerId, List records) { - replicationControl.handleBrokerUncleanShutdown(brokerId, records); + void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List records) { + replicationControl.handleBrokerShutdown(brokerId, isCleanShutdown, records); } CreatableTopicResult createTestTopic(String name, @@ -412,10 +412,10 @@ public class ReplicationControlManagerTest { } } - void handleBrokersUncleanShutdown(Integer... brokerIds) { + void handleBrokersShutdown(boolean isCleanShutdown, Integer... brokerIds) { List records = new ArrayList<>(); for (int brokerId : brokerIds) { - replicationControl.handleBrokerUncleanShutdown(brokerId, records); + replicationControl.handleBrokerShutdown(brokerId, isCleanShutdown, records); } replay(records); } @@ -1070,6 +1070,34 @@ public class ReplicationControlManagerTest { assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString()); } + @Test + public void testEligibleLeaderReplicas_ShrinkToEmptyIsr() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", + new int[][] {new int[] {0, 1, 2}}); + + TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0); + assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition)); + ctx.alterTopicConfig("foo", TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"); + + // Change ISR to {0}. 
+ ctx.fenceBrokers(Set.of(1, 2)); + PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId()); + assertArrayEquals(new int[]{1, 2}, partition.elr, partition.toString()); + assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString()); + + // Clean shutdown the broker + ctx.handleBrokersShutdown(true, 0); + + partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId()); + assertArrayEquals(new int[]{0, 1, 2}, partition.elr, partition.toString()); + assertArrayEquals(new int[]{0}, partition.lastKnownElr, partition.toString()); + assertEquals(0, partition.isr.length); + } + @Test public void testEligibleLeaderReplicas_BrokerFence() { ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build(); @@ -1204,13 +1232,13 @@ public class ReplicationControlManagerTest { assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString()); // An unclean shutdown ELR member should be kicked out of ELR. - ctx.handleBrokersUncleanShutdown(3); + ctx.handleBrokersShutdown(false, 3); partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId()); assertArrayEquals(new int[]{2}, partition.elr, partition.toString()); assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString()); // An unclean shutdown last ISR member should be recognized as the last known leader. - ctx.handleBrokersUncleanShutdown(0); + ctx.handleBrokersShutdown(false, 0); partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId()); assertArrayEquals(new int[]{2}, partition.elr, partition.toString()); assertArrayEquals(new int[]{0}, partition.lastKnownElr, partition.toString());