MINOR: Add interface for aliveBroker and isShutDwon for Brokers. (#16323)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TaiJuWu 2024-06-19 22:32:08 +08:00 committed by GitHub
parent 2b491f9611
commit c0add50a99
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 32 additions and 0 deletions

View File

@ -733,6 +733,10 @@ class BrokerServer(
}
}
override def isShutdown(): Boolean = {
status == SHUTDOWN || status == SHUTTING_DOWN
}
override def awaitShutdown(): Unit = {
lock.lock()
try {

View File

@ -97,6 +97,7 @@ trait KafkaBroker extends Logging {
def awaitShutdown(): Unit
def shutdown(): Unit = shutdown(Duration.ofMinutes(5))
def shutdown(timeout: Duration): Unit
def isShutdown(): Boolean
def brokerTopicStats: BrokerTopicStats
def credentialProvider: CredentialProvider
def clientToControllerChannelManager: NodeToControllerChannelManager

View File

@ -1085,6 +1085,13 @@ class KafkaServer(
}
}
override def isShutdown(): Boolean = {
BrokerState.fromValue(brokerState.value()) match {
case BrokerState.SHUTTING_DOWN | BrokerState.NOT_RUNNING => true
case _ => false
}
}
/**
* After calling shutdown(), use this API to wait until the shutdown is complete
*/

View File

@ -52,6 +52,11 @@ public interface ClusterInstance {
Map<Integer, KafkaBroker> brokers();
default Map<Integer, KafkaBroker> aliveBrokers() {
return brokers().entrySet().stream().filter(entry -> !entry.getValue().isShutdown())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
Map<Integer, ControllerServer> controllers();
/**

View File

@ -234,4 +234,19 @@ public class ClusterTestExtensionsTest {
Assertions.assertTrue(clusterInstance.supportedGroupProtocols().contains(CLASSIC));
Assertions.assertEquals(1, clusterInstance.supportedGroupProtocols().size());
}
@ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
public void testClusterAliveBrokers(ClusterInstance clusterInstance) throws Exception {
clusterInstance.waitForReadyBrokers();
// Remove broker id 0
clusterInstance.shutdownBroker(0);
Assertions.assertFalse(clusterInstance.aliveBrokers().containsKey(0));
Assertions.assertTrue(clusterInstance.brokers().containsKey(0));
// add broker id 0 back
clusterInstance.startBroker(0);
Assertions.assertTrue(clusterInstance.aliveBrokers().containsKey(0));
Assertions.assertTrue(clusterInstance.brokers().containsKey(0));
}
}