diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index df79a17ccbe..9f5b098d800 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -364,4 +364,6 @@ + + diff --git a/core/src/test/java/kafka/security/JaasModule.java b/core/src/test/java/kafka/security/JaasModule.java new file mode 100644 index 00000000000..46527b18609 --- /dev/null +++ b/core/src/test/java/kafka/security/JaasModule.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +public class JaasModule { + public static JaasModule zkDigestModule(boolean debug, Map entries) { + String name = "org.apache.zookeeper.server.auth.DigestLoginModule"; + return new JaasModule( + name, + debug, + entries + ); + } + + public static JaasModule krb5LoginModule(boolean useKeyTab, boolean storeKey, String keyTab, String principal, boolean debug, Optional serviceName, boolean isIbmSecurity) { + String name = isIbmSecurity ? "com.ibm.security.auth.module.Krb5LoginModule" : "com.sun.security.auth.module.Krb5LoginModule"; + + Map entries = new HashMap<>(); + if (isIbmSecurity) { + entries.put("principal", principal); + entries.put("credsType", "both"); + if (useKeyTab) { + entries.put("useKeytab", "file:" + keyTab); + } + } else { + entries.put("useKeyTab", Boolean.toString(useKeyTab)); + entries.put("storeKey", Boolean.toString(storeKey)); + entries.put("keyTab", keyTab); + entries.put("principal", principal); + serviceName.ifPresent(s -> entries.put("serviceName", s)); + } + + return new JaasModule( + name, + debug, + entries + ); + } + + public static JaasModule oAuthBearerLoginModule(String username, boolean debug) { + String name = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule"; + + Map entries = new HashMap<>(); + entries.put("unsecuredLoginStringClaim_sub", username); + + return new JaasModule( + name, + debug, + entries + ); + } + + public static JaasModule plainLoginModule(String username, String password) { + return plainLoginModule(username, password, false, Collections.emptyMap()); + } + + public static JaasModule plainLoginModule(String username, String password, boolean debug, Map validUsers) { + String name = "org.apache.kafka.common.security.plain.PlainLoginModule"; + + Map entries = new HashMap<>(); + entries.put("username", username); + entries.put("password", password); + validUsers.forEach((user, pass) -> entries.put("user_" + user, pass)); + + return new JaasModule( + name, + debug, + entries + ); + } + + public static JaasModule scramLoginModule(String username, String password) { + return scramLoginModule(username, password, false, Collections.emptyMap()); + } + + public static JaasModule scramLoginModule(String username, String password, boolean debug, Map tokenProps) { + String name = "org.apache.kafka.common.security.scram.ScramLoginModule"; + + Map entries = new HashMap<>(); + entries.put("username", username); + entries.put("password", password); + entries.putAll(tokenProps); + + return new JaasModule( + name, + debug, + entries + ); + } + + private final String name; + + private final boolean debug; + + private final Map entries; + + private JaasModule(String name, boolean debug, Map entries) { + this.name = name; + this.debug = debug; + this.entries = entries; + } + + public String name() { + return name; + } + + public boolean debug() { + return debug; + } + + @Override + public String toString() { + return String.format("%s required\n debug=%b\n %s;\n", name, debug, entries.entrySet().stream() + .map(e -> e.getKey() + "=\"" + e.getValue() + "\"") + .collect(Collectors.joining("\n "))); + } +} diff --git a/core/src/test/java/kafka/security/JaasTestUtils.java b/core/src/test/java/kafka/security/JaasTestUtils.java new file mode 100644 index 00000000000..e80208b77d2 --- /dev/null +++ b/core/src/test/java/kafka/security/JaasTestUtils.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security; + +import kafka.utils.TestUtils; + +import org.apache.kafka.clients.admin.ScramMechanism; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.utils.Java; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.stream.Collectors; + +import static org.apache.kafka.common.config.SaslConfigs.GSSAPI_MECHANISM; +import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM; +import static org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM; + +public class JaasTestUtils { + public static class JaasSection { + private final String contextName; + private final List modules; + + public JaasSection(String contextName, List modules) { + this.contextName = contextName; + this.modules = modules; + } + + public List getModules() { + return modules; + } + + public String getContextName() { + return contextName; + } + + @Override + public String toString() { + return String.format("%s {\n %s\n};\n", + contextName, + modules.stream().map(Object::toString).collect(Collectors.joining("\n "))); + } + } + + private static final boolean IS_IBM_SECURITY = Java.isIbmJdk() && !Java.isIbmJdkSemeru(); + + private static final String ZK_SERVER_CONTEXT_NAME = "Server"; + private static final String ZK_CLIENT_CONTEXT_NAME = "Client"; + private static final String ZK_USER_SUPER_PASSWD = "adminpasswd"; + private static final String ZK_USER = "fpj"; + private static final String ZK_USER_PASSWORD = "fpjsecret"; + + public static final String KAFKA_SERVER_CONTEXT_NAME = "KafkaServer"; + public static final String KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME = "kafka"; + private static final String KAFKA_SERVER_PRINCIPAL = KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME + "/localhost@EXAMPLE.COM"; + public static final String KAFKA_CLIENT_CONTEXT_NAME = "KafkaClient"; + public static final String KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME = "client"; + private static final String KAFKA_CLIENT_PRINCIPAL = KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME + "@EXAMPLE.COM"; + public static final String KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2 = "client2"; + private static final String KAFKA_CLIENT_PRINCIPAL_2 = KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2 + "@EXAMPLE.COM"; + + public static final String KAFKA_PLAIN_USER = "plain-user"; + private static final String KAFKA_PLAIN_PASSWORD = "plain-user-secret"; + public static final String KAFKA_PLAIN_USER_2 = "plain-user2"; + public static final String KAFKA_PLAIN_PASSWORD_2 = "plain-user2-secret"; + public static final String KAFKA_PLAIN_ADMIN = "plain-admin"; + private static final String KAFKA_PLAIN_ADMIN_PASSWORD = "plain-admin-secret"; + + public static final String KAFKA_SCRAM_USER = "scram-user"; + public static final String KAFKA_SCRAM_PASSWORD = "scram-user-secret"; + public static final String KAFKA_SCRAM_USER_2 = "scram-user2"; + public static final String KAFKA_SCRAM_PASSWORD_2 = "scram-user2-secret"; + public static final String KAFKA_SCRAM_ADMIN = "scram-admin"; + public static final String KAFKA_SCRAM_ADMIN_PASSWORD = "scram-admin-secret"; + + public static final String KAFKA_OAUTH_BEARER_USER = "oauthbearer-user"; + public static final String KAFKA_OAUTH_BEARER_USER_2 = "oauthbearer-user2"; + public static final String KAFKA_OAUTH_BEARER_ADMIN = "oauthbearer-admin"; + + public static final String SERVICE_NAME = "kafka"; + + public static Properties saslConfigs(Optional saslProperties) { + Properties result = saslProperties.orElse(new Properties()); + if (IS_IBM_SECURITY && !result.containsKey(SaslConfigs.SASL_KERBEROS_SERVICE_NAME)) { + result.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, SERVICE_NAME); + } + return result; + } + + public static File writeJaasContextsToFile(List jaasSections) throws IOException { + File jaasFile = TestUtils.tempFile(); + writeToFile(jaasFile, jaasSections); + return jaasFile; + } + + public static String scramClientLoginModule(String mechanism, String scramUser, String scramPassword) { + if (ScramMechanism.fromMechanismName(mechanism) == ScramMechanism.UNKNOWN) { + throw new IllegalArgumentException("Unsupported SCRAM mechanism " + mechanism); + } + return JaasModule.scramLoginModule(scramUser, scramPassword, false, new HashMap<>()).toString(); + } + + public static String clientLoginModule(String mechanism, Optional keytabLocation, String serviceName) { + return kafkaClientModule( + mechanism, + keytabLocation, + KAFKA_CLIENT_PRINCIPAL, + KAFKA_PLAIN_USER, + KAFKA_PLAIN_PASSWORD, + KAFKA_SCRAM_USER, + KAFKA_SCRAM_PASSWORD, + KAFKA_OAUTH_BEARER_USER, + serviceName + ).toString(); + } + + public static String clientLoginModule(String mechanism, Optional keytabLocation) { + return clientLoginModule(mechanism, keytabLocation, SERVICE_NAME); + } + + public static String adminLoginModule(String mechanism, Optional keytabLocation, String serviceName) { + return kafkaClientModule( + mechanism, + keytabLocation, + KAFKA_SERVER_PRINCIPAL, + KAFKA_PLAIN_ADMIN, + KAFKA_PLAIN_ADMIN_PASSWORD, + KAFKA_SCRAM_ADMIN, + KAFKA_SCRAM_ADMIN_PASSWORD, + KAFKA_OAUTH_BEARER_ADMIN, + serviceName + ).toString(); + } + + public static String adminLoginModule(String mechanism, Optional keytabLocation) { + return adminLoginModule(mechanism, keytabLocation, SERVICE_NAME); + } + + public static String tokenClientLoginModule(String tokenId, String password) { + Map tokenProps = new HashMap<>(); + tokenProps.put("tokenauth", "true"); + return JaasModule.scramLoginModule(tokenId, password, false, tokenProps).toString(); + } + + public static List zkSections() { + Map zkServerEntries = new HashMap<>(); + zkServerEntries.put("user_super", ZK_USER_SUPER_PASSWD); + zkServerEntries.put("user_" + ZK_USER, ZK_USER_PASSWORD); + JaasSection zkServerSection = new JaasSection(ZK_SERVER_CONTEXT_NAME, Collections.singletonList(JaasModule.zkDigestModule(false, zkServerEntries))); + + Map zkClientEntries = new HashMap<>(); + zkClientEntries.put("username", ZK_USER); + zkClientEntries.put("password", ZK_USER_PASSWORD); + JaasSection zkClientSection = new JaasSection(ZK_CLIENT_CONTEXT_NAME, Collections.singletonList(JaasModule.zkDigestModule(false, zkClientEntries))); + + return Arrays.asList(zkServerSection, zkClientSection); + } + + public static JaasSection kafkaServerSection(String contextName, List mechanisms, Optional keytabLocation) { + List modules = new ArrayList<>(); + for (String mechanism : mechanisms) { + switch (mechanism) { + case GSSAPI_MECHANISM: + modules.add(JaasModule.krb5LoginModule( + true, + true, + keytabLocation.orElseThrow(() -> new IllegalArgumentException("Keytab location not specified for GSSAPI")).getAbsolutePath(), + KAFKA_SERVER_PRINCIPAL, + true, + Optional.of(SERVICE_NAME), + IS_IBM_SECURITY + )); + break; + case PLAIN_MECHANISM: + Map validUsers = new HashMap<>(); + validUsers.put(KAFKA_PLAIN_ADMIN, KAFKA_PLAIN_ADMIN_PASSWORD); + validUsers.put(KAFKA_PLAIN_USER, KAFKA_PLAIN_PASSWORD); + validUsers.put(KAFKA_PLAIN_USER_2, KAFKA_PLAIN_PASSWORD_2); + modules.add(JaasModule.plainLoginModule(KAFKA_PLAIN_ADMIN, KAFKA_PLAIN_ADMIN_PASSWORD, false, validUsers)); + break; + case OAUTHBEARER_MECHANISM: + modules.add(JaasModule.oAuthBearerLoginModule(KAFKA_OAUTH_BEARER_ADMIN, false)); + break; + default: + if (ScramMechanism.fromMechanismName(mechanism) != ScramMechanism.UNKNOWN) { + modules.add(JaasModule.scramLoginModule(KAFKA_SCRAM_ADMIN, KAFKA_SCRAM_ADMIN_PASSWORD, false, new HashMap<>())); + } else { + throw new IllegalArgumentException("Unsupported server mechanism " + mechanism); + } + break; + } + } + return new JaasSection(contextName, modules); + } + + private static JaasModule kafkaClientModule(String mechanism, + Optional keytabLocation, + String clientPrincipal, + String plainUser, + String plainPassword, + String scramUser, + String scramPassword, + String oauthBearerUser, + String serviceName) { + switch (mechanism) { + case GSSAPI_MECHANISM: + return JaasModule.krb5LoginModule( + true, + true, + keytabLocation.orElseThrow(() -> new IllegalArgumentException("Keytab location not specified for GSSAPI")).getAbsolutePath(), + clientPrincipal, + true, + Optional.of(serviceName), + IS_IBM_SECURITY + ); + case PLAIN_MECHANISM: + return JaasModule.plainLoginModule(plainUser, plainPassword, false, new HashMap<>()); + case OAUTHBEARER_MECHANISM: + return JaasModule.oAuthBearerLoginModule(oauthBearerUser, false); + default: + if (ScramMechanism.fromMechanismName(mechanism) != ScramMechanism.UNKNOWN) { + return JaasModule.scramLoginModule(scramUser, scramPassword, false, new HashMap<>()); + } else { + throw new IllegalArgumentException("Unsupported client mechanism " + mechanism); + } + } + } + + public static JaasSection kafkaClientSection(Optional mechanism, Optional keytabLocation) { + return new JaasSection(KAFKA_CLIENT_CONTEXT_NAME, + mechanism.map(m -> kafkaClientModule(m, + keytabLocation, + KAFKA_CLIENT_PRINCIPAL_2, + KAFKA_PLAIN_USER_2, + KAFKA_PLAIN_PASSWORD_2, + KAFKA_SCRAM_USER_2, + KAFKA_SCRAM_PASSWORD_2, + KAFKA_OAUTH_BEARER_USER_2, + SERVICE_NAME) + ).map(Collections::singletonList).orElse(Collections.emptyList())); + } + + private static void writeToFile(File file, List jaasSections) throws IOException { + try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) { + writer.write(String.join("", jaasSections.stream().map(Object::toString).toArray(String[]::new))); + } + } +} diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala index d6a2d105e8d..7403f8e6e05 100644 --- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala +++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala @@ -14,29 +14,29 @@ package kafka.api -import java.{lang, util} -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} -import java.util.Properties import kafka.api.GroupedUserPrincipalBuilder._ import kafka.api.GroupedUserQuotaCallback._ +import kafka.security.{JaasModule, JaasTestUtils} import kafka.server._ -import kafka.utils.JaasTestUtils.ScramLoginModule -import kafka.utils.{JaasTestUtils, Logging, TestUtils} +import kafka.utils.{Logging, TestUtils} import kafka.zk.ConfigEntityChangeNotificationZNode import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} -import org.apache.kafka.common.{Cluster, Reconfigurable} import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth._ -import org.apache.kafka.server.config.{ServerConfigs, QuotaConfigs} +import org.apache.kafka.common.{Cluster, Reconfigurable} +import org.apache.kafka.server.config.{QuotaConfigs, ServerConfigs} import org.apache.kafka.server.quota._ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} +import java.util.Properties +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} +import java.{lang, util} import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ @@ -62,7 +62,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { @BeforeEach override def setUp(testInfo: TestInfo): Unit = { - startSasl(jaasSections(kafkaServerSaslMechanisms, Some("SCRAM-SHA-256"), KafkaSasl, JaasTestUtils.KafkaServerContextName)) + startSasl(jaasSections(kafkaServerSaslMechanisms, Some("SCRAM-SHA-256"), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) this.serverConfig.setProperty(QuotaConfigs.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, classOf[GroupedUserQuotaCallback].getName) this.serverConfig.setProperty(s"${listenerName.configPrefix}${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG}", classOf[GroupedUserPrincipalBuilder].getName) @@ -70,7 +70,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { super.setUp(testInfo) producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, - ScramLoginModule(JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword).toString) + JaasModule.scramLoginModule(JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD).toString) producerWithoutQuota = createProducer() } @@ -85,7 +85,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = { super.configureSecurityBeforeServersStart(testInfo) zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) - createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword) + createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD) } @Test @@ -188,7 +188,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { config.put(key.toString, value) } config.put(SaslConfigs.SASL_JAAS_CONFIG, - ScramLoginModule(JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword).toString) + JaasModule.scramLoginModule(JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD).toString) val adminClient = Admin.create(config) adminClients += adminClient adminClient @@ -220,12 +220,13 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { val consumerClientId = s"$user:consumer-client-id" producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, producerClientId) - producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, ScramLoginModule(user, password).toString) + + producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, JaasModule.scramLoginModule(user, password).toString) consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId) consumerConfig.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString) consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, s"$user-group") - consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, ScramLoginModule(user, password).toString) + consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, JaasModule.scramLoginModule(user, password).toString) GroupedUser(user, userGroup, topic, servers(leader), producerClientId, consumerClientId, createProducer(), createConsumer(), adminClient) diff --git a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala index 6f775196193..84bdc727f37 100644 --- a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala @@ -16,6 +16,8 @@ */ package kafka.api +import kafka.security.JaasTestUtils + import java.util.Properties import kafka.utils._ import kafka.zk.ConfigEntityChangeNotificationZNode @@ -43,11 +45,11 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) - override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramUser) - private val clientPassword = JaasTestUtils.KafkaScramPassword + override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_SCRAM_USER) + private val clientPassword = JaasTestUtils.KAFKA_SCRAM_PASSWORD - override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramAdmin) - protected val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword + override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_SCRAM_ADMIN) + protected val kafkaPassword = JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD protected val privilegedAdminClientConfig = new Properties() @@ -71,7 +73,7 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest override def addFormatterSettings(formatter: Formatter): Unit = { formatter.setClusterId("XcZZOzUqS4yHOjhMQB6JLQ") formatter.setScramArguments( - List(s"SCRAM-SHA-256=[name=${JaasTestUtils.KafkaScramAdmin},password=${JaasTestUtils.KafkaScramAdminPassword}]").asJava) + List(s"SCRAM-SHA-256=[name=${JaasTestUtils.KAFKA_SCRAM_ADMIN},password=${JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD}]").asJava) } override def createPrivilegedAdminClient(): Admin = createScramAdminClient(kafkaClientSaslMechanism, kafkaPrincipal.getName, kafkaPassword) diff --git a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala index e57e96c10a8..0378f2c53d0 100644 --- a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala +++ b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala @@ -16,6 +16,7 @@ */ package kafka.api +import kafka.security.JaasTestUtils import kafka.utils._ import org.apache.kafka.clients.admin.{Admin, CreateDelegationTokenOptions, DescribeDelegationTokenOptions} import org.apache.kafka.common.acl._ @@ -51,8 +52,8 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest extends DelegationTokenE override def createDelegationTokenOptions(): CreateDelegationTokenOptions = new CreateDelegationTokenOptions().owner(clientPrincipal) - private val tokenRequesterPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramUser2) - private val tokenRequesterPassword = JaasTestUtils.KafkaScramPassword2 + private val tokenRequesterPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_SCRAM_USER_2) + private val tokenRequesterPassword = JaasTestUtils.KAFKA_SCRAM_PASSWORD_2 private val otherClientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "other-client-principal") private val otherClientPassword = "other-client-password" diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala index c150b70415d..a9c54039750 100644 --- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala +++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala @@ -12,10 +12,11 @@ */ package kafka.api +import kafka.security.JaasTestUtils import java.util import java.util.Properties import kafka.security.authorizer.AclAuthorizer -import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils} +import kafka.utils.{CoreUtils, TestUtils} import org.apache.kafka.clients.admin._ import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, CLUSTER_ACTION, DELETE, DESCRIBE} import org.apache.kafka.common.acl.AclPermissionType.ALLOW @@ -40,27 +41,27 @@ object DescribeAuthorizedOperationsTest { val Group1Acl = new AclBinding( new ResourcePattern(ResourceType.GROUP, Group1, PatternType.LITERAL), - accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL)) + accessControlEntry(JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2, ALL)) val Group2Acl = new AclBinding( new ResourcePattern(ResourceType.GROUP, Group2, PatternType.LITERAL), - accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DESCRIBE)) + accessControlEntry(JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2, DESCRIBE)) val Group3Acl = new AclBinding( new ResourcePattern(ResourceType.GROUP, Group3, PatternType.LITERAL), - accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DELETE)) + accessControlEntry(JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2, DELETE)) val ClusterAllAcl = new AclBinding( new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL), - accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL)) + accessControlEntry(JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2, ALL)) val Topic1Acl = new AclBinding( new ResourcePattern(ResourceType.TOPIC, Topic1, PatternType.LITERAL), - accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL)) + accessControlEntry(JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2, ALL)) val Topic2All = new AclBinding( new ResourcePattern(ResourceType.TOPIC, Topic2, PatternType.LITERAL), - accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DELETE)) + accessControlEntry(JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2, DELETE)) private def accessControlEntry( userName: String, @@ -93,11 +94,11 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS authorizer.configure(this.configs.head.originals()) val result = authorizer.createAcls(null, List( new AclBinding(clusterResource, accessControlEntry( - JaasTestUtils.KafkaServerPrincipalUnqualifiedName, CLUSTER_ACTION)), + JaasTestUtils.KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME, CLUSTER_ACTION)), new AclBinding(clusterResource, accessControlEntry( - JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALTER)), + JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2, ALTER)), new AclBinding(topicResource, accessControlEntry( - JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DESCRIBE)) + JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2, DESCRIBE)) ).asJava) result.asScala.map(_.toCompletableFuture.get).foreach(result => assertFalse(result.exception.isPresent)) } finally { @@ -107,7 +108,7 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS @BeforeEach override def setUp(testInfo: TestInfo): Unit = { - startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName)) + startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) super.setUp(testInfo) TestUtils.waitUntilBrokerMetadataIsPropagated(servers) client = Admin.create(createConfig()) diff --git a/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala index 8bd639389cd..875adb4600e 100644 --- a/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala @@ -17,7 +17,7 @@ package kafka.api import kafka.api.GroupEndToEndAuthorizationTest._ -import kafka.utils.JaasTestUtils +import kafka.security.JaasTestUtils import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, SaslAuthenticationContext} import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder @@ -29,7 +29,7 @@ object GroupEndToEndAuthorizationTest { override def build(context: AuthenticationContext): KafkaPrincipal = { context match { case ctx: SaslAuthenticationContext => - if (ctx.server.getAuthorizationID == JaasTestUtils.KafkaScramUser) + if (ctx.server.getAuthorizationID == JaasTestUtils.KAFKA_SCRAM_USER) new KafkaPrincipal(GroupPrincipalType, ClientGroup) else new KafkaPrincipal(GroupPrincipalType, ctx.server.getAuthorizationID) @@ -42,6 +42,6 @@ object GroupEndToEndAuthorizationTest { class GroupEndToEndAuthorizationTest extends SaslScramSslEndToEndAuthorizationTest { override val clientPrincipal = new KafkaPrincipal(GroupPrincipalType, ClientGroup) - override val kafkaPrincipal = new KafkaPrincipal(GroupPrincipalType, JaasTestUtils.KafkaScramAdmin) + override val kafkaPrincipal = new KafkaPrincipal(GroupPrincipalType, JaasTestUtils.KAFKA_SCRAM_ADMIN) this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[GroupPrincipalBuilder].getName) } diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala index 1b72cb895ed..00b22ca5277 100644 --- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala +++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala @@ -12,27 +12,27 @@ */ package kafka.api -import java.util.{Locale, Properties} -import kafka.server.KafkaServer -import kafka.utils.{JaasTestUtils, TestUtils} import com.yammer.metrics.core.{Gauge, Histogram, Meter} +import kafka.security.JaasTestUtils +import kafka.server.KafkaServer +import kafka.utils.TestUtils import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} -import org.apache.kafka.common.{Metric, MetricName, TopicPartition} import org.apache.kafka.common.config.{SaslConfigs, TopicConfig} import org.apache.kafka.common.errors.{InvalidTopicException, UnknownTopicOrPartitionException} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.authenticator.TestJaasConfig -import org.apache.kafka.server.config.ZkConfigs -import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs} +import org.apache.kafka.common.{Metric, MetricName, TopicPartition} +import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs, ZkConfigs} import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics} import org.apache.kafka.server.metrics.KafkaYammerMetrics -import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +import java.util.{Locale, Properties} import scala.annotation.nowarn import scala.jdk.CollectionConverters._ @@ -44,7 +44,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { private val kafkaClientSaslMechanism = "PLAIN" private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism) private val kafkaServerJaasEntryName = - s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}" + s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME}" this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "false") this.serverConfig.setProperty(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false") this.serverConfig.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "2.8") diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala index f3b6977f6b7..f8440226caa 100644 --- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala @@ -12,6 +12,7 @@ */ package kafka.api +import kafka.security.JaasTestUtils import java.time.Duration import java.util.{Collections, Properties} import java.util.concurrent.{ExecutionException, TimeUnit} @@ -23,7 +24,7 @@ import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.SaslAuthenticationException import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} import org.junit.jupiter.api.Assertions._ -import kafka.utils.{JaasTestUtils, TestUtils} +import kafka.utils.TestUtils import kafka.zk.ConfigEntityChangeNotificationZNode import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.coordinator.transaction.TransactionLogConfigs @@ -54,18 +55,18 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { super.configureSecurityBeforeServersStart(testInfo) zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) // Create broker credentials before starting brokers - createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword) + createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD) } override def createPrivilegedAdminClient() = { createAdminClient(bootstrapServers(), securityProtocol, trustStoreFile, clientSaslProperties, - kafkaClientSaslMechanism, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword) + kafkaClientSaslMechanism, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD) } @BeforeEach override def setUp(testInfo: TestInfo): Unit = { startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, - JaasTestUtils.KafkaServerContextName)) + JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) super.setUp(testInfo) createTopic(topic, numPartitions, brokerCount) } @@ -148,7 +149,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { try { val response = adminClient.describeTopics(Collections.singleton(topic)).allTopicNames.get assertEquals(1, response.size) - response.forEach { (topic, description) => + response.forEach { (_, description) => assertEquals(numPartitions, description.partitions.size) } } catch { @@ -167,7 +168,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { } private def createClientCredential(): Unit = { - createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2) + createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KAFKA_SCRAM_USER_2, JaasTestUtils.KAFKA_SCRAM_PASSWORD_2) } private def sendOneRecord(producer: KafkaProducer[Array[Byte], Array[Byte]], maxWaitMs: Long = 15000): Unit = { diff --git a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala index e1f1638fa0a..dd76dc03153 100644 --- a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala @@ -16,20 +16,18 @@ */ package kafka.api +import kafka.security.JaasTestUtils import kafka.security.authorizer.AclAuthorizer -import kafka.utils.JaasTestUtils import org.apache.kafka.common.config.SslConfigs import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.security.auth._ import org.junit.jupiter.api.Assertions.assertNull -import scala.collection.immutable.List - class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest { override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, - JaasTestUtils.KafkaClientPrincipalUnqualifiedName) + JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME) override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, - JaasTestUtils.KafkaServerPrincipalUnqualifiedName) + JaasTestUtils.KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME) override protected def kafkaClientSaslMechanism = "GSSAPI" override protected def kafkaServerSaslMechanisms = List("GSSAPI") diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala index df21372e465..6fbc0d265cd 100644 --- a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala @@ -12,10 +12,11 @@ */ package kafka.api -import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} -import kafka.utils.{JaasTestUtils, TestUtils} +import kafka.security.JaasTestUtils +import kafka.utils.TestUtils import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.server.config.ZkConfigs +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource @@ -34,7 +35,7 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup { @BeforeEach override def setUp(testInfo: TestInfo): Unit = { startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, - JaasTestUtils.KafkaServerContextName)) + JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) super.setUp(testInfo) } diff --git a/core/src/test/scala/integration/kafka/api/SaslOAuthBearerSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslOAuthBearerSslEndToEndAuthorizationTest.scala index 904c35f31f5..7282480a9c3 100644 --- a/core/src/test/scala/integration/kafka/api/SaslOAuthBearerSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslOAuthBearerSslEndToEndAuthorizationTest.scala @@ -16,12 +16,12 @@ */ package kafka.api -import kafka.utils.JaasTestUtils +import kafka.security.JaasTestUtils import org.apache.kafka.common.security.auth._ class SaslOAuthBearerSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest { override protected def kafkaClientSaslMechanism = "OAUTHBEARER" override protected def kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism) - override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaOAuthBearerUser) - override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaOAuthBearerAdmin) + override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_OAUTH_BEARER_USER) + override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_OAUTH_BEARER_ADMIN) } diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala index bdbae528f75..aed896e141a 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala @@ -12,8 +12,9 @@ */ package kafka.api +import kafka.security.JaasTestUtils +import kafka.utils.TestUtils import kafka.utils.TestUtils.{isAclUnsecure, secureZkPaths} -import kafka.utils.{JaasTestUtils, TestUtils} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.server.config.ZkConfigs @@ -28,7 +29,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup { private val kafkaClientSaslMechanism = "PLAIN" private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism) private val kafkaServerJaasEntryName = - s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}" + s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME}" this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "false") // disable secure acls of zkClient in QuorumTestHarness override protected def zkAclsEnabled = Some(false) diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala index b4772aa3cc5..aa85d9eb531 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala @@ -16,7 +16,8 @@ */ package kafka.api -import kafka.utils.JaasTestUtils._ +import kafka.security.JaasModule +import kafka.security.JaasTestUtils._ import kafka.utils.TestUtils import kafka.utils.TestUtils.isAclSecure import kafka.zk.ZkData @@ -30,11 +31,12 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.Test import java.security.AccessController -import java.util.Properties +import java.util.{Collections, Properties} import javax.security.auth.Subject import javax.security.auth.callback._ import javax.security.auth.login.AppConfigurationEntry import scala.collection.Seq +import scala.jdk.CollectionConverters._ object SaslPlainSslEndToEndAuthorizationTest { @@ -50,9 +52,9 @@ object SaslPlainSslEndToEndAuthorizationTest { assertTrue(sslPrincipal.endsWith(s"CN=${TestUtils.SslCertificateCn}"), s"Unexpected SSL principal $sslPrincipal") saslContext.server.getAuthorizationID match { - case KafkaPlainAdmin => + case KAFKA_PLAIN_ADMIN => new KafkaPrincipal(KafkaPrincipal.USER_TYPE, controllerPrincipalName) - case KafkaPlainUser => + case KAFKA_PLAIN_USER => new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user") case _ => KafkaPrincipal.ANONYMOUS @@ -61,9 +63,9 @@ object SaslPlainSslEndToEndAuthorizationTest { } object Credentials { - val allUsers = Map(KafkaPlainUser -> "user1-password", - KafkaPlainUser2 -> KafkaPlainPassword2, - KafkaPlainAdmin -> "broker-password") + val allUsers = Map(KAFKA_PLAIN_USER -> "user1-password", + KAFKA_PLAIN_USER_2 -> KAFKA_PLAIN_PASSWORD_2, + KAFKA_PLAIN_ADMIN -> "broker-password") } class TestServerCallbackHandler extends AuthenticateCallbackHandler { @@ -91,7 +93,7 @@ object SaslPlainSslEndToEndAuthorizationTest { callback match { case nameCallback: NameCallback => nameCallback.setName(username) case passwordCallback: PasswordCallback => - if (username == KafkaPlainUser || username == KafkaPlainAdmin) + if (username == KAFKA_PLAIN_USER || username == KAFKA_PLAIN_ADMIN) passwordCallback.setPassword(Credentials.allUsers(username).toCharArray) case _ => throw new UnsupportedCallbackException(callback) } @@ -130,10 +132,10 @@ class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes kafkaClientSaslMechanism: Option[String], mode: SaslSetupMode, kafkaServerEntryName: String): Seq[JaasSection] = { - val brokerLogin = PlainLoginModule(KafkaPlainAdmin, "") // Password provided by callback handler - val clientLogin = PlainLoginModule(KafkaPlainUser2, KafkaPlainPassword2) - Seq(JaasSection(kafkaServerEntryName, Seq(brokerLogin)), - JaasSection(KafkaClientContextName, Seq(clientLogin))) ++ zkSections + val brokerLogin = JaasModule.plainLoginModule(KAFKA_PLAIN_ADMIN, "") // Password provided by callback handler + val clientLogin = JaasModule.plainLoginModule(KAFKA_PLAIN_USER_2, KAFKA_PLAIN_PASSWORD_2) + Seq(new JaasSection(kafkaServerEntryName, Collections.singletonList(brokerLogin)), + new JaasSection(KAFKA_CLIENT_CONTEXT_NAME, Collections.singletonList(clientLogin))) ++ zkSections.asScala } // Generate SSL certificates for clients since we are enabling TLS mutual authentication diff --git a/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala index cfe245fabe0..76bec482f82 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala @@ -12,7 +12,7 @@ */ package kafka.api -import kafka.utils.JaasTestUtils +import kafka.security.JaasTestUtils import org.apache.kafka.common.security.auth.SecurityProtocol import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} @@ -22,7 +22,7 @@ class SaslPlaintextConsumerTest extends BaseConsumerTest with SaslSetup { @BeforeEach override def setUp(testInfo: TestInfo): Unit = { - startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), KafkaSasl, JaasTestUtils.KafkaServerContextName)) + startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) super.setUp(testInfo) } diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala index 1412fb85bff..49ae2bed321 100644 --- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala @@ -16,6 +16,8 @@ */ package kafka.api +import kafka.security.JaasTestUtils + import java.util.Properties import kafka.utils._ import kafka.zk.ConfigEntityChangeNotificationZNode @@ -33,9 +35,9 @@ import org.junit.jupiter.params.provider.ValueSource class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest { override protected def kafkaClientSaslMechanism = "SCRAM-SHA-256" override protected def kafkaServerSaslMechanisms = ScramMechanism.mechanismNames.asScala.toList - override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramUser) - override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramAdmin) - private val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword + override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_SCRAM_USER) + override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_SCRAM_ADMIN) + private val kafkaPassword = JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = { super.configureSecurityBeforeServersStart(testInfo) @@ -54,7 +56,7 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes override def addFormatterSettings(formatter: Formatter): Unit = { formatter.setClusterId("XcZZOzUqS4yHOjhMQB6JLQ") formatter.setScramArguments(List( - s"SCRAM-SHA-256=[name=${JaasTestUtils.KafkaScramAdmin},password=${JaasTestUtils.KafkaScramAdminPassword}]").asJava) + s"SCRAM-SHA-256=[name=${JaasTestUtils.KAFKA_SCRAM_ADMIN},password=${JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD}]").asJava) } override def configureListeners(props: collection.Seq[Properties]): Unit = { @@ -68,8 +70,8 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) // Create client credentials after starting brokers so that dynamic credential creation is also tested - createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword) - createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2) + createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KAFKA_SCRAM_USER, JaasTestUtils.KAFKA_SCRAM_PASSWORD) + createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KAFKA_SCRAM_USER_2, JaasTestUtils.KAFKA_SCRAM_PASSWORD_2) } @ParameterizedTest diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index 5a9bd4b9db2..3cb1bb24f25 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -17,15 +17,11 @@ package kafka.api -import java.io.File -import java.util -import java.util.Properties -import javax.security.auth.login.Configuration -import scala.collection.Seq +import kafka.security.JaasTestUtils +import kafka.security.JaasTestUtils.JaasSection import kafka.security.minikdc.MiniKdc import kafka.server.KafkaConfig -import kafka.utils.JaasTestUtils.{JaasSection, Krb5LoginModule, ZkDigestModule} -import kafka.utils.{JaasTestUtils, TestUtils} +import kafka.utils.TestUtils import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, ScramCredentialInfo, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism} import org.apache.kafka.common.config.SaslConfigs @@ -38,6 +34,13 @@ import org.apache.kafka.common.utils.Time import org.apache.kafka.server.config.ConfigType import org.apache.zookeeper.client.ZKClientConfig +import java.io.File +import java.util +import java.util.Properties +import javax.security.auth.login.Configuration +import scala.collection.Seq +import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters._ import scala.util.Using /* @@ -62,18 +65,17 @@ trait SaslSetup { def startSasl(jaasSections: Seq[JaasSection]): Unit = { // Important if tests leak consumers, producers or brokers LoginManager.closeAll() - val hasKerberos = jaasSections.exists(_.modules.exists { - case _: Krb5LoginModule => true - case _ => false - }) + + val hasKerberos = jaasSections.exists(_.getModules.asScala.exists(_.name().endsWith("Krb5LoginModule"))) + if (hasKerberos) { initializeKerberos() } + writeJaasConfigurationToFile(jaasSections) - val hasZk = jaasSections.exists(_.modules.exists { - case _: ZkDigestModule => true - case _ => false - }) + + val hasZk = jaasSections.exists(_.getModules.asScala.exists(_.name() == "org.apache.zookeeper.server.auth.DigestLoginModule")) + if (hasZk) System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider") } @@ -82,9 +84,9 @@ trait SaslSetup { val (serverKeytabFile, clientKeytabFile) = maybeCreateEmptyKeytabFiles() kdc = new MiniKdc(kdcConf, workDir) kdc.start() - kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost") + kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME + "/localhost") kdc.createPrincipal(clientKeytabFile, - JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2) + JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME, JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2) } /** Return a tuple with the path to the server keytab file and client keytab file */ @@ -99,23 +101,23 @@ trait SaslSetup { def jaasSections(kafkaServerSaslMechanisms: Seq[String], kafkaClientSaslMechanism: Option[String], mode: SaslSetupMode = Both, - kafkaServerEntryName: String = JaasTestUtils.KafkaServerContextName): Seq[JaasSection] = { + kafkaServerEntryName: String = JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME): Seq[JaasSection] = { val hasKerberos = mode != ZkSasl && (kafkaServerSaslMechanisms.contains("GSSAPI") || kafkaClientSaslMechanism.contains("GSSAPI")) if (hasKerberos) maybeCreateEmptyKeytabFiles() mode match { - case ZkSasl => JaasTestUtils.zkSections + case ZkSasl => JaasTestUtils.zkSections.asScala case KafkaSasl => - Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, kafkaServerSaslMechanisms, serverKeytabFile), - JaasTestUtils.kafkaClientSection(kafkaClientSaslMechanism, clientKeytabFile)) - case Both => Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, kafkaServerSaslMechanisms, serverKeytabFile), - JaasTestUtils.kafkaClientSection(kafkaClientSaslMechanism, clientKeytabFile)) ++ JaasTestUtils.zkSections + Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, kafkaServerSaslMechanisms.asJava, serverKeytabFile.toJava), + JaasTestUtils.kafkaClientSection(kafkaClientSaslMechanism.toJava, clientKeytabFile.toJava)) + case Both => Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, kafkaServerSaslMechanisms.asJava, serverKeytabFile.toJava), + JaasTestUtils.kafkaClientSection(kafkaClientSaslMechanism.toJava, clientKeytabFile.toJava)) ++ JaasTestUtils.zkSections.asScala } } private def writeJaasConfigurationToFile(jaasSections: Seq[JaasSection]): Unit = { - val file = JaasTestUtils.writeJaasContextsToFile(jaasSections) + val file = JaasTestUtils.writeJaasContextsToFile(jaasSections.asJava) System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, file.getAbsolutePath) // This will cause a reload of the Configuration singleton when `getConfiguration` is called Configuration.setConfiguration(null) @@ -148,16 +150,16 @@ trait SaslSetup { def jaasClientLoginModule(clientSaslMechanism: String, serviceName: Option[String] = None): String = { if (serviceName.isDefined) - JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile, serviceName.get) + JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile.toJava, serviceName.get) else - JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile) + JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile.toJava) } def jaasAdminLoginModule(clientSaslMechanism: String, serviceName: Option[String] = None): String = { if (serviceName.isDefined) - JaasTestUtils.adminLoginModule(clientSaslMechanism, serverKeytabFile, serviceName.get) + JaasTestUtils.adminLoginModule(clientSaslMechanism, serverKeytabFile.toJava, serviceName.get) else - JaasTestUtils.adminLoginModule(clientSaslMechanism, serverKeytabFile) + JaasTestUtils.adminLoginModule(clientSaslMechanism, serverKeytabFile.toJava) } def jaasScramClientLoginModule(clientSaslScramMechanism: String, scramUser: String, scramPassword: String): String = { diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala index 642bb4b69b1..24947cde26c 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala @@ -12,9 +12,10 @@ */ package kafka.api +import kafka.security.JaasTestUtils import kafka.security.authorizer.AclAuthorizer import kafka.utils.TestUtils._ -import kafka.utils.{JaasTestUtils, TestInfoUtils, TestUtils} +import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin._ import org.apache.kafka.common.Uuid import org.apache.kafka.common.acl._ @@ -47,7 +48,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL) val zkAuthorizerClassName = classOf[AclAuthorizer].getName val kraftAuthorizerClassName = classOf[StandardAuthorizer].getName - val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaServerPrincipalUnqualifiedName) + val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME) var superUserAdmin: Admin = _ override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) @@ -72,7 +73,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu } def setUpSasl(): Unit = { - startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName)) + startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) val loginContext = jaasAdminLoginModule("GSSAPI") superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, loginContext) diff --git a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala index fe276fdd719..0bfbb81cc63 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala @@ -12,7 +12,8 @@ */ package kafka.api -import kafka.utils.{JaasTestUtils, TestUtils} +import kafka.security.JaasTestUtils +import kafka.utils.TestUtils import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} @@ -25,7 +26,7 @@ class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup { @BeforeEach override def setUp(testInfo: TestInfo): Unit = { - startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName)) + startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) super.setUp(testInfo) } diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala index 4e724c33bb7..5a5fb83aab5 100644 --- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala @@ -14,8 +14,9 @@ package kafka.api +import kafka.security.JaasTestUtils import kafka.server.KafkaBroker -import kafka.utils.{JaasTestUtils, TestUtils} +import kafka.utils.TestUtils import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} @@ -30,7 +31,7 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup { @BeforeEach override def setUp(testInfo: TestInfo): Unit = { - startSasl(jaasSections(kafkaServerSaslMechanisms, Some("GSSAPI"), KafkaSasl, JaasTestUtils.KafkaServerContextName)) + startSasl(jaasSections(kafkaServerSaslMechanisms, Some("GSSAPI"), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) super.setUp(testInfo) quotaTestClients.alterClientQuotas( quotaTestClients.clientQuotaAlteration( @@ -53,7 +54,7 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup { val adminClient = createAdminClient() new QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producer, consumer, adminClient) { - override val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2) + override val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2) override def quotaMetricTags(clientId: String): Map[String, String] = { Map("user" -> userPrincipal.getName, "client-id" -> "") diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index acbbd6cfc60..01aef960dd4 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -33,6 +33,7 @@ import kafka.api.{KafkaSasl, SaslSetup} import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager} import kafka.log.UnifiedLog import kafka.network.{DataPlaneAcceptor, Processor, RequestChannel} +import kafka.security.JaasTestUtils import kafka.utils._ import kafka.utils.Implicits._ import kafka.utils.TestUtils.TestControllerRequestCompletionHandler @@ -53,7 +54,7 @@ import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestEx import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.MetadataRequestData import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext, MetricsReporter, Quota} -import org.apache.kafka.common.network.{ListenerName, ConnectionMode} +import org.apache.kafka.common.network.{ConnectionMode, ListenerName} import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS, TRUSTSTORE_PROPS} import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.requests.MetadataRequest @@ -1146,12 +1147,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup @Test def testAddRemoveSaslListeners(): Unit = { - createScramCredentials(adminClients.head, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword) - createScramCredentials(adminClients.head, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword) + createScramCredentials(adminClients.head, JaasTestUtils.KAFKA_SCRAM_USER, JaasTestUtils.KAFKA_SCRAM_PASSWORD) + createScramCredentials(adminClients.head, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD) initializeKerberos() // make sure each server's credential cache has all the created credentials // (check after initializing Kerberos to minimize delays) - List(JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramAdmin).foreach { scramUser => + List(JaasTestUtils.KAFKA_SCRAM_USER, JaasTestUtils.KAFKA_SCRAM_ADMIN).foreach { scramUser => servers.foreach { server => ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach(mechanism => TestUtils.waitUntilTrue(() => server.credentialProvider.credentialCache.cache( @@ -1395,7 +1396,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup private def awaitInitialPositions(consumer: Consumer[_, _]): Unit = { TestUtils.pollUntilTrue(consumer, () => !consumer.assignment.isEmpty, "Timed out while waiting for assignment") - consumer.assignment.forEach(consumer.position(_)) + consumer.assignment.forEach(tp => consumer.position(tp)) } private def clientProps(securityProtocol: SecurityProtocol, saslMechanism: Option[String] = None): Properties = { @@ -1748,7 +1749,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup props.put(prefix + SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka") mechanisms.foreach { mechanism => val jaasSection = jaasSections(Seq(mechanism), None, KafkaSasl, "").head - val jaasConfig = jaasSection.modules.head.toString + val jaasConfig = jaasSection.getModules.get(0).toString props.put(listenerName.saslMechanismConfigPrefix(mechanism) + SaslConfigs.SASL_JAAS_CONFIG, jaasConfig) } } diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala index 39aa3c3639c..de07e044fc0 100644 --- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala +++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala @@ -17,12 +17,14 @@ */ package kafka.server +import kafka.security.JaasTestUtils +import kafka.security.JaasTestUtils.JaasSection + import java.util.Properties - import scala.collection.Seq +import scala.jdk.OptionConverters._ +import scala.jdk.CollectionConverters._ -import kafka.utils.JaasTestUtils -import kafka.utils.JaasTestUtils.JaasSection class MultipleListenersWithAdditionalJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest { @@ -30,15 +32,15 @@ class MultipleListenersWithAdditionalJaasContextTest extends MultipleListenersWi override def staticJaasSections: Seq[JaasSection] = { val (serverKeytabFile, _) = maybeCreateEmptyKeytabFiles() - JaasTestUtils.zkSections :+ - JaasTestUtils.kafkaServerSection("secure_external.KafkaServer", kafkaServerSaslMechanisms(SecureExternal), Some(serverKeytabFile)) + JaasTestUtils.zkSections.asScala :+ + JaasTestUtils.kafkaServerSection("secure_external.KafkaServer", kafkaServerSaslMechanisms(SecureExternal).asJava, Some(serverKeytabFile).toJava) } override protected def dynamicJaasSections: Properties = { val props = new Properties kafkaServerSaslMechanisms(SecureInternal).foreach { mechanism => addDynamicJaasSection(props, SecureInternal, mechanism, - JaasTestUtils.kafkaServerSection("secure_internal.KafkaServer", Seq(mechanism), None)) + JaasTestUtils.kafkaServerSection("secure_internal.KafkaServer", Seq(mechanism).asJava, None.toJava)) } props } diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala index 23746c49557..e78bb73fe6b 100644 --- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala +++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala @@ -18,16 +18,14 @@ package kafka.server import java.util.Properties - import scala.collection.Seq - import kafka.api.Both -import kafka.utils.JaasTestUtils.JaasSection +import kafka.security.JaasTestUtils.JaasSection class MultipleListenersWithDefaultJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest { override def staticJaasSections: Seq[JaasSection] = - jaasSections(kafkaServerSaslMechanisms.values.flatMap(identity).toSeq, Some(kafkaClientSaslMechanism), Both) + jaasSections(kafkaServerSaslMechanisms.values.flatten.toSeq, Some(kafkaClientSaslMechanism), Both) override protected def dynamicJaasSections: Properties = new Properties diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala index ec29d3d4d94..93dcce79826 100644 --- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala +++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala @@ -21,15 +21,16 @@ package kafka.server import java.util.{Collections, Objects, Properties} import java.util.concurrent.TimeUnit import kafka.api.SaslSetup -import kafka.utils.JaasTestUtils.JaasSection -import kafka.utils.{JaasTestUtils, TestUtils} +import kafka.security.JaasTestUtils +import kafka.security.JaasTestUtils.JaasSection +import kafka.utils.TestUtils import kafka.utils.Implicits._ import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.config.{SaslConfigs, SslConfigs} import org.apache.kafka.common.internals.Topic -import org.apache.kafka.common.network.{ListenerName, ConnectionMode} +import org.apache.kafka.common.network.{ConnectionMode, ListenerName} import org.apache.kafka.server.config.{ReplicationConfigs, ZkConfigs} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.network.SocketServerConfigs @@ -113,7 +114,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_DEFAULT, replicationFactor = 2, servers, servers.head.groupCoordinator.groupMetadataTopicConfigs) - createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword) + createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_USER, JaasTestUtils.KAFKA_SCRAM_PASSWORD) servers.head.config.listeners.foreach { endPoint => val listenerName = endPoint.listenerName @@ -176,7 +177,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT protected def addDynamicJaasSection(props: Properties, listener: String, mechanism: String, jaasSection: JaasSection): Unit = { val listenerName = new ListenerName(listener) val prefix = listenerName.saslMechanismConfigPrefix(mechanism) - val jaasConfig = jaasSection.modules.head.toString + val jaasConfig = jaasSection.getModules.get(0).toString props.put(s"${prefix}${SaslConfigs.SASL_JAAS_CONFIG}", jaasConfig) } diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala index 4f5bec4d527..9b2b3df668a 100644 --- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala @@ -18,7 +18,6 @@ package kafka.security.auth import java.nio.charset.StandardCharsets - import kafka.admin.ZkSecurityMigrator import kafka.server.QuorumTestHarness import kafka.utils.{Logging, TestUtils} @@ -33,6 +32,7 @@ import scala.util.{Failure, Success, Try} import javax.security.auth.login.Configuration import kafka.cluster.{Broker, EndPoint} import kafka.controller.ReplicaAssignment +import kafka.security.JaasTestUtils import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.Time @@ -43,7 +43,7 @@ import scala.jdk.CollectionConverters._ import scala.collection.Seq class ZkAuthorizationTest extends QuorumTestHarness with Logging { - val jaasFile = kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections) + val jaasFile = JaasTestUtils.writeJaasContextsToFile(JaasTestUtils.zkSections) val authProvider = "zookeeper.authProvider.1" @BeforeEach diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerWithZkSaslTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerWithZkSaslTest.scala index da79a3c100a..c68f710444f 100644 --- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerWithZkSaslTest.scala +++ b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerWithZkSaslTest.scala @@ -16,22 +16,16 @@ */ package kafka.security.authorizer -import java.net.InetAddress -import java.util -import java.util.UUID -import java.util.concurrent.{Executors, TimeUnit} - -import javax.security.auth.Subject -import javax.security.auth.callback.CallbackHandler import kafka.api.SaslSetup +import kafka.security.{JaasModule, JaasTestUtils} import kafka.server.{KafkaConfig, QuorumTestHarness} -import kafka.utils.JaasTestUtils.{JaasModule, JaasSection} -import kafka.utils.{JaasTestUtils, TestUtils} +import kafka.security.JaasTestUtils.JaasSection +import kafka.utils.TestUtils import kafka.zk.KafkaZkClient import kafka.zookeeper.ZooKeeperClient -import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter} import org.apache.kafka.common.acl.AclOperation.{READ, WRITE} import org.apache.kafka.common.acl.AclPermissionType.ALLOW +import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter} import org.apache.kafka.common.network.{ClientInformation, ListenerName} import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.{RequestContext, RequestHeader} @@ -39,14 +33,19 @@ import org.apache.kafka.common.resource.PatternType.LITERAL import org.apache.kafka.common.resource.ResourcePattern import org.apache.kafka.common.resource.ResourceType.TOPIC import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} -import org.apache.kafka.test.{TestUtils => JTestUtils} import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST +import org.apache.kafka.test.{TestUtils => JTestUtils} import org.apache.zookeeper.server.auth.DigestLoginModule import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} +import java.net.InetAddress +import java.util +import java.util.UUID +import java.util.concurrent.{Executors, TimeUnit} +import javax.security.auth.Subject +import javax.security.auth.callback.CallbackHandler import scala.jdk.CollectionConverters._ -import scala.collection.Seq class AclAuthorizerWithZkSaslTest extends QuorumTestHarness with SaslSetup { @@ -67,9 +66,9 @@ class AclAuthorizerWithZkSaslTest extends QuorumTestHarness with SaslSetup { // Configure ZK SASL with TestableDigestLoginModule for clients to inject failures TestableDigestLoginModule.reset() val jaasSections = JaasTestUtils.zkSections - val serverJaas = jaasSections.filter(_.contextName == "Server") - val clientJaas = jaasSections.filter(_.contextName == "Client") - .map(section => new TestableJaasSection(section.contextName, section.modules)) + val serverJaas = jaasSections.asScala.filter(section => section.getContextName == "Server") + val clientJaas = jaasSections.asScala.filter(section => section.getContextName == "Client") + .map(section => new TestableJaasSection(section.getContextName, section.getModules)) startSasl(serverJaas ++ clientJaas) // Increase maxUpdateRetries to avoid transient failures @@ -180,7 +179,7 @@ class TestableDigestLoginModule extends DigestLoginModule { } } -class TestableJaasSection(contextName: String, modules: Seq[JaasModule]) extends JaasSection(contextName, modules) { +class TestableJaasSection(contextName: String, modules: util.List[JaasModule]) extends JaasSection(contextName, modules) { override def toString: String = { super.toString.replaceFirst(classOf[DigestLoginModule].getName, classOf[TestableDigestLoginModule].getName) } diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala index 6081a0fe3d0..bf134c5a4bc 100644 --- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala @@ -16,19 +16,18 @@ */ package kafka.server -import kafka.api.IntegrationTestHarness -import kafka.api.{KafkaSasl, SaslSetup} -import kafka.utils.{JaasTestUtils, TestUtils} +import kafka.api.{IntegrationTestHarness, KafkaSasl, SaslSetup} +import kafka.security.JaasTestUtils +import kafka.utils.TestUtils import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateDelegationTokenOptions, DescribeDelegationTokenOptions} -import org.apache.kafka.common.errors.InvalidPrincipalTypeException -import org.apache.kafka.common.errors.DelegationTokenNotFoundException +import org.apache.kafka.common.errors.{DelegationTokenNotFoundException, InvalidPrincipalTypeException} import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.SecurityUtils import org.apache.kafka.server.config.DelegationTokenManagerConfigs import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource -import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import java.util import scala.concurrent.ExecutionException @@ -52,7 +51,7 @@ class DelegationTokenRequestsTest extends IntegrationTestHarness with SaslSetup @BeforeEach override def setUp(testInfo: TestInfo): Unit = { - startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName)) + startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) super.setUp(testInfo) } diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala index 37f3e587605..ff662159178 100644 --- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala @@ -17,7 +17,8 @@ package kafka.server import kafka.api.{KafkaSasl, SaslSetup} -import kafka.utils.{JaasTestUtils, TestUtils} +import kafka.security.JaasTestUtils +import kafka.utils.TestUtils import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} import org.apache.kafka.common.errors.DelegationTokenDisabledException import org.apache.kafka.common.security.auth.SecurityProtocol @@ -41,7 +42,7 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest @BeforeEach override def setUp(testInfo: TestInfo): Unit = { - startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName)) + startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) super.setUp(testInfo) } diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala index 6feab4b4303..5f1e1465059 100644 --- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala @@ -16,14 +16,12 @@ */ package kafka.server -import java.net.Socket -import java.util.Collections import kafka.api.{KafkaSasl, SaslSetup} +import kafka.security.JaasTestUtils import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms} import kafka.test.annotation.{ClusterTemplate, Type} import kafka.test.junit.ClusterTestExtensions import kafka.test.{ClusterConfig, ClusterInstance} -import kafka.utils.JaasTestUtils import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.message.SaslHandshakeRequestData @@ -35,6 +33,8 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.{AfterEach, BeforeEach} +import java.net.Socket +import java.util.Collections import scala.jdk.CollectionConverters._ object SaslApiVersionsRequestTest { @@ -75,7 +75,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe @BeforeEach def setupSasl(): Unit = { sasl = new SaslSetup() {} - sasl.startSasl(sasl.jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName)) + sasl.startSasl(sasl.jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) } @ClusterTemplate("saslApiVersionsRequestClusterConfig") diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala deleted file mode 100644 index 4a24a8884c9..00000000000 --- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala +++ /dev/null @@ -1,303 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.utils - -import java.io.{BufferedWriter, File, FileWriter} -import java.util.Properties -import scala.collection.Seq -import org.apache.kafka.clients.admin.ScramMechanism -import org.apache.kafka.common.config.SaslConfigs -import org.apache.kafka.common.utils.Java - -object JaasTestUtils { - - case class Krb5LoginModule(useKeyTab: Boolean, - storeKey: Boolean, - keyTab: String, - principal: String, - debug: Boolean, - serviceName: Option[String], - isIbmSecurity: Boolean) extends JaasModule { - - def name = - if (isIbmSecurity) - "com.ibm.security.auth.module.Krb5LoginModule" - else - "com.sun.security.auth.module.Krb5LoginModule" - - def entries: Map[String, String] = - if (isIbmSecurity) - Map( - "principal" -> principal, - "credsType" -> "both" - ) ++ (if (useKeyTab) Map("useKeytab" -> s"file:$keyTab") else Map.empty) - else - Map( - "useKeyTab" -> useKeyTab.toString, - "storeKey" -> storeKey.toString, - "keyTab" -> keyTab, - "principal" -> principal - ) ++ serviceName.map(s => Map("serviceName" -> s)).getOrElse(Map.empty) - } - - case class PlainLoginModule(username: String, - password: String, - debug: Boolean = false, - validUsers: Map[String, String] = Map.empty) extends JaasModule { - - def name = "org.apache.kafka.common.security.plain.PlainLoginModule" - - def entries: Map[String, String] = Map( - "username" -> username, - "password" -> password - ) ++ validUsers.map { case (user, pass) => s"user_$user" -> pass } - - } - - case class ZkDigestModule(debug: Boolean = false, - entries: Map[String, String] = Map.empty) extends JaasModule { - def name = "org.apache.zookeeper.server.auth.DigestLoginModule" - } - - case class ScramLoginModule(username: String, - password: String, - debug: Boolean = false, - tokenProps: Map[String, String] = Map.empty) extends JaasModule { - - def name = "org.apache.kafka.common.security.scram.ScramLoginModule" - - def entries: Map[String, String] = Map( - "username" -> username, - "password" -> password - ) ++ tokenProps.map { case (name, value) => name -> value } - } - - case class OAuthBearerLoginModule(username: String, - debug: Boolean = false) extends JaasModule { - - def name = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule" - - def entries: Map[String, String] = Map( - "unsecuredLoginStringClaim_sub" -> username - ) - - } - - sealed trait JaasModule { - def name: String - def debug: Boolean - def entries: Map[String, String] - - override def toString: String = { - s"""$name required - | debug=$debug - | ${entries.map { case (k, v) => s"""$k="$v"""" }.mkString("", "\n| ", ";")} - |""".stripMargin - } - } - - case class JaasSection(contextName: String, modules: Seq[JaasModule]) { - override def toString: String = { - s"""|$contextName { - | ${modules.mkString("\n ")} - |}; - |""".stripMargin - } - } - - private val isIbmSecurity = Java.isIbmJdk && !Java.isIbmJdkSemeru - - private val ZkServerContextName = "Server" - private val ZkClientContextName = "Client" - private val ZkUserSuperPasswd = "adminpasswd" - private val ZkUser = "fpj" - private val ZkUserPassword = "fpjsecret" - - val KafkaServerContextName = "KafkaServer" - val KafkaServerPrincipalUnqualifiedName = "kafka" - private val KafkaServerPrincipal = KafkaServerPrincipalUnqualifiedName + "/localhost@EXAMPLE.COM" - val KafkaClientContextName = "KafkaClient" - val KafkaClientPrincipalUnqualifiedName = "client" - private val KafkaClientPrincipal = KafkaClientPrincipalUnqualifiedName + "@EXAMPLE.COM" - val KafkaClientPrincipalUnqualifiedName2 = "client2" - private val KafkaClientPrincipal2 = KafkaClientPrincipalUnqualifiedName2 + "@EXAMPLE.COM" - - val KafkaPlainUser = "plain-user" - private val KafkaPlainPassword = "plain-user-secret" - val KafkaPlainUser2 = "plain-user2" - val KafkaPlainPassword2 = "plain-user2-secret" - val KafkaPlainAdmin = "plain-admin" - private val KafkaPlainAdminPassword = "plain-admin-secret" - - val KafkaScramUser = "scram-user" - val KafkaScramPassword = "scram-user-secret" - val KafkaScramUser2 = "scram-user2" - val KafkaScramPassword2 = "scram-user2-secret" - val KafkaScramAdmin = "scram-admin" - val KafkaScramAdminPassword = "scram-admin-secret" - - val KafkaOAuthBearerUser = "oauthbearer-user" - val KafkaOAuthBearerUser2 = "oauthbearer-user2" - val KafkaOAuthBearerAdmin = "oauthbearer-admin" - - val serviceName = "kafka" - - def saslConfigs(saslProperties: Option[Properties]): Properties = { - val result = saslProperties.getOrElse(new Properties) - // IBM Kerberos module doesn't support the serviceName JAAS property, hence it needs to be - // passed as a Kafka property - if (isIbmSecurity && !result.contains(SaslConfigs.SASL_KERBEROS_SERVICE_NAME)) - result.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, serviceName) - result - } - - def writeJaasContextsToFile(jaasSections: Seq[JaasSection]): File = { - val jaasFile = TestUtils.tempFile() - writeToFile(jaasFile, jaasSections) - jaasFile - } - - // Returns a SASL/SCRAM configuration using credentials for the given user and password - def scramClientLoginModule(mechanism: String, scramUser: String, scramPassword: String): String = { - if (ScramMechanism.fromMechanismName(mechanism) == ScramMechanism.UNKNOWN) { - throw new IllegalArgumentException("Unsupported SCRAM mechanism " + mechanism) - } - ScramLoginModule( - scramUser, - scramPassword - ).toString - } - - // Returns the dynamic configuration, using credentials for user #1 - def clientLoginModule(mechanism: String, keytabLocation: Option[File], serviceName: String = serviceName): String = - kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword, KafkaOAuthBearerUser, serviceName).toString - - // Returns the dynamic configuration, using credentials for admin - def adminLoginModule(mechanism: String, keytabLocation: Option[File], serviceName: String = serviceName): String = - kafkaClientModule(mechanism, keytabLocation, KafkaServerPrincipal, KafkaPlainAdmin, KafkaPlainAdminPassword, - KafkaScramAdmin, KafkaScramAdminPassword, KafkaOAuthBearerAdmin, serviceName).toString - - def tokenClientLoginModule(tokenId: String, password: String): String = { - ScramLoginModule( - tokenId, - password, - debug = false, - Map( - "tokenauth" -> "true" - )).toString - } - - def zkSections: Seq[JaasSection] = Seq( - JaasSection(ZkServerContextName, Seq(ZkDigestModule(debug = false, - Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)))), - JaasSection(ZkClientContextName, Seq(ZkDigestModule(debug = false, - Map("username" -> ZkUser, "password" -> ZkUserPassword)))) - ) - - def kafkaServerSection(contextName: String, mechanisms: Seq[String], keytabLocation: Option[File]): JaasSection = { - val modules = mechanisms.map { - case "GSSAPI" => - Krb5LoginModule( - useKeyTab = true, - storeKey = true, - keyTab = keytabLocation.getOrElse(throw new IllegalArgumentException("Keytab location not specified for GSSAPI")).getAbsolutePath, - principal = KafkaServerPrincipal, - debug = true, - serviceName = Some(serviceName), - isIbmSecurity) - case "PLAIN" => - PlainLoginModule( - KafkaPlainAdmin, - KafkaPlainAdminPassword, - debug = false, - Map( - KafkaPlainAdmin -> KafkaPlainAdminPassword, - KafkaPlainUser -> KafkaPlainPassword, - KafkaPlainUser2 -> KafkaPlainPassword2 - )) - case "OAUTHBEARER" => - OAuthBearerLoginModule(KafkaOAuthBearerAdmin) - case mechanism => { - if (ScramMechanism.fromMechanismName(mechanism) != ScramMechanism.UNKNOWN) { - ScramLoginModule( - KafkaScramAdmin, - KafkaScramAdminPassword) - } else { - throw new IllegalArgumentException("Unsupported server mechanism " + mechanism) - } - } - } - JaasSection(contextName, modules) - } - - // consider refactoring if more mechanisms are added - private def kafkaClientModule(mechanism: String, - keytabLocation: Option[File], clientPrincipal: String, - plainUser: String, plainPassword: String, - scramUser: String, scramPassword: String, - oauthBearerUser: String, serviceName: String = serviceName): JaasModule = { - mechanism match { - case "GSSAPI" => - Krb5LoginModule( - useKeyTab = true, - storeKey = true, - keyTab = keytabLocation.getOrElse(throw new IllegalArgumentException("Keytab location not specified for GSSAPI")).getAbsolutePath, - principal = clientPrincipal, - debug = true, - serviceName = Some(serviceName), - isIbmSecurity - ) - case "PLAIN" => - PlainLoginModule( - plainUser, - plainPassword - ) - case "OAUTHBEARER" => - OAuthBearerLoginModule( - oauthBearerUser - ) - case mechanism => { - if (ScramMechanism.fromMechanismName(mechanism) != ScramMechanism.UNKNOWN) { - ScramLoginModule( - scramUser, - scramPassword - ) - } else { - throw new IllegalArgumentException("Unsupported client mechanism " + mechanism) - } - } - } - } - - /* - * Used for the static JAAS configuration and it uses the credentials for client#2 - */ - def kafkaClientSection(mechanism: Option[String], keytabLocation: Option[File]): JaasSection = { - JaasSection(KafkaClientContextName, mechanism.map(m => - kafkaClientModule(m, keytabLocation, KafkaClientPrincipal2, KafkaPlainUser2, KafkaPlainPassword2, KafkaScramUser2, KafkaScramPassword2, KafkaOAuthBearerUser2)).toSeq) - } - - private def jaasSectionsToString(jaasSections: Seq[JaasSection]): String = - jaasSections.mkString - - private def writeToFile(file: File, jaasSections: Seq[JaasSection]): Unit = { - val writer = new BufferedWriter(new FileWriter(file)) - try writer.write(jaasSectionsToString(jaasSections)) - finally writer.close() - } - -} diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 3046682d16b..37784213ab7 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -21,6 +21,7 @@ import kafka.api._ import kafka.controller.ControllerEventManager import kafka.log._ import kafka.network.RequestChannel +import kafka.security.JaasTestUtils import kafka.server._ import kafka.server.checkpoints.OffsetCheckpointFile import kafka.server.metadata.{ConfigRepository, MockConfigRepository} @@ -42,7 +43,7 @@ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.network.{ClientInformation, ListenerName, ConnectionMode} +import org.apache.kafka.common.network.{ClientInformation, ConnectionMode, ListenerName} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record._ import org.apache.kafka.common.requests._ @@ -88,6 +89,7 @@ import scala.collection.{Map, Seq, mutable} import scala.concurrent.duration.FiniteDuration import scala.concurrent.{Await, ExecutionContext, Future} import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.RichOption import scala.util.{Failure, Success, Try} /** @@ -360,7 +362,7 @@ object TestUtils extends Logging { props ++= sslConfigs(ConnectionMode.SERVER, false, trustStoreFile, s"server$nodeId") if (protocolAndPorts.exists { case (protocol, _) => usesSaslAuthentication(protocol) }) - props ++= JaasTestUtils.saslConfigs(saslProperties) + props ++= JaasTestUtils.saslConfigs(saslProperties.toJava) interBrokerSecurityProtocol.foreach { protocol => props.put(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG, protocol.name) @@ -670,7 +672,7 @@ object TestUtils extends Logging { } if (usesSaslAuthentication(securityProtocol)) - props ++= JaasTestUtils.saslConfigs(saslProperties) + props ++= JaasTestUtils.saslConfigs(saslProperties.toJava) props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name) props } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java index ed13dfffdb1..953609c1c83 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java @@ -18,7 +18,7 @@ package org.apache.kafka.tools.consumer.group; import kafka.api.AbstractSaslTest; import kafka.api.Both$; -import kafka.utils.JaasTestUtils; +import kafka.security.JaasTestUtils; import kafka.zk.ConfigEntityChangeNotificationZNode; import org.apache.kafka.clients.admin.Admin; @@ -91,20 +91,20 @@ public class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { super.configureSecurityBeforeServersStart(testInfo); zkClient().makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path()); // Create broker credentials before starting brokers - createScramCredentials(zkConnect(), JaasTestUtils.KafkaScramAdmin(), JaasTestUtils.KafkaScramAdminPassword()); + createScramCredentials(zkConnect(), JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD); } @Override public Admin createPrivilegedAdminClient() { return createAdminClient(bootstrapServers(listenerName()), securityProtocol(), trustStoreFile(), clientSaslProperties(), - KAFKA_CLIENT_SASL_MECHANISM, JaasTestUtils.KafkaScramAdmin(), JaasTestUtils.KafkaScramAdminPassword()); + KAFKA_CLIENT_SASL_MECHANISM, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD); } @BeforeEach @Override public void setUp(TestInfo testInfo) { startSasl(jaasSections(KAFKA_SERVER_SASL_MECHANISMS, Some$.MODULE$.apply(KAFKA_CLIENT_SASL_MECHANISM), Both$.MODULE$, - JaasTestUtils.KafkaServerContextName())); + JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)); super.setUp(testInfo); createTopic( TOPIC, @@ -136,7 +136,7 @@ public class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { @Test public void testConsumerGroupServiceWithAuthenticationSuccess() throws Exception { - createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser2(), JaasTestUtils.KafkaScramPassword2()); + createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KAFKA_SCRAM_USER_2, JaasTestUtils.KAFKA_SCRAM_PASSWORD_2); ConsumerGroupCommand.ConsumerGroupService consumerGroupService = prepareConsumerGroupService(); try (Consumer consumer = createConsumer()) { consumer.subscribe(Collections.singletonList(TOPIC));