KAFKA-18981: Deflake testMinIsrUpdateWithElr by heartbeating survivor broker

This commit is contained in:
Anshul Bisht 2025-10-06 15:05:04 -05:00
parent f5a87b3703
commit e9b246100f
1 changed files with 98 additions and 71 deletions

View File

@ -649,88 +649,115 @@ public class QuorumControllerTest {
// Unfence all brokers and create a topic foo (min ISR 2) // Unfence all brokers and create a topic foo (min ISR 2)
sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs); sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(List.of(
new CreatableTopic().setName("foo").setNumPartitions(1).
setReplicationFactor(replicationFactor),
new CreatableTopic().setName("bar").setNumPartitions(1).
setReplicationFactor(replicationFactor)
).iterator()));
CreateTopicsResponseData createTopicsResponseData = active.createTopics(
ANONYMOUS_CONTEXT, createTopicsRequestData,
Set.of("foo", "bar")).get();
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode()));
Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
Uuid topicIdBar = createTopicsResponseData.topics().find("bar").topicId();
ConfigRecord configRecord = new ConfigRecord()
.setResourceType(BROKER.id())
.setResourceName("")
.setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
.setValue("2");
RecordTestUtils.replayAll(active.configurationControl(), List.of(new ApiMessageAndVersion(configRecord, (short) 0)));
// Fence brokers // Heartbeat pumper
TestUtils.waitForCondition(() -> { final java.util.concurrent.ScheduledExecutorService hbExec =
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs); java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
for (Integer brokerId : brokersToFence) { final java.util.concurrent.atomic.AtomicBoolean keepOnly =
if (active.clusterControl().isUnfenced(brokerId)) { new java.util.concurrent.atomic.AtomicBoolean(false);
return false; final long periodMs = Math.max(50L, sessionTimeoutMillis / 3);
}
hbExec.scheduleAtFixedRate(() -> {
try {
if (keepOnly.get()) {
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
} else {
sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
} }
return true; } catch (Throwable t) {
}, sessionTimeoutMillis * 30, throw new RuntimeException(t);
"Fencing of brokers did not process within expected time" }
); }, 0L, periodMs, java.util.concurrent.TimeUnit.MILLISECONDS);
// Send another heartbeat to the brokers we want to keep alive try {
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs); CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(List.of(
new CreatableTopic().setName("foo").setNumPartitions(1).
setReplicationFactor(replicationFactor),
new CreatableTopic().setName("bar").setNumPartitions(1).
setReplicationFactor(replicationFactor)
).iterator()));
CreateTopicsResponseData createTopicsResponseData = active.createTopics(
ANONYMOUS_CONTEXT, createTopicsRequestData,
Set.of("foo", "bar")).get();
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode()));
Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
Uuid topicIdBar = createTopicsResponseData.topics().find("bar").topicId();
ConfigRecord configRecord = new ConfigRecord()
.setResourceType(BROKER.id())
.setResourceName("")
.setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
.setValue("2");
RecordTestUtils.replayAll(active.configurationControl(), List.of(new ApiMessageAndVersion(configRecord, (short) 0)));
// At this point only the brokers we want to fence (broker 2, 3) should be fenced. // Before fencing wait, switch pumper to only keep brokersToKeepUnfenced alive
brokersToKeepUnfenced.forEach(brokerId -> { keepOnly.set(true);
assertTrue(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been unfenced");
});
brokersToFence.forEach(brokerId -> {
assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
// Verify the isr and elr for the topic partition // Fence brokers
PartitionRegistration partition = active.replicationControl().getPartition(topicIdFoo, 0); TestUtils.waitForCondition(() -> {
assertArrayEquals(new int[]{1}, partition.isr, partition.toString()); sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
for (Integer brokerId : brokersToFence) {
if (active.clusterControl().isUnfenced(brokerId)) {
return false;
}
}
return true;
}, sessionTimeoutMillis * 30,
"Fencing of brokers did not process within expected time"
);
// The ELR set is not determined but the size is 1. // Send another heartbeat to the brokers we want to keep alive
assertEquals(1, partition.elr.length, partition.toString()); sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
// First, decrease the min ISR config to 1. This should clear the ELR fields. // At this point only the brokers we want to fence (broker 2, 3) should be fenced.
ControllerResult<Map<ConfigResource, ApiError>> result = active.configurationControl().incrementalAlterConfigs(toMap( brokersToKeepUnfenced.forEach(brokerId -> {
entry(new ConfigResource(TOPIC, "foo"), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))), assertTrue(active.clusterControl().isUnfenced(brokerId),
true); "Broker " + brokerId + " should have been unfenced");
assertEquals(2, result.records().size(), result.records().toString()); });
RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0))); brokersToFence.forEach(brokerId -> {
RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1))); assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
partition = active.replicationControl().getPartition(topicIdFoo, 0); // Verify the isr and elr for the topic partition
assertEquals(0, partition.elr.length, partition.toString()); PartitionRegistration partition = active.replicationControl().getPartition(topicIdFoo, 0);
assertArrayEquals(new int[]{1}, partition.isr, partition.toString()); assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
// Second, let's try update config on cluster level with the other topic. // The ELR set is not determined but the size is 1.
partition = active.replicationControl().getPartition(topicIdBar, 0); assertEquals(1, partition.elr.length, partition.toString());
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
assertEquals(1, partition.elr.length, partition.toString());
result = active.configurationControl().incrementalAlterConfigs(toMap( // First, decrease the min ISR config to 1. This should clear the ELR fields.
entry(new ConfigResource(BROKER, ""), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))), ControllerResult<Map<ConfigResource, ApiError>> result = active.configurationControl().incrementalAlterConfigs(toMap(
true); entry(new ConfigResource(TOPIC, "foo"), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
assertEquals(2, result.records().size(), result.records().toString()); true);
RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0))); assertEquals(2, result.records().size(), result.records().toString());
RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1))); RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0)));
RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1)));
partition = active.replicationControl().getPartition(topicIdBar, 0); partition = active.replicationControl().getPartition(topicIdFoo, 0);
assertEquals(0, partition.elr.length, partition.toString()); assertEquals(0, partition.elr.length, partition.toString());
assertArrayEquals(new int[]{1}, partition.isr, partition.toString()); assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
// Second, let's try update config on cluster level with the other topic.
partition = active.replicationControl().getPartition(topicIdBar, 0);
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
assertEquals(1, partition.elr.length, partition.toString());
result = active.configurationControl().incrementalAlterConfigs(toMap(
entry(new ConfigResource(BROKER, ""), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
true);
assertEquals(2, result.records().size(), result.records().toString());
RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0)));
RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1)));
partition = active.replicationControl().getPartition(topicIdBar, 0);
assertEquals(0, partition.elr.length, partition.toString());
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
} finally {
hbExec.shutdownNow();
}
} }
} }