KAFKA-17721 Enable to configure listener name and protocol for controller (#17525)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Kuan-Po Tseng 2024-11-20 23:06:29 +08:00 committed by GitHub
parent e9fd0437d5
commit c6294aacef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 80 additions and 13 deletions

View File

@ -48,6 +48,8 @@ public class TestKitNodes {
public static final int BROKER_ID_OFFSET = 0;
public static final SecurityProtocol DEFAULT_BROKER_SECURITY_PROTOCOL = SecurityProtocol.PLAINTEXT;
public static final String DEFAULT_BROKER_LISTENER_NAME = "EXTERNAL";
public static final SecurityProtocol DEFAULT_CONTROLLER_SECURITY_PROTOCOL = SecurityProtocol.PLAINTEXT;
public static final String DEFAULT_CONTROLLER_LISTENER_NAME = "CONTROLLER";
public static class Builder {
private boolean combined;
@ -66,10 +68,12 @@ public class TestKitNodes {
public Builder(BootstrapMetadata bootstrapMetadata) {
this.bootstrapMetadata = bootstrapMetadata;
}
// The brokerListenerName and brokerSecurityProtocol configurations must
// The broker and controller listener name and SecurityProtocol configurations must
// be kept in sync with the default values in ClusterTest.
private ListenerName brokerListenerName = ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME);
private SecurityProtocol brokerSecurityProtocol = DEFAULT_BROKER_SECURITY_PROTOCOL;
private ListenerName controllerListenerName = ListenerName.normalised(DEFAULT_CONTROLLER_LISTENER_NAME);
private SecurityProtocol controllerSecurityProtocol = DEFAULT_CONTROLLER_SECURITY_PROTOCOL;
public Builder setClusterId(String clusterId) {
this.clusterId = clusterId;
@ -134,6 +138,16 @@ public class TestKitNodes {
return this;
}
public Builder setControllerListenerName(ListenerName listenerName) {
this.controllerListenerName = listenerName;
return this;
}
public Builder setControllerSecurityProtocol(SecurityProtocol securityProtocol) {
this.controllerSecurityProtocol = securityProtocol;
return this;
}
public TestKitNodes build() {
if (numControllerNodes < 0) {
throw new IllegalArgumentException("Invalid negative value for numControllerNodes");
@ -145,7 +159,7 @@ public class TestKitNodes {
throw new IllegalArgumentException("Invalid value for numDisksPerBroker");
}
// TODO: remove this assertion after https://issues.apache.org/jira/browse/KAFKA-16680 is finished
if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT) {
if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT || controllerSecurityProtocol != SecurityProtocol.PLAINTEXT) {
throw new IllegalArgumentException("Currently only support PLAINTEXT security protocol");
}
if (baseDirectory == null) {
@ -203,7 +217,7 @@ public class TestKitNodes {
}
return new TestKitNodes(baseDirectory.toFile().getAbsolutePath(), clusterId, bootstrapMetadata, controllerNodes, brokerNodes,
brokerListenerName, brokerSecurityProtocol, new ListenerName("CONTROLLER"), SecurityProtocol.PLAINTEXT);
brokerListenerName, brokerSecurityProtocol, controllerListenerName, controllerSecurityProtocol);
}
}

View File

@ -36,18 +36,25 @@ public class TestKitNodeTest {
assertEquals("Currently only support PLAINTEXT security protocol",
assertThrows(IllegalArgumentException.class,
() -> new TestKitNodes.Builder().setBrokerSecurityProtocol(securityProtocol).build()).getMessage());
assertEquals("Currently only support PLAINTEXT security protocol",
assertThrows(IllegalArgumentException.class,
() -> new TestKitNodes.Builder().setControllerSecurityProtocol(securityProtocol).build()).getMessage());
}
}
@Test
public void testListenerName() {
ListenerName listenerName = ListenerName.normalised("FOOBAR");
ListenerName brokerListenerName = ListenerName.normalised("FOOBAR");
ListenerName controllerListenerName = ListenerName.normalised("BAZQUX");
TestKitNodes testKitNodes = new TestKitNodes.Builder()
.setNumBrokerNodes(1)
.setNumControllerNodes(1)
.setBrokerListenerName(listenerName)
.setBrokerListenerName(brokerListenerName)
.setBrokerSecurityProtocol(SecurityProtocol.PLAINTEXT)
.setControllerListenerName(controllerListenerName)
.setControllerSecurityProtocol(SecurityProtocol.PLAINTEXT)
.build();
assertEquals(listenerName, testKitNodes.brokerListenerName());
assertEquals(brokerListenerName, testKitNodes.brokerListenerName());
assertEquals(controllerListenerName, testKitNodes.controllerListenerName());
}
}

View File

@ -38,6 +38,8 @@ import java.util.stream.Stream;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_SECURITY_PROTOCOL;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_LISTENER_NAME;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_SECURITY_PROTOCOL;
/**
* Represents an immutable requested configuration of a Kafka cluster for integration testing.
@ -51,6 +53,8 @@ public class ClusterConfig {
private final boolean autoStart;
private final SecurityProtocol brokerSecurityProtocol;
private final ListenerName brokerListenerName;
private final SecurityProtocol controllerSecurityProtocol;
private final ListenerName controllerListenerName;
private final File trustStoreFile;
private final MetadataVersion metadataVersion;
@ -66,7 +70,8 @@ public class ClusterConfig {
@SuppressWarnings("checkstyle:ParameterNumber")
private ClusterConfig(Set<Type> types, int brokers, int controllers, int disksPerBroker, boolean autoStart,
SecurityProtocol brokerSecurityProtocol, ListenerName brokerListenerName, File trustStoreFile,
SecurityProtocol brokerSecurityProtocol, ListenerName brokerListenerName,
SecurityProtocol controllerSecurityProtocol, ListenerName controllerListenerName, File trustStoreFile,
MetadataVersion metadataVersion, Map<String, String> serverProperties, Map<String, String> producerProperties,
Map<String, String> consumerProperties, Map<String, String> adminClientProperties, Map<String, String> saslServerProperties,
Map<String, String> saslClientProperties, Map<Integer, Map<String, String>> perServerProperties, List<String> tags,
@ -83,6 +88,8 @@ public class ClusterConfig {
this.autoStart = autoStart;
this.brokerSecurityProtocol = Objects.requireNonNull(brokerSecurityProtocol);
this.brokerListenerName = Objects.requireNonNull(brokerListenerName);
this.controllerSecurityProtocol = Objects.requireNonNull(controllerSecurityProtocol);
this.controllerListenerName = Objects.requireNonNull(controllerListenerName);
this.trustStoreFile = trustStoreFile;
this.metadataVersion = Objects.requireNonNull(metadataVersion);
this.serverProperties = Objects.requireNonNull(serverProperties);
@ -144,6 +151,14 @@ public class ClusterConfig {
return brokerSecurityProtocol;
}
public ListenerName controllerListenerName() {
return controllerListenerName;
}
public SecurityProtocol controllerSecurityProtocol() {
return controllerSecurityProtocol;
}
public ListenerName brokerListenerName() {
return brokerListenerName;
}
@ -173,6 +188,8 @@ public class ClusterConfig {
displayTags.add("MetadataVersion=" + metadataVersion);
displayTags.add("BrokerSecurityProtocol=" + brokerSecurityProtocol.name());
displayTags.add("BrokerListenerName=" + brokerListenerName);
displayTags.add("ControllerSecurityProtocol=" + controllerSecurityProtocol.name());
displayTags.add("ControllerListenerName=" + controllerListenerName);
return displayTags;
}
@ -185,6 +202,8 @@ public class ClusterConfig {
.setAutoStart(true)
.setBrokerSecurityProtocol(DEFAULT_BROKER_SECURITY_PROTOCOL)
.setBrokerListenerName(ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME))
.setControllerSecurityProtocol(DEFAULT_CONTROLLER_SECURITY_PROTOCOL)
.setControllerListenerName(ListenerName.normalised(DEFAULT_CONTROLLER_LISTENER_NAME))
.setMetadataVersion(MetadataVersion.latestTesting());
}
@ -201,6 +220,8 @@ public class ClusterConfig {
.setAutoStart(clusterConfig.autoStart)
.setBrokerSecurityProtocol(clusterConfig.brokerSecurityProtocol)
.setBrokerListenerName(clusterConfig.brokerListenerName)
.setControllerSecurityProtocol(clusterConfig.controllerSecurityProtocol)
.setControllerListenerName(clusterConfig.controllerListenerName)
.setTrustStoreFile(clusterConfig.trustStoreFile)
.setMetadataVersion(clusterConfig.metadataVersion)
.setServerProperties(clusterConfig.serverProperties)
@ -222,6 +243,8 @@ public class ClusterConfig {
private boolean autoStart;
private SecurityProtocol brokerSecurityProtocol;
private ListenerName brokerListenerName;
private SecurityProtocol controllerSecurityProtocol;
private ListenerName controllerListenerName;
private File trustStoreFile;
private MetadataVersion metadataVersion;
private Map<String, String> serverProperties = Collections.emptyMap();
@ -271,6 +294,16 @@ public class ClusterConfig {
return this;
}
public Builder setControllerSecurityProtocol(SecurityProtocol securityProtocol) {
this.controllerSecurityProtocol = securityProtocol;
return this;
}
public Builder setControllerListenerName(ListenerName listenerName) {
this.controllerListenerName = listenerName;
return this;
}
public Builder setTrustStoreFile(File trustStoreFile) {
this.trustStoreFile = trustStoreFile;
return this;
@ -329,7 +362,8 @@ public class ClusterConfig {
}
public ClusterConfig build() {
return new ClusterConfig(types, brokers, controllers, disksPerBroker, autoStart, brokerSecurityProtocol, brokerListenerName,
return new ClusterConfig(types, brokers, controllers, disksPerBroker, autoStart,
brokerSecurityProtocol, brokerListenerName, controllerSecurityProtocol, controllerListenerName,
trustStoreFile, metadataVersion, serverProperties, producerProperties, consumerProperties,
adminClientProperties, saslServerProperties, saslClientProperties,
perServerProperties, tags, features);

View File

@ -31,6 +31,7 @@ import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_LISTENER_NAME;
@Documented
@Target({METHOD})
@ -44,11 +45,13 @@ public @interface ClusterTest {
int controllers() default 0;
int disksPerBroker() default 0;
AutoStart autoStart() default AutoStart.DEFAULT;
// The brokerListenerName and brokerSecurityProtocol configurations must
// The broker/controller listener name and SecurityProtocol configurations must
// be kept in sync with the default values in TestKitNodes, as many tests
// directly use TestKitNodes without relying on the ClusterTest annotation.
SecurityProtocol brokerSecurityProtocol() default SecurityProtocol.PLAINTEXT;
String brokerListener() default DEFAULT_BROKER_LISTENER_NAME;
SecurityProtocol controllerSecurityProtocol() default SecurityProtocol.PLAINTEXT;
String controllerListener() default DEFAULT_CONTROLLER_LISTENER_NAME;
MetadataVersion metadataVersion() default MetadataVersion.IBP_4_0_IV3;
ClusterConfigProperty[] serverProperties() default {};
// users can add tags that they want to display in test

View File

@ -250,9 +250,11 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
.setDisksPerBroker(clusterTest.disksPerBroker() == 0 ? defaults.disksPerBroker() : clusterTest.disksPerBroker())
.setAutoStart(clusterTest.autoStart() == AutoStart.DEFAULT ? defaults.autoStart() : clusterTest.autoStart() == AutoStart.YES)
.setBrokerListenerName(ListenerName.normalised(clusterTest.brokerListener()))
.setBrokerSecurityProtocol(clusterTest.brokerSecurityProtocol())
.setControllerListenerName(ListenerName.normalised(clusterTest.controllerListener()))
.setControllerSecurityProtocol(clusterTest.controllerSecurityProtocol())
.setServerProperties(serverProperties)
.setPerServerProperties(perServerProperties)
.setBrokerSecurityProtocol(clusterTest.brokerSecurityProtocol())
.setMetadataVersion(clusterTest.metadataVersion())
.setTags(Arrays.asList(clusterTest.tags()))
.setFeatures(features)

View File

@ -15,8 +15,8 @@ Arbitrary server properties can also be provided in the annotation:
```java
@ClusterTest(
types = {Type.KRAFT},
brokerSecurityProtocol = SecurityProtocol.PLAINTEXT,
types = {Type.KRAFT},
brokerSecurityProtocol = SecurityProtocol.PLAINTEXT,
properties = {
@ClusterProperty(key = "inter.broker.protocol.version", value = "2.7-IV2"),
@ClusterProperty(key = "socket.send.buffer.bytes", value = "10240"),

View File

@ -284,11 +284,12 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
.setNumControllerNodes(clusterConfig.numControllers())
.setBrokerListenerName(listenerName)
.setBrokerSecurityProtocol(clusterConfig.brokerSecurityProtocol())
.setControllerListenerName(clusterConfig.controllerListenerName())
.setControllerSecurityProtocol(clusterConfig.controllerSecurityProtocol())
.build();
KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
// Copy properties into the TestKit builder
clusterConfig.serverProperties().forEach(builder::setConfigProp);
// KAFKA-12512 need to pass security protocol and listener name here
this.clusterTestKit = builder.build();
this.clusterTestKit.format();
}

View File

@ -37,6 +37,8 @@ import java.util.stream.Collectors;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_SECURITY_PROTOCOL;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_LISTENER_NAME;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_SECURITY_PROTOCOL;
public class ClusterConfigTest {
@ -60,6 +62,8 @@ public class ClusterConfigTest {
.setTags(Arrays.asList("name", "Generated Test"))
.setBrokerSecurityProtocol(SecurityProtocol.PLAINTEXT)
.setBrokerListenerName(ListenerName.normalised("EXTERNAL"))
.setControllerSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)
.setControllerListenerName(ListenerName.normalised("CONTROLLER"))
.setTrustStoreFile(trustStoreFile)
.setMetadataVersion(MetadataVersion.IBP_0_8_0)
.setServerProperties(Collections.singletonMap("broker", "broker_value"))
@ -116,5 +120,7 @@ public class ClusterConfigTest {
Assertions.assertTrue(expectedDisplayTags.contains("MetadataVersion=" + MetadataVersion.latestTesting()));
Assertions.assertTrue(expectedDisplayTags.contains("BrokerSecurityProtocol=" + DEFAULT_BROKER_SECURITY_PROTOCOL));
Assertions.assertTrue(expectedDisplayTags.contains("BrokerListenerName=" + ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME)));
Assertions.assertTrue(expectedDisplayTags.contains("ControllerSecurityProtocol=" + DEFAULT_CONTROLLER_SECURITY_PROTOCOL));
Assertions.assertTrue(expectedDisplayTags.contains("ControllerListenerName=" + ListenerName.normalised(DEFAULT_CONTROLLER_LISTENER_NAME)));
}
}