KAFKA-15751, KAFKA-15752: Enable KRaft for BaseAdminIntegrationTest and SaslSslAdminIntegrationTest (#15175)

Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
Gantigmaa Selenge 2024-06-17 18:38:52 +01:00 committed by GitHub
parent 1a7ba667ad
commit 166d9e8059
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 116 additions and 50 deletions

View File

@ -32,7 +32,9 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
import org.junit.jupiter.api.{AfterEach, BeforeEach , TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
import scala.collection.Seq
@ -68,8 +70,9 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
super.tearDown()
}
@Test
def testCreateDeleteTopics(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testCreateDeleteTopics(quorum: String): Unit = {
client = createAdminClient
val topics = Seq("mytopic", "mytopic2", "mytopic3")
val newTopics = Seq(
@ -160,8 +163,9 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
waitForTopics(client, List(), topics)
}
@Test
def testAuthorizedOperations(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testAuthorizedOperations(quorum: String): Unit = {
client = createAdminClient
// without includeAuthorizedOperations flag

View File

@ -14,7 +14,7 @@ package kafka.api
import kafka.security.authorizer.AclAuthorizer
import kafka.utils.TestUtils._
import kafka.utils.{JaasTestUtils, TestUtils}
import kafka.utils.{JaasTestUtils, TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.acl._
@ -28,9 +28,12 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST, WILDCARD_PRINCIPAL_STRING}
import org.apache.kafka.server.config.{ServerConfigs, ZkConfigs}
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.storage.internals.log.LogConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.util
import scala.collection.Seq
@ -42,19 +45,26 @@ import scala.util.{Failure, Success, Try}
@Timeout(120)
class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup {
val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
val aclAuthorizerClassName: String = classOf[AclAuthorizer].getName
def kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaServerPrincipalUnqualifiedName)
val zkAuthorizerClassName = classOf[AclAuthorizer].getName
val kraftAuthorizerClassName = classOf[StandardAuthorizer].getName
val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaServerPrincipalUnqualifiedName)
var superUserAdmin: Admin = _
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, aclAuthorizerClassName)
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString)
if (TestInfoUtils.isKRaft(testInfo)) {
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, kraftAuthorizerClassName)
this.controllerConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, kraftAuthorizerClassName)
// controllers talk to brokers as User:ANONYMOUS therefore it needs to be super user
this.serverConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString + ";" + KafkaPrincipal.ANONYMOUS.toString)
this.controllerConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString)
} else {
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, zkAuthorizerClassName)
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString)
}
setUpSasl()
super.setUp(testInfo)
@ -116,8 +126,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
val groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL),
new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW))
@Test
def testAclOperations(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testAclOperations(quorum: String): Unit = {
client = createAdminClient
val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
@ -137,8 +148,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
assertEquals(Set(acl3), results3.get(acl3.toFilter).get.values.asScala.map(_.binding).toSet)
}
@Test
def testAclOperations2(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testAclOperations2(quorum: String): Unit = {
client = createAdminClient
val results = client.createAcls(List(acl2, acl2, transactionalIdAcl).asJava)
assertEquals(Set(acl2, acl2, transactionalIdAcl), results.values.keySet.asScala)
@ -163,8 +175,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
waitForDescribeAcls(client, filterC, Set())
}
@Test
def testAclDescribe(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testAclDescribe(quorum: String): Unit = {
client = createAdminClient
ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl))
@ -190,8 +203,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
assertEquals(Set(anyAcl, acl2, fooAcl, prefixAcl), getAcls(allTopicAcls))
}
@Test
def testAclDelete(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testAclDelete(quorum: String): Unit = {
client = createAdminClient
ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl))
@ -201,47 +215,65 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
// Delete only ACLs on literal 'mytopic2' topic
var deleted = client.deleteAcls(List(acl2.toFilter).asJava).all().get().asScala.toSet
assertEquals(Set(acl2), deleted)
brokers.foreach { b =>
TestUtils.waitAndVerifyRemovedAcl(acl2.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl2.pattern())
}
assertEquals(Set(anyAcl, fooAcl, prefixAcl), getAcls(allTopicAcls))
ensureAcls(deleted)
// Delete only ACLs on literal '*' topic
deleted = client.deleteAcls(List(anyAcl.toFilter).asJava).all().get().asScala.toSet
assertEquals(Set(anyAcl), deleted)
brokers.foreach { b =>
TestUtils.waitAndVerifyRemovedAcl(anyAcl.entry(), b.dataPlaneRequestProcessor.authorizer.get, anyAcl.pattern())
}
assertEquals(Set(acl2, fooAcl, prefixAcl), getAcls(allTopicAcls))
ensureAcls(deleted)
// Delete only ACLs on specific prefixed 'mytopic' topics:
deleted = client.deleteAcls(List(prefixAcl.toFilter).asJava).all().get().asScala.toSet
assertEquals(Set(prefixAcl), deleted)
brokers.foreach { b =>
TestUtils.waitAndVerifyRemovedAcl(prefixAcl.entry(), b.dataPlaneRequestProcessor.authorizer.get, prefixAcl.pattern())
}
assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(allTopicAcls))
ensureAcls(deleted)
// Delete all literal ACLs:
deleted = client.deleteAcls(List(allLiteralTopicAcls).asJava).all().get().asScala.toSet
assertEquals(Set(anyAcl, acl2, fooAcl), deleted)
brokers.foreach { b =>
Set(anyAcl, acl2, fooAcl).foreach(acl =>
TestUtils.waitAndVerifyRemovedAcl(acl.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl.pattern())
)
}
assertEquals(Set(prefixAcl), getAcls(allTopicAcls))
ensureAcls(deleted)
// Delete all prefixed ACLs:
deleted = client.deleteAcls(List(allPrefixedTopicAcls).asJava).all().get().asScala.toSet
assertEquals(Set(prefixAcl), deleted)
brokers.foreach { b =>
TestUtils.waitAndVerifyRemovedAcl(prefixAcl.entry(), b.dataPlaneRequestProcessor.authorizer.get, prefixAcl.pattern())
}
assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(allTopicAcls))
ensureAcls(deleted)
// Delete all topic ACLs:
deleted = client.deleteAcls(List(allTopicAcls).asJava).all().get().asScala.toSet
brokers.foreach { b =>
Set(anyAcl, acl2, fooAcl, prefixAcl).foreach(acl =>
TestUtils.waitAndVerifyRemovedAcl(acl.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl.pattern())
)
}
assertEquals(Set(), getAcls(allTopicAcls))
}
//noinspection ScalaDeprecation - test explicitly covers clients using legacy / deprecated constructors
@Test
def testLegacyAclOpsNeverAffectOrReturnPrefixed(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testLegacyAclOpsNeverAffectOrReturnPrefixed(quorum: String): Unit = {
client = createAdminClient
ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) // <-- prefixed exists, but should never be returned.
@ -258,27 +290,36 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
// Delete only (legacy) ACLs on 'mytopic2' topic
var deleted = client.deleteAcls(List(legacyMyTopic2Acls).asJava).all().get().asScala.toSet
assertEquals(Set(acl2), deleted)
brokers.foreach { b =>
TestUtils.waitAndVerifyRemovedAcl(acl2.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl2.pattern())
}
assertEquals(Set(anyAcl, fooAcl, prefixAcl), getAcls(allTopicAcls))
ensureAcls(deleted)
// Delete only (legacy) ACLs on '*' topic
deleted = client.deleteAcls(List(legacyAnyTopicAcls).asJava).all().get().asScala.toSet
assertEquals(Set(anyAcl), deleted)
brokers.foreach { b =>
TestUtils.waitAndVerifyRemovedAcl(anyAcl.entry(), b.dataPlaneRequestProcessor.authorizer.get, anyAcl.pattern())
}
assertEquals(Set(acl2, fooAcl, prefixAcl), getAcls(allTopicAcls))
ensureAcls(deleted)
// Delete all (legacy) topic ACLs:
deleted = client.deleteAcls(List(legacyAllTopicAcls).asJava).all().get().asScala.toSet
assertEquals(Set(anyAcl, acl2, fooAcl), deleted)
brokers.foreach { b =>
Set(anyAcl, acl2, fooAcl).foreach(acl =>
TestUtils.waitAndVerifyRemovedAcl(acl.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl.pattern())
)
}
assertEquals(Set(), getAcls(legacyAllTopicAcls))
assertEquals(Set(prefixAcl), getAcls(allTopicAcls))
}
@Test
def testAttemptToCreateInvalidAcls(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testAttemptToCreateInvalidAcls(quorum: String): Unit = {
client = createAdminClient
val clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "foobar", PatternType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
@ -367,8 +408,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}, "timed out waiting for describeAcls to " + (if (expectAuth) "succeed" else "fail"))
}
@Test
def testAclAuthorizationDenied(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testAclAuthorizationDenied(quorum: String): Unit = {
client = createAdminClient
// Test that we cannot create or delete ACLs when ALTER is denied.
@ -416,8 +458,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
}
@Test
def testCreateTopicsResponseMetadataAndConfig(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testCreateTopicsResponseMetadataAndConfig(quorum: String): Unit = {
val topic1 = "mytopic1"
val topic2 = "mytopic2"
val denyAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic2, PatternType.LITERAL),

View File

@ -42,6 +42,7 @@ object SslAdminIntegrationTest {
@volatile var lastUpdateRequestContext: Option[AuthorizableRequestContext] = None
val superuserCn = "super-user"
val serverUser = "server"
val clientCn = "client"
class TestableAclAuthorizer extends AclAuthorizer {
override def createAcls(requestContext: AuthorizableRequestContext,
@ -86,27 +87,34 @@ object SslAdminIntegrationTest {
class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
private val Pattern = "O=A (.*?),CN=(.*?)".r
// Use fields from DN as server principal to grant authorisation for servers and super admin client
// Use fields from DN as principal to grant appropriate permissions
override def build(context: AuthenticationContext): KafkaPrincipal = {
val peerPrincipal = context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName
peerPrincipal match {
case Pattern(name, cn) =>
val principal = if ((name == "server") || (cn == superuserCn)) "server" else KafkaPrincipal.ANONYMOUS.getName
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal)
case _ =>
KafkaPrincipal.ANONYMOUS
if (context.securityProtocol().equals(SecurityProtocol.PLAINTEXT)) {
KafkaPrincipal.ANONYMOUS
} else {
val peerPrincipal = context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName
peerPrincipal match {
case Pattern(name, cn) =>
val principal =
if ((name == serverUser) || (cn == superuserCn)) serverUser
else if (cn == clientCn) clientCn
else KafkaPrincipal.ANONYMOUS.getName
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal)
case _ =>
KafkaPrincipal.ANONYMOUS
}
}
}
}
}
class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
override val aclAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName
override val zkAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName
this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required")
this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[SslAdminIntegrationTest.TestPrincipalBuilder].getName)
override protected def securityProtocol = SecurityProtocol.SSL
override def kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SslAdminIntegrationTest.serverUser)
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SslAdminIntegrationTest.serverUser)
override def setUpSasl(): Unit = {
SslAdminIntegrationTest.semaphore = None
@ -213,11 +221,12 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
private def verifyAclUpdates(): Unit = {
val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SslAdminIntegrationTest.clientCn)
def validateRequestContext(context: AuthorizableRequestContext, apiKey: ApiKeys): Unit = {
assertEquals(SecurityProtocol.SSL, context.securityProtocol)
assertEquals("SSL", context.listenerName)
assertEquals(KafkaPrincipal.ANONYMOUS, context.principal)
assertEquals(clientPrincipal, context.principal)
assertEquals(apiKey.id.toInt, context.requestType)
assertEquals(apiKey.latestVersion.toInt, context.requestVersion)
assertTrue(context.correlationId > 0, s"Invalid correlation id: ${context.correlationId}")
@ -280,4 +289,12 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
props
}
// Override the CN to create a principal based on it
override def clientSecurityProps(certAlias: String): Properties = {
val props = TestUtils.securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, certAlias, SslAdminIntegrationTest.clientCn,
clientSaslProperties)
props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
props
}
}

View File

@ -38,7 +38,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataVersion}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
@ -366,6 +366,8 @@ abstract class QuorumTestHarness extends Logging {
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"CONTROLLER://localhost:0")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:0")
// Setting the configuration to the same value set on the brokers via TestUtils to keep KRaft based and Zk based controller configs are consistent.
props.setProperty(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000")
val config = new KafkaConfig(props)
val controllerQuorumVotersFuture = new CompletableFuture[util.Map[Integer, InetSocketAddress]]
val metaPropertiesEnsemble = new MetaPropertiesEnsemble.Loader().