diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index c6dd0150e25..1a516336745 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -32,7 +32,9 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, ServerLogConfigs} import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout} +import org.junit.jupiter.api.{AfterEach, BeforeEach , TestInfo, Timeout} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.jdk.CollectionConverters._ import scala.collection.Seq @@ -68,8 +70,9 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg super.tearDown() } - @Test - def testCreateDeleteTopics(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testCreateDeleteTopics(quorum: String): Unit = { client = createAdminClient val topics = Seq("mytopic", "mytopic2", "mytopic3") val newTopics = Seq( @@ -160,8 +163,9 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg waitForTopics(client, List(), topics) } - @Test - def testAuthorizedOperations(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testAuthorizedOperations(quorum: String): Unit = { client = createAdminClient // without includeAuthorizedOperations flag diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala index 1fc4c3e152a..642bb4b69b1 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala @@ -14,7 +14,7 @@ package kafka.api import kafka.security.authorizer.AclAuthorizer import kafka.utils.TestUtils._ -import kafka.utils.{JaasTestUtils, TestUtils} +import kafka.utils.{JaasTestUtils, TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin._ import org.apache.kafka.common.Uuid import org.apache.kafka.common.acl._ @@ -28,9 +28,12 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST, WILDCARD_PRINCIPAL_STRING} import org.apache.kafka.server.config.{ServerConfigs, ZkConfigs} +import org.apache.kafka.metadata.authorizer.StandardAuthorizer import org.apache.kafka.storage.internals.log.LogConfig import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout} +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import java.util import scala.collection.Seq @@ -42,19 +45,26 @@ import scala.util.{Failure, Success, Try} @Timeout(120) class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup { val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL) - - val aclAuthorizerClassName: String = classOf[AclAuthorizer].getName - def kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaServerPrincipalUnqualifiedName) - + val zkAuthorizerClassName = classOf[AclAuthorizer].getName + val kraftAuthorizerClassName = classOf[StandardAuthorizer].getName + val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaServerPrincipalUnqualifiedName) var superUserAdmin: Admin = _ override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) @BeforeEach override def setUp(testInfo: TestInfo): Unit = { - this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, aclAuthorizerClassName) - this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true") - this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString) + if (TestInfoUtils.isKRaft(testInfo)) { + this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, kraftAuthorizerClassName) + this.controllerConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, kraftAuthorizerClassName) + // controllers talk to brokers as User:ANONYMOUS therefore it needs to be super user + this.serverConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString + ";" + KafkaPrincipal.ANONYMOUS.toString) + this.controllerConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString) + } else { + this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, zkAuthorizerClassName) + this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true") + this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString) + } setUpSasl() super.setUp(testInfo) @@ -116,8 +126,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu val groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL), new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW)) - @Test - def testAclOperations(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testAclOperations(quorum: String): Unit = { client = createAdminClient val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) @@ -137,8 +148,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu assertEquals(Set(acl3), results3.get(acl3.toFilter).get.values.asScala.map(_.binding).toSet) } - @Test - def testAclOperations2(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testAclOperations2(quorum: String): Unit = { client = createAdminClient val results = client.createAcls(List(acl2, acl2, transactionalIdAcl).asJava) assertEquals(Set(acl2, acl2, transactionalIdAcl), results.values.keySet.asScala) @@ -163,8 +175,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu waitForDescribeAcls(client, filterC, Set()) } - @Test - def testAclDescribe(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testAclDescribe(quorum: String): Unit = { client = createAdminClient ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) @@ -190,8 +203,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu assertEquals(Set(anyAcl, acl2, fooAcl, prefixAcl), getAcls(allTopicAcls)) } - @Test - def testAclDelete(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testAclDelete(quorum: String): Unit = { client = createAdminClient ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) @@ -201,47 +215,65 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu // Delete only ACLs on literal 'mytopic2' topic var deleted = client.deleteAcls(List(acl2.toFilter).asJava).all().get().asScala.toSet - assertEquals(Set(acl2), deleted) + brokers.foreach { b => + TestUtils.waitAndVerifyRemovedAcl(acl2.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl2.pattern()) + } assertEquals(Set(anyAcl, fooAcl, prefixAcl), getAcls(allTopicAcls)) ensureAcls(deleted) // Delete only ACLs on literal '*' topic deleted = client.deleteAcls(List(anyAcl.toFilter).asJava).all().get().asScala.toSet - assertEquals(Set(anyAcl), deleted) + brokers.foreach { b => + TestUtils.waitAndVerifyRemovedAcl(anyAcl.entry(), b.dataPlaneRequestProcessor.authorizer.get, anyAcl.pattern()) + } assertEquals(Set(acl2, fooAcl, prefixAcl), getAcls(allTopicAcls)) ensureAcls(deleted) // Delete only ACLs on specific prefixed 'mytopic' topics: deleted = client.deleteAcls(List(prefixAcl.toFilter).asJava).all().get().asScala.toSet - assertEquals(Set(prefixAcl), deleted) + brokers.foreach { b => + TestUtils.waitAndVerifyRemovedAcl(prefixAcl.entry(), b.dataPlaneRequestProcessor.authorizer.get, prefixAcl.pattern()) + } assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(allTopicAcls)) ensureAcls(deleted) // Delete all literal ACLs: deleted = client.deleteAcls(List(allLiteralTopicAcls).asJava).all().get().asScala.toSet - assertEquals(Set(anyAcl, acl2, fooAcl), deleted) + brokers.foreach { b => + Set(anyAcl, acl2, fooAcl).foreach(acl => + TestUtils.waitAndVerifyRemovedAcl(acl.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl.pattern()) + ) + } assertEquals(Set(prefixAcl), getAcls(allTopicAcls)) ensureAcls(deleted) // Delete all prefixed ACLs: deleted = client.deleteAcls(List(allPrefixedTopicAcls).asJava).all().get().asScala.toSet - assertEquals(Set(prefixAcl), deleted) + brokers.foreach { b => + TestUtils.waitAndVerifyRemovedAcl(prefixAcl.entry(), b.dataPlaneRequestProcessor.authorizer.get, prefixAcl.pattern()) + } assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(allTopicAcls)) ensureAcls(deleted) // Delete all topic ACLs: deleted = client.deleteAcls(List(allTopicAcls).asJava).all().get().asScala.toSet + brokers.foreach { b => + Set(anyAcl, acl2, fooAcl, prefixAcl).foreach(acl => + TestUtils.waitAndVerifyRemovedAcl(acl.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl.pattern()) + ) + } assertEquals(Set(), getAcls(allTopicAcls)) } //noinspection ScalaDeprecation - test explicitly covers clients using legacy / deprecated constructors - @Test - def testLegacyAclOpsNeverAffectOrReturnPrefixed(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testLegacyAclOpsNeverAffectOrReturnPrefixed(quorum: String): Unit = { client = createAdminClient ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) // <-- prefixed exists, but should never be returned. @@ -258,27 +290,36 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu // Delete only (legacy) ACLs on 'mytopic2' topic var deleted = client.deleteAcls(List(legacyMyTopic2Acls).asJava).all().get().asScala.toSet - assertEquals(Set(acl2), deleted) + brokers.foreach { b => + TestUtils.waitAndVerifyRemovedAcl(acl2.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl2.pattern()) + } assertEquals(Set(anyAcl, fooAcl, prefixAcl), getAcls(allTopicAcls)) ensureAcls(deleted) // Delete only (legacy) ACLs on '*' topic deleted = client.deleteAcls(List(legacyAnyTopicAcls).asJava).all().get().asScala.toSet - assertEquals(Set(anyAcl), deleted) + brokers.foreach { b => + TestUtils.waitAndVerifyRemovedAcl(anyAcl.entry(), b.dataPlaneRequestProcessor.authorizer.get, anyAcl.pattern()) + } assertEquals(Set(acl2, fooAcl, prefixAcl), getAcls(allTopicAcls)) ensureAcls(deleted) // Delete all (legacy) topic ACLs: deleted = client.deleteAcls(List(legacyAllTopicAcls).asJava).all().get().asScala.toSet - assertEquals(Set(anyAcl, acl2, fooAcl), deleted) + brokers.foreach { b => + Set(anyAcl, acl2, fooAcl).foreach(acl => + TestUtils.waitAndVerifyRemovedAcl(acl.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl.pattern()) + ) + } assertEquals(Set(), getAcls(legacyAllTopicAcls)) assertEquals(Set(prefixAcl), getAcls(allTopicAcls)) } - @Test - def testAttemptToCreateInvalidAcls(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testAttemptToCreateInvalidAcls(quorum: String): Unit = { client = createAdminClient val clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "foobar", PatternType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) @@ -367,8 +408,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu }, "timed out waiting for describeAcls to " + (if (expectAuth) "succeed" else "fail")) } - @Test - def testAclAuthorizationDenied(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testAclAuthorizationDenied(quorum: String): Unit = { client = createAdminClient // Test that we cannot create or delete ACLs when ALTER is denied. @@ -416,8 +458,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu } } - @Test - def testCreateTopicsResponseMetadataAndConfig(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testCreateTopicsResponseMetadataAndConfig(quorum: String): Unit = { val topic1 = "mytopic1" val topic2 = "mytopic2" val denyAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic2, PatternType.LITERAL), diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala index 65906032ca4..58c131556d7 100644 --- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala @@ -42,6 +42,7 @@ object SslAdminIntegrationTest { @volatile var lastUpdateRequestContext: Option[AuthorizableRequestContext] = None val superuserCn = "super-user" val serverUser = "server" + val clientCn = "client" class TestableAclAuthorizer extends AclAuthorizer { override def createAcls(requestContext: AuthorizableRequestContext, @@ -86,27 +87,34 @@ object SslAdminIntegrationTest { class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { private val Pattern = "O=A (.*?),CN=(.*?)".r - // Use fields from DN as server principal to grant authorisation for servers and super admin client + // Use fields from DN as principal to grant appropriate permissions override def build(context: AuthenticationContext): KafkaPrincipal = { - val peerPrincipal = context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName - peerPrincipal match { - case Pattern(name, cn) => - val principal = if ((name == "server") || (cn == superuserCn)) "server" else KafkaPrincipal.ANONYMOUS.getName - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal) - case _ => - KafkaPrincipal.ANONYMOUS + if (context.securityProtocol().equals(SecurityProtocol.PLAINTEXT)) { + KafkaPrincipal.ANONYMOUS + } else { + val peerPrincipal = context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName + peerPrincipal match { + case Pattern(name, cn) => + val principal = + if ((name == serverUser) || (cn == superuserCn)) serverUser + else if (cn == clientCn) clientCn + else KafkaPrincipal.ANONYMOUS.getName + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal) + case _ => + KafkaPrincipal.ANONYMOUS + } } } } } class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { - override val aclAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName + override val zkAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required") this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[SslAdminIntegrationTest.TestPrincipalBuilder].getName) override protected def securityProtocol = SecurityProtocol.SSL - override def kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SslAdminIntegrationTest.serverUser) + override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SslAdminIntegrationTest.serverUser) override def setUpSasl(): Unit = { SslAdminIntegrationTest.semaphore = None @@ -213,11 +221,12 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { private def verifyAclUpdates(): Unit = { val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) + val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SslAdminIntegrationTest.clientCn) def validateRequestContext(context: AuthorizableRequestContext, apiKey: ApiKeys): Unit = { assertEquals(SecurityProtocol.SSL, context.securityProtocol) assertEquals("SSL", context.listenerName) - assertEquals(KafkaPrincipal.ANONYMOUS, context.principal) + assertEquals(clientPrincipal, context.principal) assertEquals(apiKey.id.toInt, context.requestType) assertEquals(apiKey.latestVersion.toInt, context.requestVersion) assertTrue(context.correlationId > 0, s"Invalid correlation id: ${context.correlationId}") @@ -280,4 +289,12 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG) props } + + // Override the CN to create a principal based on it + override def clientSecurityProps(certAlias: String): Properties = { + val props = TestUtils.securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, certAlias, SslAdminIntegrationTest.clientCn, + clientSaslProperties) + props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG) + props + } } diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index 82b5b4cfd1e..ac6a8bcbd41 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -38,7 +38,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataVersion} -import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs} +import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler} import org.apache.zookeeper.client.ZKClientConfig import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper} @@ -366,6 +366,8 @@ abstract class QuorumTestHarness extends Logging { props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"CONTROLLER://localhost:0") props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:0") + // Setting the configuration to the same value set on the brokers via TestUtils to keep KRaft based and Zk based controller configs are consistent. + props.setProperty(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000") val config = new KafkaConfig(props) val controllerQuorumVotersFuture = new CompletableFuture[util.Map[Integer, InetSocketAddress]] val metaPropertiesEnsemble = new MetaPropertiesEnsemble.Loader().