diff --git a/checkstyle/import-control-test-common.xml b/checkstyle/import-control-test-common.xml
index 9fe7f4d4844..9520c0b21b8 100644
--- a/checkstyle/import-control-test-common.xml
+++ b/checkstyle/import-control-test-common.xml
@@ -24,4 +24,5 @@
+
diff --git a/test-common/src/main/java/org/apache/kafka/common/test/JaasModule.java b/test-common/src/main/java/org/apache/kafka/common/test/JaasModule.java
new file mode 100644
index 00000000000..8d70b36dc23
--- /dev/null
+++ b/test-common/src/main/java/org/apache/kafka/common/test/JaasModule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public record JaasModule(String name, boolean debug, Map entries) {
+
+ public static JaasModule plainLoginModule(String username, String password, boolean debug, Map validUsers) {
+ String name = "org.apache.kafka.common.security.plain.PlainLoginModule";
+
+ Map entries = new HashMap<>();
+ entries.put("username", username);
+ entries.put("password", password);
+ validUsers.forEach((user, pass) -> entries.put("user_" + user, pass));
+
+ return new JaasModule(
+ name,
+ debug,
+ entries
+ );
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s required%n debug=%b%n %s;%n", name, debug, entries.entrySet().stream()
+ .map(e -> e.getKey() + "=\"" + e.getValue() + "\"")
+ .collect(Collectors.joining("\n ")));
+ }
+}
diff --git a/test-common/src/main/java/org/apache/kafka/common/test/JaasUtils.java b/test-common/src/main/java/org/apache/kafka/common/test/JaasUtils.java
new file mode 100644
index 00000000000..77e904784f6
--- /dev/null
+++ b/test-common/src/main/java/org/apache/kafka/common/test/JaasUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.security.auth.login.Configuration;
+
+public class JaasUtils {
+ public record JaasSection(String contextName, List modules) {
+ @Override
+ public String toString() {
+ return String.format(
+ "%s {%n %s%n};%n",
+ contextName,
+ modules.stream().map(Object::toString).collect(Collectors.joining("\n "))
+ );
+ }
+ }
+
+ public static final String KAFKA_SERVER_CONTEXT_NAME = "KafkaServer";
+
+ public static final String KAFKA_PLAIN_USER1 = "plain-user1";
+ public static final String KAFKA_PLAIN_USER1_PASSWORD = "plain-user1-secret";
+ public static final String KAFKA_PLAIN_ADMIN = "plain-admin";
+ public static final String KAFKA_PLAIN_ADMIN_PASSWORD = "plain-admin-secret";
+
+ public static File writeJaasContextsToFile(Set jaasSections) throws IOException {
+ File jaasFile = TestUtils.tempFile();
+ try (FileOutputStream fileStream = new FileOutputStream(jaasFile);
+ OutputStreamWriter writer = new OutputStreamWriter(fileStream, StandardCharsets.UTF_8);) {
+ writer.write(String.join("", jaasSections.stream().map(Object::toString).toArray(String[]::new)));
+ }
+ return jaasFile;
+ }
+
+ public static void refreshJavaLoginConfigParam(File file) {
+ System.setProperty(org.apache.kafka.common.security.JaasUtils.JAVA_LOGIN_CONFIG_PARAM, file.getAbsolutePath());
+ // This will cause a reload of the Configuration singleton when `getConfiguration` is called
+ Configuration.setConfiguration(null);
+ }
+}
diff --git a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 2face50ca2f..097f8c3e26d 100644
--- a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++ b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -27,12 +27,15 @@ import kafka.server.SharedServer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.metadata.storage.Formatter;
import org.apache.kafka.network.SocketServerConfigs;
@@ -63,6 +66,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -138,6 +142,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
if (controllerNode != null) {
props.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG,
controllerNode.metadataDirectory());
+ setSecurityProtocolProps(props, controllerSecurityProtocol);
} else {
props.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG,
node.metadataDirectory());
@@ -146,6 +151,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
// Set the log.dirs according to the broker node setting (if there is a broker node)
props.put(LOG_DIRS_CONFIG,
String.join(",", brokerNode.logDataDirectories()));
+ setSecurityProtocolProps(props, brokerSecurityProtocol);
} else {
// Set log.dirs equal to the metadata directory if there is just a controller.
props.put(LOG_DIRS_CONFIG,
@@ -189,11 +195,40 @@ public class KafkaClusterTestKit implements AutoCloseable {
return new KafkaConfig(props, false);
}
+ private void setSecurityProtocolProps(Map props, String securityProtocol) {
+ if (securityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) {
+ props.putIfAbsent(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, "PLAIN");
+ props.putIfAbsent(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, "PLAIN");
+ props.putIfAbsent(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, "PLAIN");
+ props.putIfAbsent(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, StandardAuthorizer.class.getName());
+ props.putIfAbsent(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "false");
+ props.putIfAbsent(StandardAuthorizer.SUPER_USERS_CONFIG, "User:" + JaasUtils.KAFKA_PLAIN_ADMIN);
+ }
+ }
+
public KafkaClusterTestKit build() throws Exception {
Map controllers = new HashMap<>();
Map brokers = new HashMap<>();
Map jointServers = new HashMap<>();
File baseDirectory = null;
+ File jaasFile = null;
+
+ if (brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) {
+ jaasFile = JaasUtils.writeJaasContextsToFile(Set.of(
+ new JaasUtils.JaasSection(JaasUtils.KAFKA_SERVER_CONTEXT_NAME,
+ List.of(
+ JaasModule.plainLoginModule(
+ JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD,
+ true,
+ Map.of(
+ JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD,
+ JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD)
+ )
+ )
+ )
+ ));
+ JaasUtils.refreshJavaLoginConfigParam(jaasFile);
+ }
try {
baseDirectory = new File(nodes.baseDirectory());
@@ -272,7 +307,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
brokers,
baseDirectory,
faultHandlerFactory,
- socketFactoryManager);
+ socketFactoryManager,
+ jaasFile == null ? Optional.empty() : Optional.of(jaasFile));
}
private String listeners(int node) {
@@ -316,6 +352,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
private final SimpleFaultHandlerFactory faultHandlerFactory;
private final PreboundSocketFactoryManager socketFactoryManager;
private final String controllerListenerName;
+ private final Optional jaasFile;
private KafkaClusterTestKit(
TestKitNodes nodes,
@@ -323,7 +360,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
Map brokers,
File baseDirectory,
SimpleFaultHandlerFactory faultHandlerFactory,
- PreboundSocketFactoryManager socketFactoryManager
+ PreboundSocketFactoryManager socketFactoryManager,
+ Optional jaasFile
) {
/*
Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
@@ -339,6 +377,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
this.faultHandlerFactory = faultHandlerFactory;
this.socketFactoryManager = socketFactoryManager;
this.controllerListenerName = nodes.controllerListenerName().value();
+ this.jaasFile = jaasFile;
}
public void format() throws Exception {
@@ -602,6 +641,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
waitForAllFutures(futureEntries);
futureEntries.clear();
Utils.delete(baseDirectory);
+ if (jaasFile.isPresent()) {
+ Utils.delete(jaasFile.get());
+ }
} catch (Exception e) {
for (Entry> entry : futureEntries) {
entry.getValue().cancel(true);
diff --git a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index ad0c43fdbbf..b6d6f9f69fc 100644
--- a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++ b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -158,8 +158,9 @@ public class TestKitNodes {
throw new IllegalArgumentException("Invalid value for numDisksPerBroker");
}
// TODO: remove this assertion after https://issues.apache.org/jira/browse/KAFKA-16680 is finished
- if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT || controllerSecurityProtocol != SecurityProtocol.PLAINTEXT) {
- throw new IllegalArgumentException("Currently only support PLAINTEXT security protocol");
+ if ((brokerSecurityProtocol != SecurityProtocol.PLAINTEXT && brokerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT) ||
+ (controllerSecurityProtocol != SecurityProtocol.PLAINTEXT && controllerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT)) {
+ throw new IllegalArgumentException("Currently only support PLAINTEXT / SASL_PLAINTEXT security protocol");
}
if (baseDirectory == null) {
this.baseDirectory = TestUtils.tempDirectory().toPath();
diff --git a/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java b/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
index c9adbe3431b..b0bb8afa22c 100644
--- a/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
+++ b/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
@@ -32,11 +32,11 @@ public class TestKitNodeTest {
@ParameterizedTest
@EnumSource(SecurityProtocol.class)
public void testSecurityProtocol(SecurityProtocol securityProtocol) {
- if (securityProtocol != SecurityProtocol.PLAINTEXT) {
- assertEquals("Currently only support PLAINTEXT security protocol",
+ if (securityProtocol != SecurityProtocol.PLAINTEXT && securityProtocol != SecurityProtocol.SASL_PLAINTEXT) {
+ assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT security protocol",
assertThrows(IllegalArgumentException.class,
() -> new TestKitNodes.Builder().setBrokerSecurityProtocol(securityProtocol).build()).getMessage());
- assertEquals("Currently only support PLAINTEXT security protocol",
+ assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT security protocol",
assertThrows(IllegalArgumentException.class,
() -> new TestKitNodes.Builder().setControllerSecurityProtocol(securityProtocol).build()).getMessage());
}
diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
index 126975a6719..1c8551bf9e6 100644
--- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
+++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
@@ -38,9 +38,12 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.test.JaasUtils;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.fault.FaultHandlerException;
@@ -164,7 +167,7 @@ public interface ClusterInstance {
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
- return new KafkaProducer<>(props);
+ return new KafkaProducer<>(setClientSaslConfig(props));
}
default Producer producer() {
@@ -178,7 +181,7 @@ public interface ClusterInstance {
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + TestUtils.randomString(5));
props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
- return new KafkaConsumer<>(props);
+ return new KafkaConsumer<>(setClientSaslConfig(props));
}
default Consumer consumer() {
@@ -194,7 +197,23 @@ public interface ClusterInstance {
props.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
props.remove(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
}
- return Admin.create(props);
+ return Admin.create(setClientSaslConfig(props));
+ }
+
+ default Map setClientSaslConfig(Map configs) {
+ Map props = new HashMap<>(configs);
+ if (config().brokerSecurityProtocol() == SecurityProtocol.SASL_PLAINTEXT) {
+ props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
+ props.putIfAbsent(SaslConfigs.SASL_MECHANISM, "PLAIN");
+ props.putIfAbsent(
+ SaslConfigs.SASL_JAAS_CONFIG,
+ String.format(
+ "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
+ JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD
+ )
+ );
+ }
+ return props;
}
default Admin admin(Map configs) {
diff --git a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
index 2a08d4e58eb..20d2d3fd696 100644
--- a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
+++ b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
@@ -20,18 +20,29 @@ package org.apache.kafka.common.test.api;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.DescribeAclsOptions;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.test.JaasUtils;
import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.server.common.MetadataVersion;
@@ -48,7 +59,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
@@ -63,7 +76,9 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_CO
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
@ClusterTestDefaults(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = "default.key", value = "default.value"),
@@ -273,12 +288,12 @@ public class ClusterTestExtensionsTest {
String value = "value";
try (Admin adminClient = cluster.admin();
Producer producer = cluster.producer(Map.of(
- ACKS_CONFIG, "all",
- KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
- VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()));
+ ACKS_CONFIG, "all",
+ KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+ VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()));
Consumer consumer = cluster.consumer(Map.of(
- KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
- VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()))
+ KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
+ VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()))
) {
adminClient.createTopics(singleton(new NewTopic(topic, 1, (short) 1)));
assertNotNull(producer);
@@ -331,4 +346,143 @@ public class ClusterTestExtensionsTest {
assertEquals(1, admin.describeMetadataQuorum().quorumInfo().get().nodes().size());
}
}
+
+ @ClusterTest(
+ types = {Type.KRAFT, Type.CO_KRAFT},
+ brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
+ controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
+ serverProperties = {
+ @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ }
+ )
+ public void testSaslPlaintext(ClusterInstance clusterInstance) throws CancellationException, ExecutionException, InterruptedException {
+ Assertions.assertEquals(SecurityProtocol.SASL_PLAINTEXT, clusterInstance.config().brokerSecurityProtocol());
+
+ // default ClusterInstance#admin helper with admin credentials
+ try (Admin admin = clusterInstance.admin()) {
+ admin.describeAcls(AclBindingFilter.ANY).values().get();
+ }
+ String topic = "sasl-plaintext-topic";
+ clusterInstance.createTopic(topic, 1, (short) 1);
+ try (Producer producer = clusterInstance.producer()) {
+ producer.send(new ProducerRecord<>(topic, Utils.utf8("key"), Utils.utf8("value"))).get();
+ producer.flush();
+ }
+ try (Consumer consumer = clusterInstance.consumer()) {
+ consumer.subscribe(List.of(topic));
+ TestUtils.waitForCondition(() -> {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
+ return records.count() == 1;
+ }, "Failed to receive message");
+ }
+
+ // client with non-admin credentials
+ Map nonAdminConfig = Map.of(
+ SaslConfigs.SASL_JAAS_CONFIG,
+ String.format(
+ "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
+ JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD
+ )
+ );
+ try (Admin admin = clusterInstance.admin(nonAdminConfig)) {
+ ExecutionException exception = assertThrows(
+ ExecutionException.class,
+ () -> admin.describeAcls(AclBindingFilter.ANY).values().get()
+ );
+ assertInstanceOf(ClusterAuthorizationException.class, exception.getCause());
+ }
+ try (Producer producer = clusterInstance.producer(nonAdminConfig)) {
+ ExecutionException exception = assertThrows(
+ ExecutionException.class,
+ () -> producer.send(new ProducerRecord<>(topic, Utils.utf8("key"), Utils.utf8("value"))).get()
+ );
+ assertInstanceOf(TopicAuthorizationException.class, exception.getCause());
+ }
+ try (Consumer consumer = clusterInstance.consumer(nonAdminConfig)) {
+ consumer.subscribe(List.of(topic));
+ AtomicBoolean hasException = new AtomicBoolean(false);
+ TestUtils.waitForCondition(() -> {
+ if (hasException.get()) {
+ return true;
+ }
+ try {
+ consumer.poll(Duration.ofMillis(100));
+ } catch (TopicAuthorizationException e) {
+ hasException.set(true);
+ }
+ return false;
+ }, "Failed to get exception");
+ }
+
+ // client with unknown credentials
+ Map unknownUserConfig = Map.of(
+ SaslConfigs.SASL_JAAS_CONFIG,
+ String.format(
+ "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
+ "unknown", "unknown"
+ )
+ );
+ try (Admin admin = clusterInstance.admin(unknownUserConfig)) {
+ ExecutionException exception = assertThrows(
+ ExecutionException.class,
+ () -> admin.describeAcls(AclBindingFilter.ANY).values().get()
+ );
+ assertInstanceOf(SaslAuthenticationException.class, exception.getCause());
+ }
+ try (Producer producer = clusterInstance.producer(unknownUserConfig)) {
+ ExecutionException exception = assertThrows(
+ ExecutionException.class,
+ () -> producer.send(new ProducerRecord<>(topic, Utils.utf8("key"), Utils.utf8("value"))).get()
+ );
+ assertInstanceOf(SaslAuthenticationException.class, exception.getCause());
+ }
+ try (Consumer consumer = clusterInstance.consumer(unknownUserConfig)) {
+ consumer.subscribe(List.of(topic));
+ AtomicBoolean hasException = new AtomicBoolean(false);
+ TestUtils.waitForCondition(() -> {
+ if (hasException.get()) {
+ return true;
+ }
+ try {
+ consumer.poll(Duration.ofMillis(100));
+ } catch (SaslAuthenticationException e) {
+ hasException.set(true);
+ }
+ return false;
+ }, "Failed to get exception");
+ }
+ }
+
+ @ClusterTest(
+ types = {Type.KRAFT, Type.CO_KRAFT},
+ brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
+ controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
+ serverProperties = {
+ @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ }
+ )
+ public void testSaslPlaintextWithController(ClusterInstance clusterInstance) throws CancellationException, ExecutionException, InterruptedException {
+ // test with admin
+ try (Admin admin = clusterInstance.admin(Map.of(), true)) {
+ admin.describeAcls(AclBindingFilter.ANY).values().get();
+ }
+
+ // test with non-admin
+ Map nonAdminConfig = Map.of(
+ SaslConfigs.SASL_JAAS_CONFIG,
+ String.format(
+ "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
+ JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD
+ )
+ );
+ try (Admin admin = clusterInstance.admin(nonAdminConfig, true)) {
+ ExecutionException exception = assertThrows(
+ ExecutionException.class,
+ () -> admin.describeAcls(AclBindingFilter.ANY, new DescribeAclsOptions().timeoutMs(5000)).values().get()
+ );
+ assertInstanceOf(TimeoutException.class, exception.getCause());
+ }
+ }
}