KAFKA-18635: reenable the unclean shutdown detection (#18277)

We need to re-enable unclean shutdown detection in ELR mode; it was inadvertently removed during development.

Reviewers: David Mao <dmao@confluent.io>, Jun Rao <junrao@gmail.com>
Calvin Liu 2025-02-03 22:26:57 -08:00 committed by GitHub
parent 7719b5f70d
commit ad031b99d3
8 changed files with 613 additions and 69 deletions

File: EligibleLeaderReplicasIntegrationTest.java (new file)

@@ -0,0 +1,446 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.integration;
import kafka.integration.KafkaServerTestHarness;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.utils.Logging;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.mutable.HashMap;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarness implements Logging {
private String bootstrapServer;
private String testTopicName;
private Admin adminClient;
@Override
public Seq<KafkaConfig> generateConfigs() {
List<Properties> brokerConfigs = new ArrayList<>();
brokerConfigs.addAll(scala.collection.JavaConverters.seqAsJavaList(TestUtils.createBrokerConfigs(
5, // The tests require 4 brokers to host the partition. However, we need the 5th broker to handle the admin client requests.
true,
true,
scala.Option.<SecurityProtocol>empty(),
scala.Option.<File>empty(),
scala.Option.<Properties>empty(),
true,
false,
false,
false,
new HashMap<>(),
1,
false,
1,
(short) 4,
0,
false
)));
List<KafkaConfig> configs = new ArrayList<>();
for (Properties props : brokerConfigs) {
configs.add(KafkaConfig.fromProps(props));
}
return JavaConverters.asScalaBuffer(configs).toSeq();
}
@BeforeEach
@Override
public void setUp(TestInfo info) {
super.setUp(info);
// create adminClient
Properties props = new Properties();
bootstrapServer = bootstrapServers(listenerName());
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
adminClient = Admin.create(props);
adminClient.updateFeatures(
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)),
new UpdateFeaturesOptions()
);
testTopicName = String.format("%s-%s", info.getTestMethod().get().getName(), "ELR-test");
}
@AfterEach
public void close() throws Exception {
if (adminClient != null) adminClient.close();
}
@ParameterizedTest
@ValueSource(strings = {"kraft"})
public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr(String quorum) throws ExecutionException, InterruptedException {
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, 1, (short) 4))).all().get();
TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
Collection<AlterConfigOp> ops = new ArrayList<>();
ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET));
Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops);
// alter configs on target cluster
adminClient.incrementalAlterConfigs(configOps).all().get();
Producer producer = null;
Consumer consumer = null;
try {
TopicDescription testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName);
TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0);
List<Node> initialReplicas = topicPartitionInfo.replicas();
assertEquals(4, topicPartitionInfo.isr().size());
assertEquals(0, topicPartitionInfo.elr().size());
assertEquals(0, topicPartitionInfo.lastKnownElr().size());
Properties producerProps = new Properties();
producerProps.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
// Use Ack=1 for the producer.
producerProps.put(ProducerConfig.ACKS_CONFIG, "1");
producer = new KafkaProducer(producerProps);
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singleton(testTopicName));
producer.send(new ProducerRecord<>(testTopicName, "0", "0")).get();
waitUntilOneMessageIsConsumed(consumer);
killBroker(initialReplicas.get(0).id());
killBroker(initialReplicas.get(1).id());
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 2 && elrSize == 1;
});
// Now the partition is under min ISR. HWM should not advance.
producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get();
Thread.sleep(100);
assertEquals(0, consumer.poll(Duration.ofSeconds(1L)).count());
// Restore the min ISR and the previous log should be visible.
startBroker(initialReplicas.get(1).id());
startBroker(initialReplicas.get(0).id());
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 4 && elrSize == 0;
});
waitUntilOneMessageIsConsumed(consumer);
} finally {
restartDeadBrokers(false);
if (consumer != null) consumer.close();
if (producer != null) producer.close();
}
}
void waitUntilOneMessageIsConsumed(Consumer consumer) {
kafka.utils.TestUtils.waitUntilTrue(
() -> {
try {
ConsumerRecords record = consumer.poll(Duration.ofMillis(100L));
return record.count() >= 1;
} catch (Exception e) {
return false;
}
},
() -> "fail to consume messages",
org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L
);
}
@ParameterizedTest
@ValueSource(strings = {"kraft"})
public void testElrMemberCanBeElected(String quorum) throws ExecutionException, InterruptedException {
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, 1, (short) 4))).all().get();
TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
Collection<AlterConfigOp> ops = new ArrayList<>();
ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET));
Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops);
// alter configs on target cluster
adminClient.incrementalAlterConfigs(configOps).all().get();
try {
TopicDescription testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName);
TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0);
List<Node> initialReplicas = topicPartitionInfo.replicas();
assertEquals(4, topicPartitionInfo.isr().size());
assertEquals(0, topicPartitionInfo.elr().size());
assertEquals(0, topicPartitionInfo.lastKnownElr().size());
killBroker(initialReplicas.get(0).id());
killBroker(initialReplicas.get(1).id());
killBroker(initialReplicas.get(2).id());
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 1 && elrSize == 2;
});
killBroker(initialReplicas.get(3).id());
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 0 && elrSize == 3;
});
topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName).partitions().get(0);
assertEquals(1, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString());
int expectLastKnownLeader = initialReplicas.get(3).id();
assertEquals(expectLastKnownLeader, topicPartitionInfo.lastKnownElr().get(0).id(), topicPartitionInfo.toString());
// At this point, all the replicas have failed, the last known leader is initialReplicas.get(3), and the ELR has 3 members.
// Restart one broker of the ELR and it should be the leader.
int expectLeader = topicPartitionInfo.elr().stream()
.filter(node -> node.id() != expectLastKnownLeader).collect(Collectors.toList()).get(0).id();
startBroker(expectLeader);
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 1 && elrSize == 2;
});
topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName).partitions().get(0);
assertEquals(0, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString());
assertEquals(expectLeader, topicPartitionInfo.leader().id(), topicPartitionInfo.toString());
// Start another 2 brokers and the ELR fields should be cleaned.
topicPartitionInfo.replicas().stream().filter(node -> node.id() != expectLeader).limit(2)
.forEach(node -> startBroker(node.id()));
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 3 && elrSize == 0;
});
topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName).partitions().get(0);
assertEquals(0, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString());
assertEquals(expectLeader, topicPartitionInfo.leader().id(), topicPartitionInfo.toString());
} finally {
restartDeadBrokers(false);
}
}
@ParameterizedTest
@ValueSource(strings = {"kraft"})
public void testElrMemberShouldBeKickOutWhenUncleanShutdown(String quorum) throws ExecutionException, InterruptedException {
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, 1, (short) 4))).all().get();
TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
Collection<AlterConfigOp> ops = new ArrayList<>();
ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET));
Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops);
// alter configs on target cluster
adminClient.incrementalAlterConfigs(configOps).all().get();
try {
TopicDescription testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName);
TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0);
List<Node> initialReplicas = topicPartitionInfo.replicas();
assertEquals(4, topicPartitionInfo.isr().size());
assertEquals(0, topicPartitionInfo.elr().size());
assertEquals(0, topicPartitionInfo.lastKnownElr().size());
killBroker(initialReplicas.get(0).id());
killBroker(initialReplicas.get(1).id());
killBroker(initialReplicas.get(2).id());
killBroker(initialReplicas.get(3).id());
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 0 && elrSize == 3;
});
topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName).partitions().get(0);
int brokerToBeUncleanShutdown = topicPartitionInfo.elr().get(0).id();
KafkaBroker broker = brokers().find(b -> {
return b.config().brokerId() == brokerToBeUncleanShutdown;
}).get();
Seq<File> dirs = broker.logManager().liveLogDirs();
assertEquals(1, dirs.size());
CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString());
assertTrue(handler.exists());
assertDoesNotThrow(() -> handler.delete());
// After removing the clean shutdown file, the broker should report an unclean shutdown during restart.
startBroker(brokerToBeUncleanShutdown);
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 0 && elrSize == 2;
});
topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName).partitions().get(0);
assertTrue(topicPartitionInfo.leader() == null);
assertEquals(1, topicPartitionInfo.lastKnownElr().size());
} finally {
restartDeadBrokers(false);
}
}
/*
This test is only valid for KIP-966 part 1. When the unclean recovery is implemented, it should be removed.
*/
@ParameterizedTest
@ValueSource(strings = {"kraft"})
public void testLastKnownLeaderShouldBeElectedIfEmptyElr(String quorum) throws ExecutionException, InterruptedException {
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, 1, (short) 4))).all().get();
TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
Collection<AlterConfigOp> ops = new ArrayList<>();
ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET));
Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops);
// alter configs on target cluster
adminClient.incrementalAlterConfigs(configOps).all().get();
try {
TopicDescription testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName);
TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0);
List<Node> initialReplicas = topicPartitionInfo.replicas();
assertEquals(4, topicPartitionInfo.isr().size());
assertEquals(0, topicPartitionInfo.elr().size());
assertEquals(0, topicPartitionInfo.lastKnownElr().size());
killBroker(initialReplicas.get(0).id());
killBroker(initialReplicas.get(1).id());
killBroker(initialReplicas.get(2).id());
killBroker(initialReplicas.get(3).id());
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 0 && elrSize == 3;
});
topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName).partitions().get(0);
int lastKnownLeader = topicPartitionInfo.lastKnownElr().get(0).id();
Set<Integer> initialReplicaSet = initialReplicas.stream().map(node -> node.id()).collect(Collectors.toSet());
brokers().foreach(broker -> {
if (initialReplicaSet.contains(broker.config().brokerId())) {
Seq<File> dirs = broker.logManager().liveLogDirs();
assertEquals(1, dirs.size());
CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString());
assertDoesNotThrow(() -> handler.delete());
}
return true;
});
// After removing the clean shutdown files, the brokers should report unclean shutdowns during restart.
topicPartitionInfo.replicas().stream().forEach(replica -> {
if (replica.id() != lastKnownLeader) startBroker(replica.id());
});
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 0 && elrSize == 1;
});
topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName).partitions().get(0);
assertTrue(topicPartitionInfo.leader() == null);
assertEquals(1, topicPartitionInfo.lastKnownElr().size());
// Now if the last known leader goes through unclean shutdown, it will still be elected.
startBroker(lastKnownLeader);
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize > 0 && elrSize == 0;
});
topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName).partitions().get(0);
assertEquals(0, topicPartitionInfo.lastKnownElr().size());
assertEquals(0, topicPartitionInfo.elr().size());
assertEquals(lastKnownLeader, topicPartitionInfo.leader().id());
} finally {
restartDeadBrokers(false);
}
}
void waitForIsrAndElr(BiFunction<Integer, Integer, Boolean> isIsrAndElrSizeSatisfied) {
kafka.utils.TestUtils.waitUntilTrue(
() -> {
try {
TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName);
TopicPartitionInfo partition = topicDescription.partitions().get(0);
if (!isIsrAndElrSizeSatisfied.apply(partition.isr().size(), partition.elr().size())) return false;
} catch (Exception e) {
return false;
}
return true;
},
() -> String.format("Partition metadata for %s is not propagated", testTopicName),
org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
}
}

File: ClusterControlManager.java

@@ -90,7 +90,7 @@ public class ClusterControlManager {
private long sessionTimeoutNs = DEFAULT_SESSION_TIMEOUT_NS;
private ReplicaPlacer replicaPlacer = null;
private FeatureControlManager featureControl = null;
private BrokerUncleanShutdownHandler brokerUncleanShutdownHandler = null;
private BrokerShutdownHandler brokerShutdownHandler = null;
private String interBrokerListenerName = "PLAINTEXT";
Builder setLogContext(LogContext logContext) {
@@ -128,8 +128,8 @@ public class ClusterControlManager {
return this;
}
Builder setBrokerUncleanShutdownHandler(BrokerUncleanShutdownHandler brokerUncleanShutdownHandler) {
this.brokerUncleanShutdownHandler = brokerUncleanShutdownHandler;
Builder setBrokerShutdownHandler(BrokerShutdownHandler brokerShutdownHandler) {
this.brokerShutdownHandler = brokerShutdownHandler;
return this;
}
@@ -154,8 +154,8 @@ public class ClusterControlManager {
if (featureControl == null) {
throw new RuntimeException("You must specify FeatureControlManager");
}
if (brokerUncleanShutdownHandler == null) {
throw new RuntimeException("You must specify BrokerUncleanShutdownHandler");
if (brokerShutdownHandler == null) {
throw new RuntimeException("You must specify BrokerShutdownHandler");
}
return new ClusterControlManager(logContext,
clusterId,
@@ -164,7 +164,7 @@ public class ClusterControlManager {
sessionTimeoutNs,
replicaPlacer,
featureControl,
brokerUncleanShutdownHandler,
brokerShutdownHandler,
interBrokerListenerName
);
}
@@ -252,7 +252,7 @@ public class ClusterControlManager {
*/
private final FeatureControlManager featureControl;
private final BrokerUncleanShutdownHandler brokerUncleanShutdownHandler;
private final BrokerShutdownHandler brokerShutdownHandler;
/**
* The statically configured inter-broker listener name.
@@ -277,7 +277,7 @@ public class ClusterControlManager {
long sessionTimeoutNs,
ReplicaPlacer replicaPlacer,
FeatureControlManager featureControl,
BrokerUncleanShutdownHandler brokerUncleanShutdownHandler,
BrokerShutdownHandler brokerShutdownHandler,
String interBrokerListenerName
) {
this.logContext = logContext;
@@ -293,7 +293,7 @@ public class ClusterControlManager {
this.featureControl = featureControl;
this.controllerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
this.directoryToBroker = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokerUncleanShutdownHandler = brokerUncleanShutdownHandler;
this.brokerShutdownHandler = brokerShutdownHandler;
this.interBrokerListenerName = interBrokerListenerName;
}
@@ -335,7 +335,8 @@ public class ClusterControlManager {
public ControllerResult<BrokerRegistrationReply> registerBroker(
BrokerRegistrationRequestData request,
long newBrokerEpoch,
FinalizedControllerFeatures finalizedFeatures
FinalizedControllerFeatures finalizedFeatures,
boolean cleanShutdownDetectionEnabled
) {
if (heartbeatManager == null) {
throw new RuntimeException("ClusterControlManager is not active.");
@@ -348,8 +349,10 @@ public class ClusterControlManager {
List<ApiMessageAndVersion> records = new ArrayList<>();
BrokerRegistration existing = brokerRegistrations.get(brokerId);
Uuid prevIncarnationId = null;
long storedBrokerEpoch = -2; // BrokerRegistration.previousBrokerEpoch default value is -1
if (existing != null) {
prevIncarnationId = existing.incarnationId();
storedBrokerEpoch = existing.epoch();
if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) {
if (!request.incarnationId().equals(prevIncarnationId)) {
throw new DuplicateBrokerRegistrationException("Another broker is " +
@@ -424,7 +427,9 @@ public class ClusterControlManager {
if (!request.incarnationId().equals(prevIncarnationId)) {
int prevNumRecords = records.size();
brokerUncleanShutdownHandler.addRecordsForShutdown(request.brokerId(), records);
boolean isCleanShutdown = cleanShutdownDetectionEnabled ?
storedBrokerEpoch == request.previousBrokerEpoch() : false;
brokerShutdownHandler.addRecordsForShutdown(request.brokerId(), isCleanShutdown, records);
int numRecordsAdded = records.size() - prevNumRecords;
if (existing == null) {
log.info("No previous registration found for broker {}. New incarnation ID is " +
@@ -847,7 +852,7 @@ public class ClusterControlManager {
}
@FunctionalInterface
interface BrokerUncleanShutdownHandler {
void addRecordsForShutdown(int brokerId, List<ApiMessageAndVersion> records);
interface BrokerShutdownHandler {
void addRecordsForShutdown(int brokerId, boolean isCleanShutdown, List<ApiMessageAndVersion> records);
}
}

File: QuorumController.java

@@ -1555,7 +1555,7 @@ public final class QuorumController implements Controller {
setSessionTimeoutNs(sessionTimeoutNs).
setReplicaPlacer(replicaPlacer).
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown).
setBrokerShutdownHandler(this::handleBrokerShutdown).
setInterBrokerListenerName(interBrokerListenerName).
build();
this.configurationControl = new ConfigurationControlManager.Builder().
@@ -2025,7 +2025,8 @@ public final class QuorumController implements Controller {
return appendWriteEvent("registerBroker", context.deadlineNs(),
() -> clusterControl.
registerBroker(request, offsetControl.nextWriteOffset(),
new FinalizedControllerFeatures(controllerFeatures, Long.MAX_VALUE)),
new FinalizedControllerFeatures(controllerFeatures, Long.MAX_VALUE),
context.requestHeader().requestApiVersion() >= 3),
EnumSet.noneOf(ControllerOperationFlag.class));
}
@@ -2203,7 +2204,7 @@ public final class QuorumController implements Controller {
return controllerMetrics;
}
void handleUncleanBrokerShutdown(int brokerId, List<ApiMessageAndVersion> records) {
replicationControl.handleBrokerUncleanShutdown(brokerId, records);
void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List<ApiMessageAndVersion> records) {
replicationControl.handleBrokerShutdown(brokerId, isCleanShutdown, records);
}
}

File: ReplicationControlManager.java

@@ -1461,21 +1461,22 @@ public class ReplicationControlManager {
}
/**
* Create partition change records to remove replicas from any ISR or ELR for brokers doing unclean shutdown.
* Create partition change records to remove replicas from any ISR or ELR for brokers when the shutdown is detected.
*
* @param brokerId The broker id.
* @param records The record list to append to.
* @param brokerId The broker id to be shut down.
* @param isCleanShutdown Whether the broker has a clean shutdown.
* @param records The record list to append to.
*/
void handleBrokerUncleanShutdown(int brokerId, List<ApiMessageAndVersion> records) {
if (featureControl.metadataVersion().isElrSupported()) {
void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List<ApiMessageAndVersion> records) {
if (featureControl.metadataVersion().isElrSupported() && !isCleanShutdown) {
// ELR is enabled, generate unclean shutdown partition change records
generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, NO_LEADER, brokerId, records,
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, NO_LEADER, brokerId, records,
brokersToElrs.partitionsWithBrokerInElr(brokerId));
} else {
// ELR is not enabled, handle the unclean shutdown as if the broker was fenced
generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", brokerId, NO_LEADER, NO_LEADER, records,
// ELR is not enabled, or it is a clean shutdown; handle the shutdown as if the broker was fenced
generateLeaderAndIsrUpdates("handleBrokerShutdown", brokerId, NO_LEADER, NO_LEADER, records,
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
}
}

File: ClusterControlManagerTest.java

@@ -29,6 +29,7 @@ import org.apache.kafka.common.message.ControllerRegistrationRequestData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
@@ -102,7 +103,7 @@ public class ClusterControlManagerTest {
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
assertFalse(clusterControl.isUnfenced(0));
@@ -164,7 +165,7 @@ public class ClusterControlManagerTest {
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
assertFalse(clusterControl.isUnfenced(0));
@@ -217,7 +218,7 @@ public class ClusterControlManagerTest {
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
assertFalse(clusterControl.isUnfenced(0));
@@ -272,7 +273,7 @@ public class ClusterControlManagerTest {
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
assertThrows(InconsistentClusterIdException.class, () ->
@@ -282,7 +283,8 @@ public class ClusterControlManagerTest {
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
new FinalizedControllerFeatures(Collections.emptyMap(), 456L)));
new FinalizedControllerFeatures(Collections.emptyMap(), 456L),
false));
}
private static Stream<Arguments> metadataVersions() {
@@ -311,7 +313,7 @@ public class ClusterControlManagerTest {
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
@@ -327,7 +329,8 @@ public class ClusterControlManagerTest {
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
new FinalizedControllerFeatures(Collections.emptyMap(), 456L));
new FinalizedControllerFeatures(Collections.emptyMap(), 456L),
false);
short expectedVersion = metadataVersion.registerBrokerRecordVersion();
@@ -372,7 +375,7 @@ public class ClusterControlManagerTest {
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
clusterControl.replay(brokerRecord, 100L);
@@ -411,7 +414,7 @@ public class ClusterControlManagerTest {
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
for (int i = 0; i < numUsableBrokers; i++) {
@@ -475,7 +478,7 @@ public class ClusterControlManagerTest {
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
assertFalse(clusterControl.isUnfenced(0));
@@ -557,7 +560,7 @@ public class ClusterControlManagerTest {
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
FeatureLevelRecord testFeatureRecord = new FeatureLevelRecord().
@@ -582,7 +585,8 @@ public class ClusterControlManagerTest {
setMinSupportedVersion(MetadataVersion.IBP_3_1_IV0.featureLevel()).
setMaxSupportedVersion(MetadataVersion.IBP_3_7_IV0.featureLevel())).iterator())),
123L,
featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage());
featureControl.finalizedFeatures(Long.MAX_VALUE),
false)).getMessage());
}
@Test
@@ -605,7 +609,7 @@ public class ClusterControlManagerTest {
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
@@ -632,7 +636,8 @@ public class ClusterControlManagerTest {
setMinSupportedVersion(MetadataVersion.IBP_3_9_IV0.featureLevel()).
setMaxSupportedVersion(MetadataVersion.IBP_3_9_IV0.featureLevel())).iterator())),
123L,
updatedFinalizedFeatures)).getMessage());
updatedFinalizedFeatures,
false)).getMessage());
assertEquals("Unable to register because the broker does not support finalized version 1 of " +
"kraft.version. The broker wants a version between 0 and 0, inclusive.",
@@ -649,7 +654,8 @@ public class ClusterControlManagerTest {
setMinSupportedVersion(KRaftVersion.KRAFT_VERSION_0.featureLevel()).
setMaxSupportedVersion(KRaftVersion.KRAFT_VERSION_0.featureLevel())).iterator())),
123L,
updatedFinalizedFeatures)).getMessage());
updatedFinalizedFeatures,
false)).getMessage());
clusterControl.registerBroker(
baseRequest.setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
@@ -663,7 +669,8 @@ public class ClusterControlManagerTest {
setMinSupportedVersion(KRaftVersion.KRAFT_VERSION_1.featureLevel()).
setMaxSupportedVersion(KRaftVersion.KRAFT_VERSION_1.featureLevel())).iterator())),
123L,
updatedFinalizedFeatures);
updatedFinalizedFeatures,
false);
}
@Test
@@ -683,7 +690,7 @@ public class ClusterControlManagerTest {
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
@@ -697,7 +704,8 @@ public class ClusterControlManagerTest {
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage());
featureControl.finalizedFeatures(Long.MAX_VALUE),
false)).getMessage());
assertEquals("Unable to register because the broker does not support finalized version 4 of " +
"metadata.version. The broker wants a version between 7 and 7, inclusive.",
@@ -714,7 +722,8 @@ public class ClusterControlManagerTest {
setMaxSupportedVersion(MetadataVersion.IBP_3_3_IV3.featureLevel())).iterator())).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage());
featureControl.finalizedFeatures(Long.MAX_VALUE),
false)).getMessage());
}
@Test
@@ -725,7 +734,7 @@ public class ClusterControlManagerTest {
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
assertEquals("The current MetadataVersion is too old to support controller registrations.",
@@ -738,7 +747,7 @@ public class ClusterControlManagerTest {
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("QzZZEtC7SxucRM29Xdzijw").
setFeatureControlManager(new FeatureControlManager.Builder().build()).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(0).setLogDirs(asList(
Uuid.fromString("yJGxmjfbQZSVFAlNM3uXZg"),
@@ -772,7 +781,7 @@ public class ClusterControlManagerTest {
.setIncarnationId(new Uuid(brokerId, brokerId))
.setLogDirs(dirs);
FinalizedControllerFeatures finalizedFeatures = new FinalizedControllerFeatures(Collections.emptyMap(), 456L);
ControllerResult<BrokerRegistrationReply> result = clusterControl.registerBroker(data, 123L, finalizedFeatures);
ControllerResult<BrokerRegistrationReply> result = clusterControl.registerBroker(data, 123L, finalizedFeatures, false);
RecordTestUtils.replayAll(clusterControl, result.records());
}
@@ -781,7 +790,7 @@ public class ClusterControlManagerTest {
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
setFeatureControlManager(new FeatureControlManager.Builder().build()).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
registerNewBrokerWithDirs(clusterControl, 1, asList(Uuid.fromString("dir1SEbpRuG1dcpTRGOvJw"), Uuid.fromString("dir2xaEwR2m3JHTiy7PWwA")));
@@ -800,7 +809,7 @@ public class ClusterControlManagerTest {
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
setFeatureControlManager(new FeatureControlManager.Builder().build()).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(1).setLogDirs(Collections.emptyList());
@@ -820,7 +829,7 @@ public class ClusterControlManagerTest {
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
setFeatureControlManager(new FeatureControlManager.Builder().build()).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
RecordTestUtils.replayAll(clusterControl, clusterControl.registerBroker(
@@ -830,7 +839,8 @@ public class ClusterControlManagerTest {
setIncarnationId(Uuid.fromString("mISEfEFwQIuaD1gKCc5tzQ")).
setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))),
100,
new FinalizedControllerFeatures(Collections.emptyMap(), 100L)).
new FinalizedControllerFeatures(Collections.emptyMap(), 100L),
false).
records());
RecordTestUtils.replayAll(clusterControl, clusterControl.registerBroker(
new BrokerRegistrationRequestData().
@ -840,7 +850,8 @@ public class ClusterControlManagerTest {
Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww") : Uuid.fromString("mISEfEFwQIuaD1gKCc5tzQ")).
setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))),
111,
new FinalizedControllerFeatures(Collections.emptyMap(), 100L)).
new FinalizedControllerFeatures(Collections.emptyMap(), 100L),
false).
records());
if (newIncarnationId) {
assertEquals(Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww"),
@@ -855,13 +866,64 @@ public class ClusterControlManagerTest {
}
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testReRegistrationWithCleanShutdownDetection(boolean isCleanShutdown) {
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
setFeatureControlManager(new FeatureControlManager.Builder().build()).
setBrokerShutdownHandler((brokerId, cleanShutdown, records) -> {
if (!cleanShutdown) {
records.add(new ApiMessageAndVersion(new PartitionChangeRecord(), PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION));
}
}).
build();
clusterControl.activate();
List<ApiMessageAndVersion> records = clusterControl.registerBroker(
new BrokerRegistrationRequestData().
setBrokerId(1).
setClusterId(clusterControl.clusterId()).
setIncarnationId(Uuid.fromString("mISEfEFwQIuaD1gKCc5tzQ")).
setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))),
100,
new FinalizedControllerFeatures(Collections.emptyMap(), 100L),
true).
records();
records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
setBrokerId(1).setBrokerEpoch(100).
setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()),
(short) 1));
RecordTestUtils.replayAll(clusterControl, records);
records = clusterControl.registerBroker(
new BrokerRegistrationRequestData().
setBrokerId(1).
setClusterId(clusterControl.clusterId()).
setIncarnationId(Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww")).
setPreviousBrokerEpoch(isCleanShutdown ? 100 : 10).
setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))),
111,
new FinalizedControllerFeatures(Collections.emptyMap(), 100L),
true).records();
RecordTestUtils.replayAll(clusterControl, records);
assertEquals(Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww"),
clusterControl.brokerRegistrations().get(1).incarnationId());
assertFalse(clusterControl.brokerRegistrations().get(1).inControlledShutdown());
assertEquals(111, clusterControl.brokerRegistrations().get(1).epoch());
if (isCleanShutdown) {
assertEquals(1, records.size());
} else {
assertEquals(2, records.size());
}
}
@Test
public void testBrokerContactTimesAreUpdatedOnClusterControlActivation() {
MockTime time = new MockTime(0L, 20L, 1000L);
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
setFeatureControlManager(new FeatureControlManager.Builder().build()).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
setTime(time).
build();
clusterControl.replay(new RegisterBrokerRecord().

File: ProducerIdControlManagerTest.java

@@ -55,7 +55,7 @@ public class ProducerIdControlManagerTest {
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();

File: QuorumControllerTest.java

@@ -460,16 +460,16 @@ public class QuorumControllerTest {
int brokerToUncleanShutdown, brokerToBeTheLeader;
// lastKnownElr stores the last known leader.
brokerToUncleanShutdown = lastKnownElr[0];
if (lastKnownElr[0] == partition.elr[0]) {
brokerToUncleanShutdown = partition.elr[0];
brokerToBeTheLeader = partition.elr[1];
} else {
brokerToUncleanShutdown = partition.elr[1];
brokerToBeTheLeader = partition.elr[0];
}
// Unclean shutdown should remove the ELR members.
active.registerBroker(
// Unclean shutdown should remove brokerToUncleanShutdown from the ELR members, but it should still be in
// the lastKnownElr.
CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
anonymousContextFor(ApiKeys.BROKER_REGISTRATION),
new BrokerRegistrationRequestData().
setBrokerId(brokerToUncleanShutdown).
@@ -477,22 +477,23 @@ public class QuorumControllerTest {
setFeatures(features).
setIncarnationId(Uuid.randomUuid()).
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
setListeners(listeners)).get();
setListeners(listeners));
brokerEpochs.put(brokerToUncleanShutdown, reply.get().epoch());
partition = active.replicationControl().getPartition(topicIdFoo, 0);
assertArrayEquals(new int[]{brokerToBeTheLeader}, partition.elr, partition.toString());
assertArrayEquals(lastKnownElr, partition.lastKnownElr, partition.toString());
// Unclean shutdown should not remove the last known ELR members.
active.registerBroker(
anonymousContextFor(ApiKeys.BROKER_REGISTRATION),
new BrokerRegistrationRequestData().
setBrokerId(lastKnownElr[0]).
setBrokerId(brokerToBeTheLeader).
setClusterId(active.clusterId()).
setFeatures(features).
setIncarnationId(Uuid.randomUuid()).
setPreviousBrokerEpoch(brokerEpochs.get(brokerToBeTheLeader)).
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
setListeners(listeners)).get();
partition = active.replicationControl().getPartition(topicIdFoo, 0);
assertArrayEquals(lastKnownElr, partition.lastKnownElr, partition.toString());
setListeners(listeners));
// Unfence the last one in the ELR, it should be elected.
sendBrokerHeartbeatToUnfenceBrokers(active, singletonList(brokerToBeTheLeader), brokerEpochs);

File: ReplicationControlManagerTest.java

@@ -259,7 +259,7 @@ public class ReplicationControlManagerTest {
setSessionTimeoutNs(TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS)).
setReplicaPlacer(new StripedReplicaPlacer(random)).
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown).
setBrokerShutdownHandler(this::handleBrokerShutdown).
build();
this.configurationControl = new ConfigurationControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
@@ -282,8 +282,8 @@
clusterControl.activate();
}
void handleUncleanBrokerShutdown(int brokerId, List<ApiMessageAndVersion> records) {
replicationControl.handleBrokerUncleanShutdown(brokerId, records);
void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List<ApiMessageAndVersion> records) {
replicationControl.handleBrokerShutdown(brokerId, isCleanShutdown, records);
}
CreatableTopicResult createTestTopic(String name,
@@ -412,10 +412,10 @@
}
}
void handleBrokersUncleanShutdown(Integer... brokerIds) {
void handleBrokersShutdown(boolean isCleanShutdown, Integer... brokerIds) {
List<ApiMessageAndVersion> records = new ArrayList<>();
for (int brokerId : brokerIds) {
replicationControl.handleBrokerUncleanShutdown(brokerId, records);
replicationControl.handleBrokerShutdown(brokerId, isCleanShutdown, records);
}
replay(records);
}
@@ -1070,6 +1070,34 @@
assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
}
@Test
public void testEligibleLeaderReplicas_ShrinkToEmptyIsr() {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
ReplicationControlManager replicationControl = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
CreatableTopicResult createTopicResult = ctx.createTestTopic("foo",
new int[][] {new int[] {0, 1, 2}});
TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
ctx.alterTopicConfig("foo", TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3");
// Change ISR to {0}.
ctx.fenceBrokers(Set.of(1, 2));
PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
assertArrayEquals(new int[]{1, 2}, partition.elr, partition.toString());
assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
// Clean shutdown the broker
ctx.handleBrokersShutdown(true, 0);
partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
assertArrayEquals(new int[]{0, 1, 2}, partition.elr, partition.toString());
assertArrayEquals(new int[]{0}, partition.lastKnownElr, partition.toString());
assertEquals(0, partition.isr.length);
}
@Test
public void testEligibleLeaderReplicas_BrokerFence() {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
@@ -1204,13 +1232,13 @@
assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
// An unclean shutdown ELR member should be kicked out of ELR.
ctx.handleBrokersUncleanShutdown(3);
ctx.handleBrokersShutdown(false, 3);
partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
assertArrayEquals(new int[]{2}, partition.elr, partition.toString());
assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
// An unclean shutdown last ISR member should be recognized as the last known leader.
ctx.handleBrokersUncleanShutdown(0);
ctx.handleBrokersShutdown(false, 0);
partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
assertArrayEquals(new int[]{2}, partition.elr, partition.toString());
assertArrayEquals(new int[]{0}, partition.lastKnownElr, partition.toString());