mirror of https://github.com/apache/kafka.git

commit 228b3252f6 (parent 9d93a4f68f)

KAFKA-17921 Support SASL_PLAINTEXT protocol with java.security.auth.login.config (#17671)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
File: checkstyle import-control

@@ -24,4 +24,5 @@
     <allow pkg="org" />
     <allow pkg="kafka" />
     <allow pkg="scala.jdk.javaapi" />
+    <allow pkg="javax.security" />
 </import-control>
New file: JaasModule.java (package org.apache.kafka.common.test)

@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public record JaasModule(String name, boolean debug, Map<String, String> entries) {
+
+    public static JaasModule plainLoginModule(String username, String password, boolean debug, Map<String, String> validUsers) {
+        String name = "org.apache.kafka.common.security.plain.PlainLoginModule";
+
+        Map<String, String> entries = new HashMap<>();
+        entries.put("username", username);
+        entries.put("password", password);
+        validUsers.forEach((user, pass) -> entries.put("user_" + user, pass));
+
+        return new JaasModule(
+            name,
+            debug,
+            entries
+        );
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s required%n debug=%b%n %s;%n", name, debug, entries.entrySet().stream()
+            .map(e -> e.getKey() + "=\"" + e.getValue() + "\"")
+            .collect(Collectors.joining("\n ")));
+    }
+}
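For orientation, a sketch of what this toString renders when fed the admin/user1 constants defined in JaasUtils below; HashMap iteration order is unspecified, so the entry order here is illustrative only:

    org.apache.kafka.common.security.plain.PlainLoginModule required
     debug=true
     username="plain-admin"
     password="plain-admin-secret"
     user_plain-user1="plain-user1-secret"
     user_plain-admin="plain-admin-secret";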
New file: JaasUtils.java (package org.apache.kafka.common.test)

@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.security.auth.login.Configuration;
+
+public class JaasUtils {
+    public record JaasSection(String contextName, List<JaasModule> modules) {
+        @Override
+        public String toString() {
+            return String.format(
+                "%s {%n %s%n};%n",
+                contextName,
+                modules.stream().map(Object::toString).collect(Collectors.joining("\n "))
+            );
+        }
+    }
+
+    public static final String KAFKA_SERVER_CONTEXT_NAME = "KafkaServer";
+
+    public static final String KAFKA_PLAIN_USER1 = "plain-user1";
+    public static final String KAFKA_PLAIN_USER1_PASSWORD = "plain-user1-secret";
+    public static final String KAFKA_PLAIN_ADMIN = "plain-admin";
+    public static final String KAFKA_PLAIN_ADMIN_PASSWORD = "plain-admin-secret";
+
+    public static File writeJaasContextsToFile(Set<JaasSection> jaasSections) throws IOException {
+        File jaasFile = TestUtils.tempFile();
+        try (FileOutputStream fileStream = new FileOutputStream(jaasFile);
+             OutputStreamWriter writer = new OutputStreamWriter(fileStream, StandardCharsets.UTF_8)) {
+            writer.write(String.join("", jaasSections.stream().map(Object::toString).toArray(String[]::new)));
+        }
+        return jaasFile;
+    }
+
+    public static void refreshJavaLoginConfigParam(File file) {
+        System.setProperty(org.apache.kafka.common.security.JaasUtils.JAVA_LOGIN_CONFIG_PARAM, file.getAbsolutePath());
+        // This will cause a reload of the Configuration singleton when `getConfiguration` is called
+        Configuration.setConfiguration(null);
+    }
+}
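Putting the two records together: writeJaasContextsToFile emits one block per JaasSection, so the KafkaServer context built in KafkaClusterTestKit below serializes to a temp file shaped roughly like this (exact whitespace follows the format strings above; entry order is unspecified):

    KafkaServer {
     org.apache.kafka.common.security.plain.PlainLoginModule required
     debug=true
     username="plain-admin"
     password="plain-admin-secret"
     user_plain-user1="plain-user1-secret"
     user_plain-admin="plain-admin-secret";
    };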
File: KafkaClusterTestKit.java

@@ -27,12 +27,15 @@ import kafka.server.SharedServer;

 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
 import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
 import org.apache.kafka.metadata.storage.Formatter;
 import org.apache.kafka.network.SocketServerConfigs;

@@ -63,6 +66,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;

@@ -138,6 +142,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
             if (controllerNode != null) {
                 props.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG,
                     controllerNode.metadataDirectory());
+                setSecurityProtocolProps(props, controllerSecurityProtocol);
             } else {
                 props.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG,
                     node.metadataDirectory());

@@ -146,6 +151,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 // Set the log.dirs according to the broker node setting (if there is a broker node)
                 props.put(LOG_DIRS_CONFIG,
                     String.join(",", brokerNode.logDataDirectories()));
+                setSecurityProtocolProps(props, brokerSecurityProtocol);
             } else {
                 // Set log.dirs equal to the metadata directory if there is just a controller.
                 props.put(LOG_DIRS_CONFIG,

@@ -189,11 +195,40 @@ public class KafkaClusterTestKit implements AutoCloseable {
             return new KafkaConfig(props, false);
         }

+        private void setSecurityProtocolProps(Map<String, Object> props, String securityProtocol) {
+            if (securityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) {
+                props.putIfAbsent(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, "PLAIN");
+                props.putIfAbsent(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, "PLAIN");
+                props.putIfAbsent(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, "PLAIN");
+                props.putIfAbsent(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, StandardAuthorizer.class.getName());
+                props.putIfAbsent(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "false");
+                props.putIfAbsent(StandardAuthorizer.SUPER_USERS_CONFIG, "User:" + JaasUtils.KAFKA_PLAIN_ADMIN);
+            }
+        }
+
         public KafkaClusterTestKit build() throws Exception {
             Map<Integer, ControllerServer> controllers = new HashMap<>();
             Map<Integer, BrokerServer> brokers = new HashMap<>();
             Map<Integer, SharedServer> jointServers = new HashMap<>();
             File baseDirectory = null;
+            File jaasFile = null;
+
+            if (brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) {
+                jaasFile = JaasUtils.writeJaasContextsToFile(Set.of(
+                    new JaasUtils.JaasSection(JaasUtils.KAFKA_SERVER_CONTEXT_NAME,
+                        List.of(
+                            JaasModule.plainLoginModule(
+                                JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD,
+                                true,
+                                Map.of(
+                                    JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD,
+                                    JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD)
+                            )
+                        )
+                    )
+                ));
+                JaasUtils.refreshJavaLoginConfigParam(jaasFile);
+            }
+
             try {
                 baseDirectory = new File(nodes.baseDirectory());
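For SASL_PLAINTEXT nodes, setSecurityProtocolProps amounts to seeding the node config with the defaults below (spelled with the literal property names these constants are assumed to resolve to; all of them remain overridable, since putIfAbsent is used):

    sasl.enabled.mechanisms=PLAIN
    sasl.mechanism.inter.broker.protocol=PLAIN
    sasl.mechanism.controller.protocol=PLAIN
    authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
    allow.everyone.if.no.acl.found=false
    super.users=User:plain-admin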
File: KafkaClusterTestKit.java (continued)

@@ -272,7 +307,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 brokers,
                 baseDirectory,
                 faultHandlerFactory,
-                socketFactoryManager);
+                socketFactoryManager,
+                jaasFile == null ? Optional.empty() : Optional.of(jaasFile));
         }

         private String listeners(int node) {

@@ -316,6 +352,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
     private final SimpleFaultHandlerFactory faultHandlerFactory;
     private final PreboundSocketFactoryManager socketFactoryManager;
     private final String controllerListenerName;
+    private final Optional<File> jaasFile;

     private KafkaClusterTestKit(
         TestKitNodes nodes,

@@ -323,7 +360,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
         Map<Integer, BrokerServer> brokers,
         File baseDirectory,
         SimpleFaultHandlerFactory faultHandlerFactory,
-        PreboundSocketFactoryManager socketFactoryManager
+        PreboundSocketFactoryManager socketFactoryManager,
+        Optional<File> jaasFile
     ) {
         /*
         Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers

@@ -339,6 +377,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
         this.faultHandlerFactory = faultHandlerFactory;
         this.socketFactoryManager = socketFactoryManager;
         this.controllerListenerName = nodes.controllerListenerName().value();
+        this.jaasFile = jaasFile;
     }

     public void format() throws Exception {

@@ -602,6 +641,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
             waitForAllFutures(futureEntries);
             futureEntries.clear();
             Utils.delete(baseDirectory);
+            if (jaasFile.isPresent()) {
+                Utils.delete(jaasFile.get());
+            }
         } catch (Exception e) {
             for (Entry<String, Future<?>> entry : futureEntries) {
                 entry.getValue().cancel(true);
File: TestKitNodes.java

@@ -158,8 +158,9 @@ public class TestKitNodes {
                 throw new IllegalArgumentException("Invalid value for numDisksPerBroker");
             }
             // TODO: remove this assertion after https://issues.apache.org/jira/browse/KAFKA-16680 is finished
-            if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT || controllerSecurityProtocol != SecurityProtocol.PLAINTEXT) {
-                throw new IllegalArgumentException("Currently only support PLAINTEXT security protocol");
+            if ((brokerSecurityProtocol != SecurityProtocol.PLAINTEXT && brokerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT) ||
+                (controllerSecurityProtocol != SecurityProtocol.PLAINTEXT && controllerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT)) {
+                throw new IllegalArgumentException("Currently only support PLAINTEXT / SASL_PLAINTEXT security protocol");
             }
             if (baseDirectory == null) {
                 this.baseDirectory = TestUtils.tempDirectory().toPath();
File: TestKitNodeTest.java

@@ -32,11 +32,11 @@ public class TestKitNodeTest {
     @ParameterizedTest
     @EnumSource(SecurityProtocol.class)
     public void testSecurityProtocol(SecurityProtocol securityProtocol) {
-        if (securityProtocol != SecurityProtocol.PLAINTEXT) {
-            assertEquals("Currently only support PLAINTEXT security protocol",
+        if (securityProtocol != SecurityProtocol.PLAINTEXT && securityProtocol != SecurityProtocol.SASL_PLAINTEXT) {
+            assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT security protocol",
                 assertThrows(IllegalArgumentException.class,
                     () -> new TestKitNodes.Builder().setBrokerSecurityProtocol(securityProtocol).build()).getMessage());
-            assertEquals("Currently only support PLAINTEXT security protocol",
+            assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT security protocol",
                 assertThrows(IllegalArgumentException.class,
                     () -> new TestKitNodes.Builder().setControllerSecurityProtocol(securityProtocol).build()).getMessage());
         }
File: ClusterInstance.java

@@ -38,9 +38,12 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.test.JaasUtils;
 import org.apache.kafka.common.test.TestUtils;
 import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.server.fault.FaultHandlerException;

@@ -164,7 +167,7 @@ public interface ClusterInstance {
         props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
-        return new KafkaProducer<>(props);
+        return new KafkaProducer<>(setClientSaslConfig(props));
     }

     default <K, V> Producer<K, V> producer() {

@@ -178,7 +181,7 @@ public interface ClusterInstance {
         props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + TestUtils.randomString(5));
         props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
-        return new KafkaConsumer<>(props);
+        return new KafkaConsumer<>(setClientSaslConfig(props));
     }

     default <K, V> Consumer<K, V> consumer() {

@@ -194,7 +197,23 @@ public interface ClusterInstance {
             props.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
             props.remove(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
         }
-        return Admin.create(props);
+        return Admin.create(setClientSaslConfig(props));
+    }
+
+    default Map<String, Object> setClientSaslConfig(Map<String, Object> configs) {
+        Map<String, Object> props = new HashMap<>(configs);
+        if (config().brokerSecurityProtocol() == SecurityProtocol.SASL_PLAINTEXT) {
+            props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
+            props.putIfAbsent(SaslConfigs.SASL_MECHANISM, "PLAIN");
+            props.putIfAbsent(
+                SaslConfigs.SASL_JAAS_CONFIG,
+                String.format(
+                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
+                    JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD
+                )
+            );
+        }
+        return props;
     }

     default Admin admin(Map<String, Object> configs) {
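Net effect of setClientSaslConfig for a SASL_PLAINTEXT cluster: every producer, consumer, and admin built through these helpers defaults to the admin credentials unless the caller overrides them. Spelled out as client properties (names assumed from the standard Kafka constants):

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="plain-admin" password="plain-admin-secret";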
File: ClusterTestExtensionsTest.java

@@ -20,18 +20,29 @@ package org.apache.kafka.common.test.api;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.DescribeAclsOptions;
 import org.apache.kafka.clients.admin.DescribeLogDirsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.test.JaasUtils;
 import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.server.common.MetadataVersion;

@@ -48,7 +59,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;

 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;

@@ -63,7 +76,9 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_CO
 import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;

 @ClusterTestDefaults(types = {Type.KRAFT}, serverProperties = {
     @ClusterConfigProperty(key = "default.key", value = "default.value"),

@@ -331,4 +346,143 @@ public class ClusterTestExtensionsTest {
             assertEquals(1, admin.describeMetadataQuorum().quorumInfo().get().nodes().size());
         }
     }
+
+    @ClusterTest(
+        types = {Type.KRAFT, Type.CO_KRAFT},
+        brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
+        controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
+        serverProperties = {
+            @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+            @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+        }
+    )
+    public void testSaslPlaintext(ClusterInstance clusterInstance) throws CancellationException, ExecutionException, InterruptedException {
+        Assertions.assertEquals(SecurityProtocol.SASL_PLAINTEXT, clusterInstance.config().brokerSecurityProtocol());
+
+        // default ClusterInstance#admin helper with admin credentials
+        try (Admin admin = clusterInstance.admin()) {
+            admin.describeAcls(AclBindingFilter.ANY).values().get();
+        }
+        String topic = "sasl-plaintext-topic";
+        clusterInstance.createTopic(topic, 1, (short) 1);
+        try (Producer<byte[], byte[]> producer = clusterInstance.producer()) {
+            producer.send(new ProducerRecord<>(topic, Utils.utf8("key"), Utils.utf8("value"))).get();
+            producer.flush();
+        }
+        try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer()) {
+            consumer.subscribe(List.of(topic));
+            TestUtils.waitForCondition(() -> {
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
+                return records.count() == 1;
+            }, "Failed to receive message");
+        }
+
+        // client with non-admin credentials
+        Map<String, Object> nonAdminConfig = Map.of(
+            SaslConfigs.SASL_JAAS_CONFIG,
+            String.format(
+                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
+                JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD
+            )
+        );
+        try (Admin admin = clusterInstance.admin(nonAdminConfig)) {
+            ExecutionException exception = assertThrows(
+                ExecutionException.class,
+                () -> admin.describeAcls(AclBindingFilter.ANY).values().get()
+            );
+            assertInstanceOf(ClusterAuthorizationException.class, exception.getCause());
+        }
+        try (Producer<byte[], byte[]> producer = clusterInstance.producer(nonAdminConfig)) {
+            ExecutionException exception = assertThrows(
+                ExecutionException.class,
+                () -> producer.send(new ProducerRecord<>(topic, Utils.utf8("key"), Utils.utf8("value"))).get()
+            );
+            assertInstanceOf(TopicAuthorizationException.class, exception.getCause());
+        }
+        try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(nonAdminConfig)) {
+            consumer.subscribe(List.of(topic));
+            AtomicBoolean hasException = new AtomicBoolean(false);
+            TestUtils.waitForCondition(() -> {
+                if (hasException.get()) {
+                    return true;
+                }
+                try {
+                    consumer.poll(Duration.ofMillis(100));
+                } catch (TopicAuthorizationException e) {
+                    hasException.set(true);
+                }
+                return false;
+            }, "Failed to get exception");
+        }
+
+        // client with unknown credentials
+        Map<String, Object> unknownUserConfig = Map.of(
+            SaslConfigs.SASL_JAAS_CONFIG,
+            String.format(
+                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
+                "unknown", "unknown"
+            )
+        );
+        try (Admin admin = clusterInstance.admin(unknownUserConfig)) {
+            ExecutionException exception = assertThrows(
+                ExecutionException.class,
+                () -> admin.describeAcls(AclBindingFilter.ANY).values().get()
+            );
+            assertInstanceOf(SaslAuthenticationException.class, exception.getCause());
+        }
+        try (Producer<byte[], byte[]> producer = clusterInstance.producer(unknownUserConfig)) {
+            ExecutionException exception = assertThrows(
+                ExecutionException.class,
+                () -> producer.send(new ProducerRecord<>(topic, Utils.utf8("key"), Utils.utf8("value"))).get()
+            );
+            assertInstanceOf(SaslAuthenticationException.class, exception.getCause());
+        }
+        try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(unknownUserConfig)) {
+            consumer.subscribe(List.of(topic));
+            AtomicBoolean hasException = new AtomicBoolean(false);
+            TestUtils.waitForCondition(() -> {
+                if (hasException.get()) {
+                    return true;
+                }
+                try {
+                    consumer.poll(Duration.ofMillis(100));
+                } catch (SaslAuthenticationException e) {
+                    hasException.set(true);
+                }
+                return false;
+            }, "Failed to get exception");
+        }
+    }
+
+    @ClusterTest(
+        types = {Type.KRAFT, Type.CO_KRAFT},
+        brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
+        controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
+        serverProperties = {
+            @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+            @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+        }
+    )
+    public void testSaslPlaintextWithController(ClusterInstance clusterInstance) throws CancellationException, ExecutionException, InterruptedException {
+        // test with admin
+        try (Admin admin = clusterInstance.admin(Map.of(), true)) {
+            admin.describeAcls(AclBindingFilter.ANY).values().get();
+        }
+
+        // test with non-admin
+        Map<String, Object> nonAdminConfig = Map.of(
+            SaslConfigs.SASL_JAAS_CONFIG,
+            String.format(
+                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
+                JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD
+            )
+        );
+        try (Admin admin = clusterInstance.admin(nonAdminConfig, true)) {
+            ExecutionException exception = assertThrows(
+                ExecutionException.class,
+                () -> admin.describeAcls(AclBindingFilter.ANY, new DescribeAclsOptions().timeoutMs(5000)).values().get()
+            );
+            assertInstanceOf(TimeoutException.class, exception.getCause());
+        }
+    }
 }
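For completeness, a minimal sketch of driving a SASL_PLAINTEXT cluster through the test kit directly rather than via @ClusterTest. Only format() and the TestKitNodes.Builder setters are visible in this diff; KafkaClusterTestKit.Builder and startup() are assumed from the existing test-kit API:

    TestKitNodes nodes = new TestKitNodes.Builder()
        .setBrokerSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)
        .setControllerSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)
        .build();
    try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).build()) {
        cluster.format();    // format storage; format() is shown in this diff
        cluster.startup();   // assumed lifecycle method from the existing test kit
        // Brokers now require SASL/PLAIN; clients need the settings
        // injected by ClusterInstance#setClientSaslConfig above.
    }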