This commit is contained in:
Anshul Bisht 2025-10-07 23:22:27 +00:00 committed by GitHub
commit 4fa279ddb0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 98 additions and 71 deletions

View File

@ -649,88 +649,115 @@ public class QuorumControllerTest {
// Unfence all brokers and create a topic foo (min ISR 2)
sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(List.of(
new CreatableTopic().setName("foo").setNumPartitions(1).
setReplicationFactor(replicationFactor),
new CreatableTopic().setName("bar").setNumPartitions(1).
setReplicationFactor(replicationFactor)
).iterator()));
CreateTopicsResponseData createTopicsResponseData = active.createTopics(
ANONYMOUS_CONTEXT, createTopicsRequestData,
Set.of("foo", "bar")).get();
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode()));
Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
Uuid topicIdBar = createTopicsResponseData.topics().find("bar").topicId();
ConfigRecord configRecord = new ConfigRecord()
.setResourceType(BROKER.id())
.setResourceName("")
.setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
.setValue("2");
RecordTestUtils.replayAll(active.configurationControl(), List.of(new ApiMessageAndVersion(configRecord, (short) 0)));
// Fence brokers
TestUtils.waitForCondition(() -> {
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
for (Integer brokerId : brokersToFence) {
if (active.clusterControl().isUnfenced(brokerId)) {
return false;
}
// Heartbeat pumper
final java.util.concurrent.ScheduledExecutorService hbExec =
java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
final java.util.concurrent.atomic.AtomicBoolean keepOnly =
new java.util.concurrent.atomic.AtomicBoolean(false);
final long periodMs = Math.max(50L, sessionTimeoutMillis / 3);
hbExec.scheduleAtFixedRate(() -> {
try {
if (keepOnly.get()) {
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
} else {
sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
}
return true;
}, sessionTimeoutMillis * 30,
"Fencing of brokers did not process within expected time"
);
} catch (Throwable t) {
throw new RuntimeException(t);
}
}, 0L, periodMs, java.util.concurrent.TimeUnit.MILLISECONDS);
// Send another heartbeat to the brokers we want to keep alive
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
try {
CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(List.of(
new CreatableTopic().setName("foo").setNumPartitions(1).
setReplicationFactor(replicationFactor),
new CreatableTopic().setName("bar").setNumPartitions(1).
setReplicationFactor(replicationFactor)
).iterator()));
CreateTopicsResponseData createTopicsResponseData = active.createTopics(
ANONYMOUS_CONTEXT, createTopicsRequestData,
Set.of("foo", "bar")).get();
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode()));
Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
Uuid topicIdBar = createTopicsResponseData.topics().find("bar").topicId();
ConfigRecord configRecord = new ConfigRecord()
.setResourceType(BROKER.id())
.setResourceName("")
.setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
.setValue("2");
RecordTestUtils.replayAll(active.configurationControl(), List.of(new ApiMessageAndVersion(configRecord, (short) 0)));
// At this point only the brokers we want to fence (broker 2, 3) should be fenced.
brokersToKeepUnfenced.forEach(brokerId -> {
assertTrue(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been unfenced");
});
brokersToFence.forEach(brokerId -> {
assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
// Before fencing wait, switch pumper to only keep brokersToKeepUnfenced alive
keepOnly.set(true);
// Verify the isr and elr for the topic partition
PartitionRegistration partition = active.replicationControl().getPartition(topicIdFoo, 0);
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
// Fence brokers
TestUtils.waitForCondition(() -> {
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
for (Integer brokerId : brokersToFence) {
if (active.clusterControl().isUnfenced(brokerId)) {
return false;
}
}
return true;
}, sessionTimeoutMillis * 30,
"Fencing of brokers did not process within expected time"
);
// The ELR set is not determined but the size is 1.
assertEquals(1, partition.elr.length, partition.toString());
// Send another heartbeat to the brokers we want to keep alive
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
// First, decrease the min ISR config to 1. This should clear the ELR fields.
ControllerResult<Map<ConfigResource, ApiError>> result = active.configurationControl().incrementalAlterConfigs(toMap(
entry(new ConfigResource(TOPIC, "foo"), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
true);
assertEquals(2, result.records().size(), result.records().toString());
RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0)));
RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1)));
// At this point only the brokers we want to fence (broker 2, 3) should be fenced.
brokersToKeepUnfenced.forEach(brokerId -> {
assertTrue(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been unfenced");
});
brokersToFence.forEach(brokerId -> {
assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
partition = active.replicationControl().getPartition(topicIdFoo, 0);
assertEquals(0, partition.elr.length, partition.toString());
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
// Verify the isr and elr for the topic partition
PartitionRegistration partition = active.replicationControl().getPartition(topicIdFoo, 0);
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
// Second, let's try update config on cluster level with the other topic.
partition = active.replicationControl().getPartition(topicIdBar, 0);
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
assertEquals(1, partition.elr.length, partition.toString());
// The ELR set is not determined but the size is 1.
assertEquals(1, partition.elr.length, partition.toString());
result = active.configurationControl().incrementalAlterConfigs(toMap(
entry(new ConfigResource(BROKER, ""), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
true);
assertEquals(2, result.records().size(), result.records().toString());
RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0)));
RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1)));
// First, decrease the min ISR config to 1. This should clear the ELR fields.
ControllerResult<Map<ConfigResource, ApiError>> result = active.configurationControl().incrementalAlterConfigs(toMap(
entry(new ConfigResource(TOPIC, "foo"), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
true);
assertEquals(2, result.records().size(), result.records().toString());
RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0)));
RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1)));
partition = active.replicationControl().getPartition(topicIdBar, 0);
assertEquals(0, partition.elr.length, partition.toString());
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
partition = active.replicationControl().getPartition(topicIdFoo, 0);
assertEquals(0, partition.elr.length, partition.toString());
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
// Second, let's try update config on cluster level with the other topic.
partition = active.replicationControl().getPartition(topicIdBar, 0);
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
assertEquals(1, partition.elr.length, partition.toString());
result = active.configurationControl().incrementalAlterConfigs(toMap(
entry(new ConfigResource(BROKER, ""), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
true);
assertEquals(2, result.records().size(), result.records().toString());
RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0)));
RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1)));
partition = active.replicationControl().getPartition(topicIdBar, 0);
assertEquals(0, partition.elr.length, partition.toString());
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
} finally {
hbExec.shutdownNow();
}
}
}