KAFKA-17256 KRAFT should honor the listener name and security protocol from ClusterConfig (#16824)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Kuan-Po Tseng 2024-10-17 10:34:15 +08:00 committed by GitHub
parent 1de4f27ec0
commit 8adfdbbde0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 170 additions and 55 deletions

View File

@ -406,11 +406,11 @@ class KRaftClusterTest {
.build() .build()
doOnStartedKafkaCluster(nodes) { implicit cluster => doOnStartedKafkaCluster(nodes) { implicit cluster =>
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.externalListenerName, (15L, SECONDS)) sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.brokerListenerName, (15L, SECONDS))
.nodes.values.forEach { broker => .nodes.values.forEach { broker =>
assertEquals("localhost", broker.host, assertEquals("localhost", broker.host,
"Did not advertise configured advertised host") "Did not advertise configured advertised host")
assertEquals(cluster.brokers.get(broker.id).socketServer.boundPort(cluster.nodes.externalListenerName), broker.port, assertEquals(cluster.brokers.get(broker.id).socketServer.boundPort(cluster.nodes.brokerListenerName), broker.port,
"Did not advertise bound socket port") "Did not advertise bound socket port")
} }
} }
@ -434,7 +434,7 @@ class KRaftClusterTest {
.build() .build()
doOnStartedKafkaCluster(nodes) { implicit cluster => doOnStartedKafkaCluster(nodes) { implicit cluster =>
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.externalListenerName, (15L, SECONDS)) sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.brokerListenerName, (15L, SECONDS))
.nodes.values.forEach { broker => .nodes.values.forEach { broker =>
assertEquals(s"advertised-host-${broker.id}", broker.host, "Did not advertise configured advertised host") assertEquals(s"advertised-host-${broker.id}", broker.host, "Did not advertise configured advertised host")
assertEquals(broker.id + 100, broker.port, "Did not advertise configured advertised port") assertEquals(broker.id + 100, broker.port, "Did not advertise configured advertised port")

View File

@ -57,7 +57,7 @@ object SaslApiVersionsRequestTest {
serverProperties.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0") serverProperties.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
List(ClusterConfig.defaultBuilder List(ClusterConfig.defaultBuilder
.setSecurityProtocol(securityProtocol) .setBrokerSecurityProtocol(securityProtocol)
.setTypes(Set(Type.KRAFT).asJava) .setTypes(Set(Type.KRAFT).asJava)
.setSaslServerProperties(saslServerProperties) .setSaslServerProperties(saslServerProperties)
.setSaslClientProperties(saslClientProperties) .setSaslClientProperties(saslClientProperties)

View File

@ -144,12 +144,20 @@ public class KafkaClusterTestKit implements AutoCloseable {
} }
public static class Builder { public static class Builder {
private TestKitNodes nodes; private final TestKitNodes nodes;
private final Map<String, Object> configProps = new HashMap<>(); private final Map<String, Object> configProps = new HashMap<>();
private final SimpleFaultHandlerFactory faultHandlerFactory = new SimpleFaultHandlerFactory(); private final SimpleFaultHandlerFactory faultHandlerFactory = new SimpleFaultHandlerFactory();
private final String brokerListenerName;
private final String controllerListenerName;
private final String brokerSecurityProtocol;
private final String controllerSecurityProtocol;
public Builder(TestKitNodes nodes) { public Builder(TestKitNodes nodes) {
this.nodes = nodes; this.nodes = nodes;
this.brokerListenerName = nodes.brokerListenerName().value();
this.controllerListenerName = nodes.controllerListenerName().value();
this.brokerSecurityProtocol = nodes.brokerListenerProtocol().name;
this.controllerSecurityProtocol = nodes.controllerListenerProtocol().name;
} }
public Builder setConfigProp(String key, Object value) { public Builder setConfigProp(String key, Object value) {
@ -187,12 +195,11 @@ public class KafkaClusterTestKit implements AutoCloseable {
// We allow configuring the listeners and related properties via Builder::setConfigProp, // We allow configuring the listeners and related properties via Builder::setConfigProp,
// and they shouldn't be overridden here // and they shouldn't be overridden here
props.putIfAbsent(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, props.putIfAbsent(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, String.format("%s:%s,%s:%s",
"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); brokerListenerName, brokerSecurityProtocol, controllerListenerName, controllerSecurityProtocol));
props.putIfAbsent(SocketServerConfigs.LISTENERS_CONFIG, listeners(node.id())); props.putIfAbsent(SocketServerConfigs.LISTENERS_CONFIG, listeners(node.id()));
props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG, props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG, brokerListenerName);
nodes.interBrokerListenerName().value()); props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, controllerListenerName);
props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER");
// Note: we can't accurately set controller.quorum.voters yet, since we don't // Note: we can't accurately set controller.quorum.voters yet, since we don't
// yet know what ports each controller will pick. Set it to a dummy string // yet know what ports each controller will pick. Set it to a dummy string
@ -307,12 +314,12 @@ public class KafkaClusterTestKit implements AutoCloseable {
private String listeners(int node) { private String listeners(int node) {
if (nodes.isCombined(node)) { if (nodes.isCombined(node)) {
return "EXTERNAL://localhost:0,CONTROLLER://localhost:0"; return String.format("%s://localhost:0,%s://localhost:0", brokerListenerName, controllerListenerName);
} }
if (nodes.controllerNodes().containsKey(node)) { if (nodes.controllerNodes().containsKey(node)) {
return "CONTROLLER://localhost:0"; return String.format("%s://localhost:0", controllerListenerName);
} }
return "EXTERNAL://localhost:0"; return String.format("%s://localhost:0", brokerListenerName);
} }
private String roles(int node) { private String roles(int node) {
@ -521,7 +528,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) { for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey(); int brokerId = entry.getKey();
BrokerServer broker = entry.getValue(); BrokerServer broker = entry.getValue();
ListenerName listenerName = nodes.externalListenerName(); ListenerName listenerName = nodes.brokerListenerName();
int port = broker.boundPort(listenerName); int port = broker.boundPort(listenerName);
if (port <= 0) { if (port <= 0) {
throw new RuntimeException("Broker " + brokerId + " does not yet " + throw new RuntimeException("Broker " + brokerId + " does not yet " +

View File

@ -19,6 +19,7 @@ package org.apache.kafka.common.test;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.properties.MetaProperties; import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble; import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
@ -39,10 +40,13 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import java.util.stream.Stream; import java.util.stream.Stream;
@SuppressWarnings("NPathComplexity")
public class TestKitNodes { public class TestKitNodes {
public static final int CONTROLLER_ID_OFFSET = 3000; public static final int CONTROLLER_ID_OFFSET = 3000;
public static final int BROKER_ID_OFFSET = 0; public static final int BROKER_ID_OFFSET = 0;
public static final SecurityProtocol DEFAULT_BROKER_SECURITY_PROTOCOL = SecurityProtocol.PLAINTEXT;
public static final String DEFAULT_BROKER_LISTENER_NAME = "EXTERNAL";
public static class Builder { public static class Builder {
private boolean combined; private boolean combined;
@ -53,6 +57,10 @@ public class TestKitNodes {
private Map<Integer, Map<String, String>> perServerProperties = Collections.emptyMap(); private Map<Integer, Map<String, String>> perServerProperties = Collections.emptyMap();
private BootstrapMetadata bootstrapMetadata = BootstrapMetadata. private BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
fromVersion(MetadataVersion.latestTesting(), "testkit"); fromVersion(MetadataVersion.latestTesting(), "testkit");
// The brokerListenerName and brokerSecurityProtocol configurations must
// be kept in sync with the default values in ClusterTest.
private ListenerName brokerListenerName = ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME);
private SecurityProtocol brokerSecurityProtocol = DEFAULT_BROKER_SECURITY_PROTOCOL;
public Builder setClusterId(String clusterId) { public Builder setClusterId(String clusterId) {
this.clusterId = clusterId; this.clusterId = clusterId;
@ -96,6 +104,16 @@ public class TestKitNodes {
return this; return this;
} }
public Builder setBrokerListenerName(ListenerName listenerName) {
this.brokerListenerName = listenerName;
return this;
}
public Builder setBrokerSecurityProtocol(SecurityProtocol securityProtocol) {
this.brokerSecurityProtocol = securityProtocol;
return this;
}
public TestKitNodes build() { public TestKitNodes build() {
if (numControllerNodes < 0) { if (numControllerNodes < 0) {
throw new IllegalArgumentException("Invalid negative value for numControllerNodes"); throw new IllegalArgumentException("Invalid negative value for numControllerNodes");
@ -106,6 +124,10 @@ public class TestKitNodes {
if (numDisksPerBroker <= 0) { if (numDisksPerBroker <= 0) {
throw new IllegalArgumentException("Invalid value for numDisksPerBroker"); throw new IllegalArgumentException("Invalid value for numDisksPerBroker");
} }
// TODO: remove this assertion after https://issues.apache.org/jira/browse/KAFKA-16680 is finished
if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT) {
throw new IllegalArgumentException("Currently only support PLAINTEXT security protocol");
}
String baseDirectory = TestUtils.tempDirectory().getAbsolutePath(); String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();
if (clusterId == null) { if (clusterId == null) {
@ -159,7 +181,8 @@ public class TestKitNodes {
brokerNodes.put(id, brokerNode); brokerNodes.put(id, brokerNode);
} }
return new TestKitNodes(baseDirectory, clusterId, bootstrapMetadata, controllerNodes, brokerNodes); return new TestKitNodes(baseDirectory, clusterId, bootstrapMetadata, controllerNodes, brokerNodes,
brokerListenerName, brokerSecurityProtocol, new ListenerName("CONTROLLER"), SecurityProtocol.PLAINTEXT);
} }
} }
@ -168,19 +191,31 @@ public class TestKitNodes {
private final BootstrapMetadata bootstrapMetadata; private final BootstrapMetadata bootstrapMetadata;
private final SortedMap<Integer, TestKitNode> controllerNodes; private final SortedMap<Integer, TestKitNode> controllerNodes;
private final SortedMap<Integer, TestKitNode> brokerNodes; private final SortedMap<Integer, TestKitNode> brokerNodes;
private final ListenerName brokerListenerName;
private final ListenerName controllerListenerName;
private final SecurityProtocol brokerSecurityProtocol;
private final SecurityProtocol controllerSecurityProtocol;
private TestKitNodes( private TestKitNodes(
String baseDirectory, String baseDirectory,
String clusterId, String clusterId,
BootstrapMetadata bootstrapMetadata, BootstrapMetadata bootstrapMetadata,
SortedMap<Integer, TestKitNode> controllerNodes, SortedMap<Integer, TestKitNode> controllerNodes,
SortedMap<Integer, TestKitNode> brokerNodes SortedMap<Integer, TestKitNode> brokerNodes,
ListenerName brokerListenerName,
SecurityProtocol brokerSecurityProtocol,
ListenerName controllerListenerName,
SecurityProtocol controllerSecurityProtocol
) { ) {
this.baseDirectory = Objects.requireNonNull(baseDirectory); this.baseDirectory = Objects.requireNonNull(baseDirectory);
this.clusterId = Objects.requireNonNull(clusterId); this.clusterId = Objects.requireNonNull(clusterId);
this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata); this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata);
this.controllerNodes = Collections.unmodifiableSortedMap(new TreeMap<>(Objects.requireNonNull(controllerNodes))); this.controllerNodes = Collections.unmodifiableSortedMap(new TreeMap<>(Objects.requireNonNull(controllerNodes)));
this.brokerNodes = Collections.unmodifiableSortedMap(new TreeMap<>(Objects.requireNonNull(brokerNodes))); this.brokerNodes = Collections.unmodifiableSortedMap(new TreeMap<>(Objects.requireNonNull(brokerNodes)));
this.brokerListenerName = Objects.requireNonNull(brokerListenerName);
this.controllerListenerName = Objects.requireNonNull(controllerListenerName);
this.brokerSecurityProtocol = Objects.requireNonNull(brokerSecurityProtocol);
this.controllerSecurityProtocol = Objects.requireNonNull(controllerSecurityProtocol);
} }
public boolean isCombined(int node) { public boolean isCombined(int node) {
@ -207,16 +242,20 @@ public class TestKitNodes {
return brokerNodes; return brokerNodes;
} }
public ListenerName interBrokerListenerName() { public ListenerName brokerListenerName() {
return new ListenerName("EXTERNAL"); return brokerListenerName;
} }
public ListenerName externalListenerName() { public SecurityProtocol brokerListenerProtocol() {
return new ListenerName("EXTERNAL"); return brokerSecurityProtocol;
} }
public ListenerName controllerListenerName() { public ListenerName controllerListenerName() {
return new ListenerName("CONTROLLER"); return controllerListenerName;
}
public SecurityProtocol controllerListenerProtocol() {
return controllerSecurityProtocol;
} }
private static TestKitNode buildBrokerNode(int id, private static TestKitNode buildBrokerNode(int id,

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.test;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class TestKitNodeTest {
@ParameterizedTest
@EnumSource(SecurityProtocol.class)
public void testSecurityProtocol(SecurityProtocol securityProtocol) {
if (securityProtocol != SecurityProtocol.PLAINTEXT) {
assertEquals("Currently only support PLAINTEXT security protocol",
assertThrows(IllegalArgumentException.class,
() -> new TestKitNodes.Builder().setBrokerSecurityProtocol(securityProtocol).build()).getMessage());
}
}
@Test
public void testListenerName() {
ListenerName listenerName = ListenerName.normalised("FOOBAR");
TestKitNodes testKitNodes = new TestKitNodes.Builder()
.setNumBrokerNodes(1)
.setNumControllerNodes(1)
.setBrokerListenerName(listenerName)
.setBrokerSecurityProtocol(SecurityProtocol.PLAINTEXT)
.build();
assertEquals(listenerName, testKitNodes.brokerListenerName());
}
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.common.test.api; package org.apache.kafka.common.test.api;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
@ -35,6 +36,9 @@ import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_SECURITY_PROTOCOL;
/** /**
* Represents an immutable requested configuration of a Kafka cluster for integration testing. * Represents an immutable requested configuration of a Kafka cluster for integration testing.
*/ */
@ -45,8 +49,8 @@ public class ClusterConfig {
private final int controllers; private final int controllers;
private final int disksPerBroker; private final int disksPerBroker;
private final boolean autoStart; private final boolean autoStart;
private final SecurityProtocol securityProtocol; private final SecurityProtocol brokerSecurityProtocol;
private final String listenerName; private final ListenerName brokerListenerName;
private final File trustStoreFile; private final File trustStoreFile;
private final MetadataVersion metadataVersion; private final MetadataVersion metadataVersion;
@ -62,7 +66,7 @@ public class ClusterConfig {
@SuppressWarnings("checkstyle:ParameterNumber") @SuppressWarnings("checkstyle:ParameterNumber")
private ClusterConfig(Set<Type> types, int brokers, int controllers, int disksPerBroker, boolean autoStart, private ClusterConfig(Set<Type> types, int brokers, int controllers, int disksPerBroker, boolean autoStart,
SecurityProtocol securityProtocol, String listenerName, File trustStoreFile, SecurityProtocol brokerSecurityProtocol, ListenerName brokerListenerName, File trustStoreFile,
MetadataVersion metadataVersion, Map<String, String> serverProperties, Map<String, String> producerProperties, MetadataVersion metadataVersion, Map<String, String> serverProperties, Map<String, String> producerProperties,
Map<String, String> consumerProperties, Map<String, String> adminClientProperties, Map<String, String> saslServerProperties, Map<String, String> consumerProperties, Map<String, String> adminClientProperties, Map<String, String> saslServerProperties,
Map<String, String> saslClientProperties, Map<Integer, Map<String, String>> perServerProperties, List<String> tags, Map<String, String> saslClientProperties, Map<Integer, Map<String, String>> perServerProperties, List<String> tags,
@ -77,8 +81,8 @@ public class ClusterConfig {
this.controllers = controllers; this.controllers = controllers;
this.disksPerBroker = disksPerBroker; this.disksPerBroker = disksPerBroker;
this.autoStart = autoStart; this.autoStart = autoStart;
this.securityProtocol = Objects.requireNonNull(securityProtocol); this.brokerSecurityProtocol = Objects.requireNonNull(brokerSecurityProtocol);
this.listenerName = listenerName; this.brokerListenerName = Objects.requireNonNull(brokerListenerName);
this.trustStoreFile = trustStoreFile; this.trustStoreFile = trustStoreFile;
this.metadataVersion = Objects.requireNonNull(metadataVersion); this.metadataVersion = Objects.requireNonNull(metadataVersion);
this.serverProperties = Objects.requireNonNull(serverProperties); this.serverProperties = Objects.requireNonNull(serverProperties);
@ -136,12 +140,12 @@ public class ClusterConfig {
return saslClientProperties; return saslClientProperties;
} }
public SecurityProtocol securityProtocol() { public SecurityProtocol brokerSecurityProtocol() {
return securityProtocol; return brokerSecurityProtocol;
} }
public Optional<String> listenerName() { public ListenerName brokerListenerName() {
return Optional.ofNullable(listenerName); return brokerListenerName;
} }
public Optional<File> trustStoreFile() { public Optional<File> trustStoreFile() {
@ -167,8 +171,8 @@ public class ClusterConfig {
public Set<String> displayTags() { public Set<String> displayTags() {
Set<String> displayTags = new LinkedHashSet<>(tags); Set<String> displayTags = new LinkedHashSet<>(tags);
displayTags.add("MetadataVersion=" + metadataVersion); displayTags.add("MetadataVersion=" + metadataVersion);
displayTags.add("Security=" + securityProtocol.name()); displayTags.add("BrokerSecurityProtocol=" + brokerSecurityProtocol.name());
listenerName().ifPresent(listener -> displayTags.add("Listener=" + listener)); displayTags.add("BrokerListenerName=" + brokerListenerName);
return displayTags; return displayTags;
} }
@ -179,7 +183,8 @@ public class ClusterConfig {
.setControllers(1) .setControllers(1)
.setDisksPerBroker(1) .setDisksPerBroker(1)
.setAutoStart(true) .setAutoStart(true)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT) .setBrokerSecurityProtocol(DEFAULT_BROKER_SECURITY_PROTOCOL)
.setBrokerListenerName(ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME))
.setMetadataVersion(MetadataVersion.latestTesting()); .setMetadataVersion(MetadataVersion.latestTesting());
} }
@ -194,8 +199,8 @@ public class ClusterConfig {
.setControllers(clusterConfig.controllers) .setControllers(clusterConfig.controllers)
.setDisksPerBroker(clusterConfig.disksPerBroker) .setDisksPerBroker(clusterConfig.disksPerBroker)
.setAutoStart(clusterConfig.autoStart) .setAutoStart(clusterConfig.autoStart)
.setSecurityProtocol(clusterConfig.securityProtocol) .setBrokerSecurityProtocol(clusterConfig.brokerSecurityProtocol)
.setListenerName(clusterConfig.listenerName) .setBrokerListenerName(clusterConfig.brokerListenerName)
.setTrustStoreFile(clusterConfig.trustStoreFile) .setTrustStoreFile(clusterConfig.trustStoreFile)
.setMetadataVersion(clusterConfig.metadataVersion) .setMetadataVersion(clusterConfig.metadataVersion)
.setServerProperties(clusterConfig.serverProperties) .setServerProperties(clusterConfig.serverProperties)
@ -215,8 +220,8 @@ public class ClusterConfig {
private int controllers; private int controllers;
private int disksPerBroker; private int disksPerBroker;
private boolean autoStart; private boolean autoStart;
private SecurityProtocol securityProtocol; private SecurityProtocol brokerSecurityProtocol;
private String listenerName; private ListenerName brokerListenerName;
private File trustStoreFile; private File trustStoreFile;
private MetadataVersion metadataVersion; private MetadataVersion metadataVersion;
private Map<String, String> serverProperties = Collections.emptyMap(); private Map<String, String> serverProperties = Collections.emptyMap();
@ -256,13 +261,13 @@ public class ClusterConfig {
return this; return this;
} }
public Builder setSecurityProtocol(SecurityProtocol securityProtocol) { public Builder setBrokerSecurityProtocol(SecurityProtocol securityProtocol) {
this.securityProtocol = securityProtocol; this.brokerSecurityProtocol = securityProtocol;
return this; return this;
} }
public Builder setListenerName(String listenerName) { public Builder setBrokerListenerName(ListenerName listenerName) {
this.listenerName = listenerName; this.brokerListenerName = listenerName;
return this; return this;
} }
@ -324,7 +329,7 @@ public class ClusterConfig {
} }
public ClusterConfig build() { public ClusterConfig build() {
return new ClusterConfig(types, brokers, controllers, disksPerBroker, autoStart, securityProtocol, listenerName, return new ClusterConfig(types, brokers, controllers, disksPerBroker, autoStart, brokerSecurityProtocol, brokerListenerName,
trustStoreFile, metadataVersion, serverProperties, producerProperties, consumerProperties, trustStoreFile, metadataVersion, serverProperties, producerProperties, consumerProperties,
adminClientProperties, saslServerProperties, saslClientProperties, adminClientProperties, saslServerProperties, saslClientProperties,
perServerProperties, tags, features); perServerProperties, tags, features);

View File

@ -30,6 +30,7 @@ import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.METHOD; import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME;
@Documented @Documented
@Target({METHOD}) @Target({METHOD})
@ -43,8 +44,11 @@ public @interface ClusterTest {
int controllers() default 0; int controllers() default 0;
int disksPerBroker() default 0; int disksPerBroker() default 0;
AutoStart autoStart() default AutoStart.DEFAULT; AutoStart autoStart() default AutoStart.DEFAULT;
SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT; // The brokerListenerName and brokerSecurityProtocol configurations must
String listener() default ""; // be kept in sync with the default values in TestKitNodes, as many tests
// directly use TestKitNodes without relying on the ClusterTest annotation.
SecurityProtocol brokerSecurityProtocol() default SecurityProtocol.PLAINTEXT;
String brokerListener() default DEFAULT_BROKER_LISTENER_NAME;
MetadataVersion metadataVersion() default MetadataVersion.IBP_4_0_IV3; MetadataVersion metadataVersion() default MetadataVersion.IBP_4_0_IV3;
ClusterConfigProperty[] serverProperties() default {}; ClusterConfigProperty[] serverProperties() default {};
// users can add tags that they want to display in test // users can add tags that they want to display in test

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.kafka.common.test.api; package org.apache.kafka.common.test.api;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.util.timer.SystemTimer; import org.apache.kafka.server.util.timer.SystemTimer;
@ -248,10 +249,10 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
.setControllers(clusterTest.controllers() == 0 ? defaults.controllers() : clusterTest.controllers()) .setControllers(clusterTest.controllers() == 0 ? defaults.controllers() : clusterTest.controllers())
.setDisksPerBroker(clusterTest.disksPerBroker() == 0 ? defaults.disksPerBroker() : clusterTest.disksPerBroker()) .setDisksPerBroker(clusterTest.disksPerBroker() == 0 ? defaults.disksPerBroker() : clusterTest.disksPerBroker())
.setAutoStart(clusterTest.autoStart() == AutoStart.DEFAULT ? defaults.autoStart() : clusterTest.autoStart() == AutoStart.YES) .setAutoStart(clusterTest.autoStart() == AutoStart.DEFAULT ? defaults.autoStart() : clusterTest.autoStart() == AutoStart.YES)
.setListenerName(clusterTest.listener().trim().isEmpty() ? null : clusterTest.listener()) .setBrokerListenerName(ListenerName.normalised(clusterTest.brokerListener()))
.setServerProperties(serverProperties) .setServerProperties(serverProperties)
.setPerServerProperties(perServerProperties) .setPerServerProperties(perServerProperties)
.setSecurityProtocol(clusterTest.securityProtocol()) .setBrokerSecurityProtocol(clusterTest.brokerSecurityProtocol())
.setMetadataVersion(clusterTest.metadataVersion()) .setMetadataVersion(clusterTest.metadataVersion())
.setTags(Arrays.asList(clusterTest.tags())) .setTags(Arrays.asList(clusterTest.tags()))
.setFeatures(features) .setFeatures(features)

View File

@ -109,10 +109,12 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
private final ConcurrentLinkedQueue<Admin> admins = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<Admin> admins = new ConcurrentLinkedQueue<>();
private KafkaClusterTestKit clusterTestKit; private KafkaClusterTestKit clusterTestKit;
private final boolean isCombined; private final boolean isCombined;
private final ListenerName listenerName;
RaftClusterInstance(ClusterConfig clusterConfig, boolean isCombined) { RaftClusterInstance(ClusterConfig clusterConfig, boolean isCombined) {
this.clusterConfig = clusterConfig; this.clusterConfig = clusterConfig;
this.isCombined = isCombined; this.isCombined = isCombined;
this.listenerName = clusterConfig.brokerListenerName();
} }
@Override @Override
@ -127,7 +129,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override @Override
public ListenerName clientListener() { public ListenerName clientListener() {
return ListenerName.normalised("EXTERNAL"); return listenerName;
} }
@Override @Override
@ -282,7 +284,10 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
.setNumBrokerNodes(clusterConfig.numBrokers()) .setNumBrokerNodes(clusterConfig.numBrokers())
.setNumDisksPerBroker(clusterConfig.numDisksPerBroker()) .setNumDisksPerBroker(clusterConfig.numDisksPerBroker())
.setPerServerProperties(clusterConfig.perServerOverrideProperties()) .setPerServerProperties(clusterConfig.perServerOverrideProperties())
.setNumControllerNodes(clusterConfig.numControllers()).build(); .setNumControllerNodes(clusterConfig.numControllers())
.setBrokerListenerName(listenerName)
.setBrokerSecurityProtocol(clusterConfig.brokerSecurityProtocol())
.build();
KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes); KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
// Copy properties into the TestKit builder // Copy properties into the TestKit builder
clusterConfig.serverProperties().forEach(builder::setConfigProp); clusterConfig.serverProperties().forEach(builder::setConfigProp);

View File

@ -17,6 +17,7 @@
package org.apache.kafka.common.test.api; package org.apache.kafka.common.test.api;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.test.TestUtils; import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
@ -34,6 +35,9 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_SECURITY_PROTOCOL;
public class ClusterConfigTest { public class ClusterConfigTest {
private static Map<String, Object> fields(ClusterConfig config) { private static Map<String, Object> fields(ClusterConfig config) {
@ -54,8 +58,8 @@ public class ClusterConfigTest {
.setDisksPerBroker(1) .setDisksPerBroker(1)
.setAutoStart(true) .setAutoStart(true)
.setTags(Arrays.asList("name", "Generated Test")) .setTags(Arrays.asList("name", "Generated Test"))
.setSecurityProtocol(SecurityProtocol.PLAINTEXT) .setBrokerSecurityProtocol(SecurityProtocol.PLAINTEXT)
.setListenerName("EXTERNAL") .setBrokerListenerName(ListenerName.normalised("EXTERNAL"))
.setTrustStoreFile(trustStoreFile) .setTrustStoreFile(trustStoreFile)
.setMetadataVersion(MetadataVersion.IBP_0_8_0) .setMetadataVersion(MetadataVersion.IBP_0_8_0)
.setServerProperties(Collections.singletonMap("broker", "broker_value")) .setServerProperties(Collections.singletonMap("broker", "broker_value"))
@ -110,6 +114,7 @@ public class ClusterConfigTest {
Assertions.assertTrue(expectedDisplayTags.contains("tag 2")); Assertions.assertTrue(expectedDisplayTags.contains("tag 2"));
Assertions.assertTrue(expectedDisplayTags.contains("tag 3")); Assertions.assertTrue(expectedDisplayTags.contains("tag 3"));
Assertions.assertTrue(expectedDisplayTags.contains("MetadataVersion=" + MetadataVersion.latestTesting())); Assertions.assertTrue(expectedDisplayTags.contains("MetadataVersion=" + MetadataVersion.latestTesting()));
Assertions.assertTrue(expectedDisplayTags.contains("Security=" + SecurityProtocol.PLAINTEXT)); Assertions.assertTrue(expectedDisplayTags.contains("BrokerSecurityProtocol=" + DEFAULT_BROKER_SECURITY_PROTOCOL));
Assertions.assertTrue(expectedDisplayTags.contains("BrokerListenerName=" + ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME)));
} }
} }

View File

@ -143,13 +143,9 @@ public class GetOffsetShellTest {
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "EXTERNAL"); serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "EXTERNAL");
return Collections.singletonList( return Collections.singletonList(
// we set REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP to EXTERNAL, so we need to
// align listener name here as KafkaClusterTestKit (KRAFT/CO_KRAFT) the default
// broker listener name is EXTERNAL while in ZK it is PLAINTEXT
ClusterConfig.defaultBuilder() ClusterConfig.defaultBuilder()
.setTypes(Stream.of(KRAFT, CO_KRAFT).collect(Collectors.toSet())) .setTypes(Stream.of(KRAFT, CO_KRAFT).collect(Collectors.toSet()))
.setServerProperties(serverProperties) .setServerProperties(serverProperties)
.setListenerName("EXTERNAL")
.build()); .build());
} }