KAFKA-18083 ClusterInstance custom controllerListener not work (#17932)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Kuan-Po Tseng 2024-11-26 22:01:21 +08:00 committed by GitHub
parent 0b081fc310
commit 55577e73b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 14 additions and 5 deletions

View File

@ -111,7 +111,6 @@ public class KafkaClusterTestKit implements AutoCloseable {
private final String brokerSecurityProtocol;
private final String controllerSecurityProtocol;
public Builder(TestKitNodes nodes) {
this.nodes = nodes;
this.brokerListenerName = nodes.brokerListenerName().value();
@ -169,7 +168,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
append("@").
append("localhost").
append(":").
append(socketFactoryManager.getOrCreatePortForListener(nodeId, "CONTROLLER"));
append(socketFactoryManager.getOrCreatePortForListener(nodeId, controllerListenerName));
prefix = ",";
}
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, quorumVoterStringBuilder.toString());
@ -199,7 +198,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
try {
baseDirectory = new File(nodes.baseDirectory());
for (TestKitNode node : nodes.controllerNodes().values()) {
socketFactoryManager.getOrCreatePortForListener(node.id(), "CONTROLLER");
socketFactoryManager.getOrCreatePortForListener(node.id(), controllerListenerName);
}
for (TestKitNode node : nodes.controllerNodes().values()) {
setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
@ -316,6 +315,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
private final File baseDirectory;
private final SimpleFaultHandlerFactory faultHandlerFactory;
private final PreboundSocketFactoryManager socketFactoryManager;
private final String controllerListenerName;
private KafkaClusterTestKit(
TestKitNodes nodes,
@ -338,6 +338,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
this.baseDirectory = baseDirectory;
this.faultHandlerFactory = faultHandlerFactory;
this.socketFactoryManager = socketFactoryManager;
this.controllerListenerName = nodes.controllerListenerName().value();
}
public void format() throws Exception {
@ -389,7 +390,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME));
formatter.setUnstableFeatureVersionsEnabled(true);
formatter.setIgnoreFormatted(false);
formatter.setControllerListenerName("CONTROLLER");
formatter.setControllerListenerName(controllerListenerName);
if (writeMetadataDirectory) {
formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
} else {
@ -400,7 +401,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
String prefix = "";
for (TestKitNode controllerNode : nodes.controllerNodes().values()) {
int port = socketFactoryManager.
getOrCreatePortForListener(controllerNode.id(), "CONTROLLER");
getOrCreatePortForListener(controllerNode.id(), controllerListenerName);
dynamicVotersBuilder.append(prefix);
prefix = ",";
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",

View File

@ -322,4 +322,12 @@ public class ClusterTestExtensionsTest {
assertEquals(value, records.get(0).value());
}
}
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, controllerListener = "FOO")
public void testControllerListenerName(ClusterInstance cluster) throws ExecutionException, InterruptedException {
assertEquals("FOO", cluster.controllerListenerName().get().value());
try (Admin admin = cluster.admin(Map.of(), true)) {
assertEquals(1, admin.describeMetadataQuorum().quorumInfo().get().nodes().size());
}
}
}