diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index 3e58cabeac3..aebcc865619 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -649,88 +649,115 @@ public class QuorumControllerTest { // Unfence all brokers and create a topic foo (min ISR 2) sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs); - CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics( - new CreatableTopicCollection(List.of( - new CreatableTopic().setName("foo").setNumPartitions(1). - setReplicationFactor(replicationFactor), - new CreatableTopic().setName("bar").setNumPartitions(1). - setReplicationFactor(replicationFactor) - ).iterator())); - CreateTopicsResponseData createTopicsResponseData = active.createTopics( - ANONYMOUS_CONTEXT, createTopicsRequestData, - Set.of("foo", "bar")).get(); - assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode())); - assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode())); - Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId(); - Uuid topicIdBar = createTopicsResponseData.topics().find("bar").topicId(); - ConfigRecord configRecord = new ConfigRecord() - .setResourceType(BROKER.id()) - .setResourceName("") - .setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) - .setValue("2"); - RecordTestUtils.replayAll(active.configurationControl(), List.of(new ApiMessageAndVersion(configRecord, (short) 0))); - // Fence brokers - TestUtils.waitForCondition(() -> { - sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs); - for (Integer brokerId : brokersToFence) { - if (active.clusterControl().isUnfenced(brokerId)) { - return false; - } + // Heartbeat pumper + final java.util.concurrent.ScheduledExecutorService hbExec = + java.util.concurrent.Executors.newSingleThreadScheduledExecutor(); + final java.util.concurrent.atomic.AtomicBoolean keepOnly = + new java.util.concurrent.atomic.AtomicBoolean(false); + final long periodMs = Math.max(50L, sessionTimeoutMillis / 3); + + hbExec.scheduleAtFixedRate(() -> { + try { + if (keepOnly.get()) { + sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs); + } else { + sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs); } - return true; - }, sessionTimeoutMillis * 30, - "Fencing of brokers did not process within expected time" - ); + } catch (Throwable t) { + throw new RuntimeException(t); + } + }, 0L, periodMs, java.util.concurrent.TimeUnit.MILLISECONDS); - // Send another heartbeat to the brokers we want to keep alive - sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs); + try { + CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics( + new CreatableTopicCollection(List.of( + new CreatableTopic().setName("foo").setNumPartitions(1). + setReplicationFactor(replicationFactor), + new CreatableTopic().setName("bar").setNumPartitions(1). + setReplicationFactor(replicationFactor) + ).iterator())); + CreateTopicsResponseData createTopicsResponseData = active.createTopics( + ANONYMOUS_CONTEXT, createTopicsRequestData, + Set.of("foo", "bar")).get(); + assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode())); + assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode())); + Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId(); + Uuid topicIdBar = createTopicsResponseData.topics().find("bar").topicId(); + ConfigRecord configRecord = new ConfigRecord() + .setResourceType(BROKER.id()) + .setResourceName("") + .setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) + .setValue("2"); + RecordTestUtils.replayAll(active.configurationControl(), List.of(new ApiMessageAndVersion(configRecord, (short) 0))); - // At this point only the brokers we want to fence (broker 2, 3) should be fenced. - brokersToKeepUnfenced.forEach(brokerId -> { - assertTrue(active.clusterControl().isUnfenced(brokerId), - "Broker " + brokerId + " should have been unfenced"); - }); - brokersToFence.forEach(brokerId -> { - assertFalse(active.clusterControl().isUnfenced(brokerId), - "Broker " + brokerId + " should have been fenced"); - }); - sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs); + // Before fencing wait, switch pumper to only keep brokersToKeepUnfenced alive + keepOnly.set(true); - // Verify the isr and elr for the topic partition - PartitionRegistration partition = active.replicationControl().getPartition(topicIdFoo, 0); - assertArrayEquals(new int[]{1}, partition.isr, partition.toString()); + // Fence brokers + TestUtils.waitForCondition(() -> { + sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs); + for (Integer brokerId : brokersToFence) { + if (active.clusterControl().isUnfenced(brokerId)) { + return false; + } + } + return true; + }, sessionTimeoutMillis * 30, + "Fencing of brokers did not process within expected time" + ); - // The ELR set is not determined but the size is 1. - assertEquals(1, partition.elr.length, partition.toString()); + // Send another heartbeat to the brokers we want to keep alive + sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs); - // First, decrease the min ISR config to 1. This should clear the ELR fields. - ControllerResult> result = active.configurationControl().incrementalAlterConfigs(toMap( - entry(new ConfigResource(TOPIC, "foo"), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))), - true); - assertEquals(2, result.records().size(), result.records().toString()); - RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0))); - RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1))); + // At this point only the brokers we want to fence (broker 2, 3) should be fenced. + brokersToKeepUnfenced.forEach(brokerId -> { + assertTrue(active.clusterControl().isUnfenced(brokerId), + "Broker " + brokerId + " should have been unfenced"); + }); + brokersToFence.forEach(brokerId -> { + assertFalse(active.clusterControl().isUnfenced(brokerId), + "Broker " + brokerId + " should have been fenced"); + }); + sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs); - partition = active.replicationControl().getPartition(topicIdFoo, 0); - assertEquals(0, partition.elr.length, partition.toString()); - assertArrayEquals(new int[]{1}, partition.isr, partition.toString()); + // Verify the isr and elr for the topic partition + PartitionRegistration partition = active.replicationControl().getPartition(topicIdFoo, 0); + assertArrayEquals(new int[]{1}, partition.isr, partition.toString()); - // Second, let's try update config on cluster level with the other topic. - partition = active.replicationControl().getPartition(topicIdBar, 0); - assertArrayEquals(new int[]{1}, partition.isr, partition.toString()); - assertEquals(1, partition.elr.length, partition.toString()); + // The ELR set is not determined but the size is 1. + assertEquals(1, partition.elr.length, partition.toString()); - result = active.configurationControl().incrementalAlterConfigs(toMap( - entry(new ConfigResource(BROKER, ""), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))), - true); - assertEquals(2, result.records().size(), result.records().toString()); - RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0))); - RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1))); + // First, decrease the min ISR config to 1. This should clear the ELR fields. + ControllerResult> result = active.configurationControl().incrementalAlterConfigs(toMap( + entry(new ConfigResource(TOPIC, "foo"), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))), + true); + assertEquals(2, result.records().size(), result.records().toString()); + RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0))); + RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1))); - partition = active.replicationControl().getPartition(topicIdBar, 0); - assertEquals(0, partition.elr.length, partition.toString()); - assertArrayEquals(new int[]{1}, partition.isr, partition.toString()); + partition = active.replicationControl().getPartition(topicIdFoo, 0); + assertEquals(0, partition.elr.length, partition.toString()); + assertArrayEquals(new int[]{1}, partition.isr, partition.toString()); + + // Second, let's try update config on cluster level with the other topic. + partition = active.replicationControl().getPartition(topicIdBar, 0); + assertArrayEquals(new int[]{1}, partition.isr, partition.toString()); + assertEquals(1, partition.elr.length, partition.toString()); + + result = active.configurationControl().incrementalAlterConfigs(toMap( + entry(new ConfigResource(BROKER, ""), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))), + true); + assertEquals(2, result.records().size(), result.records().toString()); + RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0))); + RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1))); + + partition = active.replicationControl().getPartition(topicIdBar, 0); + assertEquals(0, partition.elr.length, partition.toString()); + assertArrayEquals(new int[]{1}, partition.isr, partition.toString()); + } finally { + hbExec.shutdownNow(); + } } }