KAFKA-18565 Cleanup SaslSetup (#18586)

Reviewers: Christo Lolov <lolovc@amazon.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TengYao Chi 2025-01-19 03:14:59 +08:00 committed by GitHub
parent a814e21da8
commit 029d9184c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 29 additions and 47 deletions

View File

@ -65,7 +65,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
@BeforeEach @BeforeEach
override def setUp(testInfo: TestInfo): Unit = { override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some("SCRAM-SHA-256"), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) startSasl(jaasSections(kafkaServerSaslMechanisms, Some("SCRAM-SHA-256"), JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
this.serverConfig.setProperty(QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, classOf[GroupedUserQuotaCallback].getName) this.serverConfig.setProperty(QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, classOf[GroupedUserQuotaCallback].getName)
this.serverConfig.setProperty(s"${listenerName.configPrefix}${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG}", this.serverConfig.setProperty(s"${listenerName.configPrefix}${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG}",
classOf[GroupedUserPrincipalBuilder].getName) classOf[GroupedUserPrincipalBuilder].getName)

View File

@ -117,7 +117,7 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
@BeforeEach @BeforeEach
override def setUp(testInfo: TestInfo): Unit = { override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), KafkaSasl)) startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism)))
super.setUp(testInfo) super.setUp(testInfo)
privilegedAdminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) privilegedAdminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
} }

View File

@ -111,7 +111,7 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
@BeforeEach @BeforeEach
override def setUp(testInfo: TestInfo): Unit = { override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
super.setUp(testInfo) super.setUp(testInfo)
TestUtils.waitUntilBrokerMetadataIsPropagated(servers) TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
client = Admin.create(createConfig()) client = Admin.create(createConfig())

View File

@ -65,7 +65,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
} }
this.consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic") this.consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic")
verifyNoRequestMetrics("Request metrics not removed in a previous test") verifyNoRequestMetrics("Request metrics not removed in a previous test")
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName)) startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), kafkaServerJaasEntryName))
super.setUp(testInfo) super.setUp(testInfo)
} }

View File

@ -75,7 +75,7 @@ class PlaintextEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
@BeforeEach @BeforeEach
override def setUp(testInfo: TestInfo): Unit = { override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(List.empty, None, KafkaSasl)) startSasl(jaasSections(List.empty, None))
super.setUp(testInfo) super.setUp(testInfo)
} }

View File

@ -73,7 +73,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
@BeforeEach @BeforeEach
override def setUp(testInfo: TestInfo): Unit = { override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism),
JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
val superuserLoginContext = jaasAdminLoginModule(kafkaClientSaslMechanism) val superuserLoginContext = jaasAdminLoginModule(kafkaClientSaslMechanism)
superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, superuserLoginContext) superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, superuserLoginContext)

View File

@ -39,7 +39,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
@BeforeEach @BeforeEach
override def setUp(testInfo: TestInfo): Unit = { override def setUp(testInfo: TestInfo): Unit = {
// create static config including client login context with credentials for JaasTestUtils 'client2' // create static config including client login context with credentials for JaasTestUtils 'client2'
startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both)) startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism)))
// set dynamic properties with credentials for JaasTestUtils 'client1' so that dynamic JAAS configuration is also // set dynamic properties with credentials for JaasTestUtils 'client1' so that dynamic JAAS configuration is also
// tested by this set of tests // tested by this set of tests
val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism) val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism)

View File

@ -34,7 +34,7 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
@BeforeEach @BeforeEach
override def setUp(testInfo: TestInfo): Unit = { override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism),
JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
super.setUp(testInfo) super.setUp(testInfo)
} }

View File

@ -34,7 +34,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
@BeforeEach @BeforeEach
override def setUp(testInfo: TestInfo): Unit = { override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, kafkaServerJaasEntryName)) startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), kafkaServerJaasEntryName))
super.setUp(testInfo) super.setUp(testInfo)
} }

View File

@ -22,7 +22,7 @@ class SaslPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
@BeforeEach @BeforeEach
override def setUp(testInfo: TestInfo): Unit = { override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
super.setUp(testInfo) super.setUp(testInfo)
} }

View File

@ -37,15 +37,6 @@ import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._ import scala.jdk.OptionConverters._
import scala.jdk.javaapi.OptionConverters import scala.jdk.javaapi.OptionConverters
/*
* Implements an enumeration for the modes enabled here:
* zk only, kafka only, both, custom KafkaServer.
*/
sealed trait SaslSetupMode
case object ZkSasl extends SaslSetupMode
case object KafkaSasl extends SaslSetupMode
case object Both extends SaslSetupMode
/* /*
* Trait used in SaslTestHarness and EndToEndAuthorizationTest to setup keytab and jaas files. * Trait used in SaslTestHarness and EndToEndAuthorizationTest to setup keytab and jaas files.
*/ */
@ -67,11 +58,6 @@ trait SaslSetup {
} }
writeJaasConfigurationToFile(jaasSections) writeJaasConfigurationToFile(jaasSections)
val hasZk = jaasSections.exists(_.getModules.asScala.exists(_.name() == "org.apache.zookeeper.server.auth.DigestLoginModule"))
if (hasZk)
System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
} }
protected def initializeKerberos(): Unit = { protected def initializeKerberos(): Unit = {
@ -93,11 +79,9 @@ trait SaslSetup {
} }
def jaasSections(kafkaServerSaslMechanisms: Seq[String], def jaasSections(kafkaServerSaslMechanisms: Seq[String],
kafkaClientSaslMechanism: Option[String], kafkaClientSaslMechanism: Option[String],
mode: SaslSetupMode = Both, kafkaServerEntryName: String = JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME): Seq[JaasSection] = {
kafkaServerEntryName: String = JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME): Seq[JaasSection] = { val hasKerberos = kafkaServerSaslMechanisms.contains("GSSAPI") || kafkaClientSaslMechanism.contains("GSSAPI")
val hasKerberos = mode != ZkSasl &&
(kafkaServerSaslMechanisms.contains("GSSAPI") || kafkaClientSaslMechanism.contains("GSSAPI"))
if (hasKerberos) if (hasKerberos)
maybeCreateEmptyKeytabFiles() maybeCreateEmptyKeytabFiles()
Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, kafkaServerSaslMechanisms.asJava, serverKeytabFile.toJava), Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, kafkaServerSaslMechanisms.asJava, serverKeytabFile.toJava),
@ -117,7 +101,6 @@ trait SaslSetup {
// Important if tests leak consumers, producers or brokers // Important if tests leak consumers, producers or brokers
LoginManager.closeAll() LoginManager.closeAll()
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
System.clearProperty("zookeeper.authProvider.1")
Configuration.setConfiguration(null) Configuration.setConfiguration(null)
} }

View File

@ -77,7 +77,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
} }
def setUpSasl(): Unit = { def setUpSasl(): Unit = {
startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
val loginContext = jaasAdminLoginModule("GSSAPI") val loginContext = jaasAdminLoginModule("GSSAPI")
superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, loginContext) superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, loginContext)

View File

@ -26,7 +26,7 @@ class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup {
@BeforeEach @BeforeEach
override def setUp(testInfo: TestInfo): Unit = { override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
super.setUp(testInfo) super.setUp(testInfo)
} }

View File

@ -127,7 +127,7 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
SslAdminIntegrationTest.executor = None SslAdminIntegrationTest.executor = None
SslAdminIntegrationTest.lastUpdateRequestContext = None SslAdminIntegrationTest.lastUpdateRequestContext = None
startSasl(jaasSections(List.empty, None, KafkaSasl)) startSasl(jaasSections(List.empty, None))
} }
override def createConfig: util.Map[String, Object] = { override def createConfig: util.Map[String, Object] = {

View File

@ -75,7 +75,7 @@ class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "server") override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "server")
@BeforeEach @BeforeEach
override def setUp(testInfo: TestInfo): Unit = { override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(List.empty, None, KafkaSasl)) startSasl(jaasSections(List.empty, None))
super.setUp(testInfo) super.setUp(testInfo)
} }

View File

@ -31,7 +31,7 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
@BeforeEach @BeforeEach
override def setUp(testInfo: TestInfo): Unit = { override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some("GSSAPI"), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) startSasl(jaasSections(kafkaServerSaslMechanisms, Some("GSSAPI"), JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
super.setUp(testInfo) super.setUp(testInfo)
quotaTestClients.alterClientQuotas( quotaTestClients.alterClientQuotas(
quotaTestClients.clientQuotaAlteration( quotaTestClients.clientQuotaAlteration(

View File

@ -23,7 +23,7 @@ import java.time.Duration
import java.util.{Collections, Properties} import java.util.{Collections, Properties}
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import javax.security.auth.login.LoginContext import javax.security.auth.login.LoginContext
import kafka.api.{Both, IntegrationTestHarness, SaslSetup} import kafka.api.{IntegrationTestHarness, SaslSetup}
import kafka.utils.{TestInfoUtils, TestUtils} import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
@ -64,7 +64,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
@BeforeEach @BeforeEach
override def setUp(testInfo: TestInfo): Unit = { override def setUp(testInfo: TestInfo): Unit = {
TestableKerberosLogin.reset() TestableKerberosLogin.reset()
startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both)) startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism)))
serverConfig.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required") serverConfig.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required")
serverConfig.put(SocketServerConfigs.FAILED_AUTHENTICATION_DELAY_MS_CONFIG, failedAuthenticationDelayMs.toString) serverConfig.put(SocketServerConfigs.FAILED_AUTHENTICATION_DELAY_MS_CONFIG, failedAuthenticationDelayMs.toString)
super.setUp(testInfo) super.setUp(testInfo)

View File

@ -19,13 +19,12 @@ package kafka.server
import java.util.Properties import java.util.Properties
import scala.collection.Seq import scala.collection.Seq
import kafka.api.Both
import kafka.security.JaasTestUtils.JaasSection import kafka.security.JaasTestUtils.JaasSection
class MultipleListenersWithDefaultJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest { class MultipleListenersWithDefaultJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest {
override def staticJaasSections: Seq[JaasSection] = override def staticJaasSections: Seq[JaasSection] =
jaasSections(kafkaServerSaslMechanisms.values.flatten.toSeq, Some(kafkaClientSaslMechanism), Both) jaasSections(kafkaServerSaslMechanisms.values.flatten.toSeq, Some(kafkaClientSaslMechanism))
override protected def dynamicJaasSections: Properties = new Properties override protected def dynamicJaasSections: Properties = new Properties

View File

@ -16,7 +16,7 @@
*/ */
package kafka.server package kafka.server
import kafka.api.{IntegrationTestHarness, KafkaSasl, SaslSetup} import kafka.api.{IntegrationTestHarness, SaslSetup}
import kafka.security.JaasTestUtils import kafka.security.JaasTestUtils
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateDelegationTokenOptions, DescribeDelegationTokenOptions} import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateDelegationTokenOptions, DescribeDelegationTokenOptions}
@ -52,7 +52,7 @@ class DelegationTokenRequestsTest extends IntegrationTestHarness with SaslSetup
@BeforeEach @BeforeEach
override def setUp(testInfo: TestInfo): Unit = { override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
super.setUp(testInfo) super.setUp(testInfo)
} }

View File

@ -16,7 +16,7 @@
*/ */
package kafka.server package kafka.server
import kafka.api.{KafkaSasl, SaslSetup} import kafka.api.SaslSetup
import kafka.security.JaasTestUtils import kafka.security.JaasTestUtils
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.errors.DelegationTokenDisabledException import org.apache.kafka.common.errors.DelegationTokenDisabledException
@ -42,7 +42,7 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest
@BeforeEach @BeforeEach
override def setUp(testInfo: TestInfo): Unit = { override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
super.setUp(testInfo) super.setUp(testInfo)
} }

View File

@ -16,7 +16,7 @@
*/ */
package kafka.server package kafka.server
import kafka.api.{KafkaSasl, SaslSetup} import kafka.api.SaslSetup
import kafka.security.JaasTestUtils import kafka.security.JaasTestUtils
import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms} import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms}
import org.apache.kafka.common.test.api.{ClusterTemplate, Type, ClusterTestExtensions, ClusterConfig, ClusterInstance} import org.apache.kafka.common.test.api.{ClusterTemplate, Type, ClusterTestExtensions, ClusterConfig, ClusterInstance}
@ -73,7 +73,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
@BeforeEach @BeforeEach
def setupSasl(): Unit = { def setupSasl(): Unit = {
sasl = new SaslSetup() {} sasl = new SaslSetup() {}
sasl.startSasl(sasl.jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) sasl.startSasl(sasl.jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
} }
@ClusterTemplate("saslApiVersionsRequestClusterConfig") @ClusterTemplate("saslApiVersionsRequestClusterConfig")

View File

@ -17,7 +17,6 @@
package org.apache.kafka.tools.consumer.group; package org.apache.kafka.tools.consumer.group;
import kafka.api.AbstractSaslTest; import kafka.api.AbstractSaslTest;
import kafka.api.Both$;
import kafka.security.JaasTestUtils; import kafka.security.JaasTestUtils;
import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.Admin;
@ -113,7 +112,8 @@ public class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
@BeforeEach @BeforeEach
@Override @Override
public void setUp(TestInfo testInfo) { public void setUp(TestInfo testInfo) {
startSasl(jaasSections(KAFKA_SERVER_SASL_MECHANISMS, Some$.MODULE$.apply(KAFKA_CLIENT_SASL_MECHANISM), Both$.MODULE$, startSasl(jaasSections(KAFKA_SERVER_SASL_MECHANISMS,
Some$.MODULE$.apply(KAFKA_CLIENT_SASL_MECHANISM),
JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)); JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME));
String superuserLoginContext = jaasAdminLoginModule(KAFKA_CLIENT_SASL_MECHANISM, Option.empty()); String superuserLoginContext = jaasAdminLoginModule(KAFKA_CLIENT_SASL_MECHANISM, Option.empty());
this.superuserClientConfig().put(SaslConfigs.SASL_JAAS_CONFIG, superuserLoginContext); this.superuserClientConfig().put(SaslConfigs.SASL_JAAS_CONFIG, superuserLoginContext);