mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-15751, KAFKA-15752: Enable KRaft for BaseAdminIntegrationTest and SaslSslAdminIntegrationTest (#15175)
Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
		
							parent
							
								
									1a7ba667ad
								
							
						
					
					
						commit
						166d9e8059
					
				|  | @ -32,7 +32,9 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig | ||||||
| import org.apache.kafka.security.authorizer.AclEntry | import org.apache.kafka.security.authorizer.AclEntry | ||||||
| import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, ServerLogConfigs} | import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, ServerLogConfigs} | ||||||
| import org.junit.jupiter.api.Assertions._ | import org.junit.jupiter.api.Assertions._ | ||||||
| import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout} | import org.junit.jupiter.api.{AfterEach, BeforeEach , TestInfo, Timeout} | ||||||
|  | import org.junit.jupiter.params.ParameterizedTest | ||||||
|  | import org.junit.jupiter.params.provider.ValueSource | ||||||
| 
 | 
 | ||||||
| import scala.jdk.CollectionConverters._ | import scala.jdk.CollectionConverters._ | ||||||
| import scala.collection.Seq | import scala.collection.Seq | ||||||
|  | @ -68,8 +70,9 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg | ||||||
|     super.tearDown() |     super.tearDown() | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   @Test |   @ParameterizedTest | ||||||
|   def testCreateDeleteTopics(): Unit = { |   @ValueSource(strings = Array("zk", "kraft")) | ||||||
|  |   def testCreateDeleteTopics(quorum: String): Unit = { | ||||||
|     client = createAdminClient |     client = createAdminClient | ||||||
|     val topics = Seq("mytopic", "mytopic2", "mytopic3") |     val topics = Seq("mytopic", "mytopic2", "mytopic3") | ||||||
|     val newTopics = Seq( |     val newTopics = Seq( | ||||||
|  | @ -160,8 +163,9 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg | ||||||
|     waitForTopics(client, List(), topics) |     waitForTopics(client, List(), topics) | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   @Test |   @ParameterizedTest | ||||||
|   def testAuthorizedOperations(): Unit = { |   @ValueSource(strings = Array("zk", "kraft")) | ||||||
|  |   def testAuthorizedOperations(quorum: String): Unit = { | ||||||
|     client = createAdminClient |     client = createAdminClient | ||||||
| 
 | 
 | ||||||
|     // without includeAuthorizedOperations flag |     // without includeAuthorizedOperations flag | ||||||
|  |  | ||||||
|  | @ -14,7 +14,7 @@ package kafka.api | ||||||
| 
 | 
 | ||||||
| import kafka.security.authorizer.AclAuthorizer | import kafka.security.authorizer.AclAuthorizer | ||||||
| import kafka.utils.TestUtils._ | import kafka.utils.TestUtils._ | ||||||
| import kafka.utils.{JaasTestUtils, TestUtils} | import kafka.utils.{JaasTestUtils, TestInfoUtils, TestUtils} | ||||||
| import org.apache.kafka.clients.admin._ | import org.apache.kafka.clients.admin._ | ||||||
| import org.apache.kafka.common.Uuid | import org.apache.kafka.common.Uuid | ||||||
| import org.apache.kafka.common.acl._ | import org.apache.kafka.common.acl._ | ||||||
|  | @ -28,9 +28,12 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, | ||||||
| import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} | import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} | ||||||
| import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST, WILDCARD_PRINCIPAL_STRING} | import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST, WILDCARD_PRINCIPAL_STRING} | ||||||
| import org.apache.kafka.server.config.{ServerConfigs, ZkConfigs} | import org.apache.kafka.server.config.{ServerConfigs, ZkConfigs} | ||||||
|  | import org.apache.kafka.metadata.authorizer.StandardAuthorizer | ||||||
| import org.apache.kafka.storage.internals.log.LogConfig | import org.apache.kafka.storage.internals.log.LogConfig | ||||||
| import org.junit.jupiter.api.Assertions._ | import org.junit.jupiter.api.Assertions._ | ||||||
| import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout} | import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} | ||||||
|  | import org.junit.jupiter.params.ParameterizedTest | ||||||
|  | import org.junit.jupiter.params.provider.ValueSource | ||||||
| 
 | 
 | ||||||
| import java.util | import java.util | ||||||
| import scala.collection.Seq | import scala.collection.Seq | ||||||
|  | @ -42,19 +45,26 @@ import scala.util.{Failure, Success, Try} | ||||||
| @Timeout(120) | @Timeout(120) | ||||||
| class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup { | class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup { | ||||||
|   val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL) |   val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL) | ||||||
| 
 |   val zkAuthorizerClassName = classOf[AclAuthorizer].getName | ||||||
|   val aclAuthorizerClassName: String = classOf[AclAuthorizer].getName |   val kraftAuthorizerClassName = classOf[StandardAuthorizer].getName | ||||||
|   def kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaServerPrincipalUnqualifiedName) |   val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaServerPrincipalUnqualifiedName) | ||||||
| 
 |  | ||||||
|   var superUserAdmin: Admin = _ |   var superUserAdmin: Admin = _ | ||||||
|   override protected def securityProtocol = SecurityProtocol.SASL_SSL |   override protected def securityProtocol = SecurityProtocol.SASL_SSL | ||||||
|   override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) |   override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) | ||||||
| 
 | 
 | ||||||
|   @BeforeEach |   @BeforeEach | ||||||
|   override def setUp(testInfo: TestInfo): Unit = { |   override def setUp(testInfo: TestInfo): Unit = { | ||||||
|     this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, aclAuthorizerClassName) |     if (TestInfoUtils.isKRaft(testInfo)) { | ||||||
|     this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true") |       this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, kraftAuthorizerClassName) | ||||||
|     this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString) |       this.controllerConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, kraftAuthorizerClassName) | ||||||
|  |       // controllers talk to brokers as User:ANONYMOUS therefore it needs to be super user | ||||||
|  |       this.serverConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString + ";" + KafkaPrincipal.ANONYMOUS.toString) | ||||||
|  |       this.controllerConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString) | ||||||
|  |     } else { | ||||||
|  |       this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, zkAuthorizerClassName) | ||||||
|  |       this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true") | ||||||
|  |       this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString) | ||||||
|  |     } | ||||||
| 
 | 
 | ||||||
|     setUpSasl() |     setUpSasl() | ||||||
|     super.setUp(testInfo) |     super.setUp(testInfo) | ||||||
|  | @ -116,8 +126,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu | ||||||
|   val groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL), |   val groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL), | ||||||
|     new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW)) |     new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW)) | ||||||
| 
 | 
 | ||||||
|   @Test |   @ParameterizedTest | ||||||
|   def testAclOperations(): Unit = { |   @ValueSource(strings = Array("zk", "kraft")) | ||||||
|  |   def testAclOperations(quorum: String): Unit = { | ||||||
|     client = createAdminClient |     client = createAdminClient | ||||||
|     val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL), |     val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL), | ||||||
|       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) |       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) | ||||||
|  | @ -137,8 +148,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu | ||||||
|     assertEquals(Set(acl3), results3.get(acl3.toFilter).get.values.asScala.map(_.binding).toSet) |     assertEquals(Set(acl3), results3.get(acl3.toFilter).get.values.asScala.map(_.binding).toSet) | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   @Test |   @ParameterizedTest | ||||||
|   def testAclOperations2(): Unit = { |   @ValueSource(strings = Array("zk", "kraft")) | ||||||
|  |   def testAclOperations2(quorum: String): Unit = { | ||||||
|     client = createAdminClient |     client = createAdminClient | ||||||
|     val results = client.createAcls(List(acl2, acl2, transactionalIdAcl).asJava) |     val results = client.createAcls(List(acl2, acl2, transactionalIdAcl).asJava) | ||||||
|     assertEquals(Set(acl2, acl2, transactionalIdAcl), results.values.keySet.asScala) |     assertEquals(Set(acl2, acl2, transactionalIdAcl), results.values.keySet.asScala) | ||||||
|  | @ -163,8 +175,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu | ||||||
|     waitForDescribeAcls(client, filterC, Set()) |     waitForDescribeAcls(client, filterC, Set()) | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   @Test |   @ParameterizedTest | ||||||
|   def testAclDescribe(): Unit = { |   @ValueSource(strings = Array("zk", "kraft")) | ||||||
|  |   def testAclDescribe(quorum: String): Unit = { | ||||||
|     client = createAdminClient |     client = createAdminClient | ||||||
|     ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) |     ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) | ||||||
| 
 | 
 | ||||||
|  | @ -190,8 +203,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu | ||||||
|     assertEquals(Set(anyAcl, acl2, fooAcl, prefixAcl), getAcls(allTopicAcls)) |     assertEquals(Set(anyAcl, acl2, fooAcl, prefixAcl), getAcls(allTopicAcls)) | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   @Test |   @ParameterizedTest | ||||||
|   def testAclDelete(): Unit = { |   @ValueSource(strings = Array("zk", "kraft")) | ||||||
|  |   def testAclDelete(quorum: String): Unit = { | ||||||
|     client = createAdminClient |     client = createAdminClient | ||||||
|     ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) |     ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) | ||||||
| 
 | 
 | ||||||
|  | @ -201,47 +215,65 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu | ||||||
| 
 | 
 | ||||||
|     // Delete only ACLs on literal 'mytopic2' topic |     // Delete only ACLs on literal 'mytopic2' topic | ||||||
|     var deleted = client.deleteAcls(List(acl2.toFilter).asJava).all().get().asScala.toSet |     var deleted = client.deleteAcls(List(acl2.toFilter).asJava).all().get().asScala.toSet | ||||||
|     assertEquals(Set(acl2), deleted) |     brokers.foreach { b => | ||||||
|  |       TestUtils.waitAndVerifyRemovedAcl(acl2.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl2.pattern()) | ||||||
|  |     } | ||||||
|     assertEquals(Set(anyAcl, fooAcl, prefixAcl), getAcls(allTopicAcls)) |     assertEquals(Set(anyAcl, fooAcl, prefixAcl), getAcls(allTopicAcls)) | ||||||
| 
 | 
 | ||||||
|     ensureAcls(deleted) |     ensureAcls(deleted) | ||||||
| 
 | 
 | ||||||
|     // Delete only ACLs on literal '*' topic |     // Delete only ACLs on literal '*' topic | ||||||
|     deleted = client.deleteAcls(List(anyAcl.toFilter).asJava).all().get().asScala.toSet |     deleted = client.deleteAcls(List(anyAcl.toFilter).asJava).all().get().asScala.toSet | ||||||
|     assertEquals(Set(anyAcl), deleted) |     brokers.foreach { b => | ||||||
|  |       TestUtils.waitAndVerifyRemovedAcl(anyAcl.entry(), b.dataPlaneRequestProcessor.authorizer.get, anyAcl.pattern()) | ||||||
|  |     } | ||||||
|     assertEquals(Set(acl2, fooAcl, prefixAcl), getAcls(allTopicAcls)) |     assertEquals(Set(acl2, fooAcl, prefixAcl), getAcls(allTopicAcls)) | ||||||
| 
 | 
 | ||||||
|     ensureAcls(deleted) |     ensureAcls(deleted) | ||||||
| 
 | 
 | ||||||
|     // Delete only ACLs on specific prefixed 'mytopic' topics: |     // Delete only ACLs on specific prefixed 'mytopic' topics: | ||||||
|     deleted = client.deleteAcls(List(prefixAcl.toFilter).asJava).all().get().asScala.toSet |     deleted = client.deleteAcls(List(prefixAcl.toFilter).asJava).all().get().asScala.toSet | ||||||
|     assertEquals(Set(prefixAcl), deleted) |     brokers.foreach { b => | ||||||
|  |       TestUtils.waitAndVerifyRemovedAcl(prefixAcl.entry(), b.dataPlaneRequestProcessor.authorizer.get, prefixAcl.pattern()) | ||||||
|  |     } | ||||||
|     assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(allTopicAcls)) |     assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(allTopicAcls)) | ||||||
| 
 | 
 | ||||||
|     ensureAcls(deleted) |     ensureAcls(deleted) | ||||||
| 
 | 
 | ||||||
|     // Delete all literal ACLs: |     // Delete all literal ACLs: | ||||||
|     deleted = client.deleteAcls(List(allLiteralTopicAcls).asJava).all().get().asScala.toSet |     deleted = client.deleteAcls(List(allLiteralTopicAcls).asJava).all().get().asScala.toSet | ||||||
|     assertEquals(Set(anyAcl, acl2, fooAcl), deleted) |     brokers.foreach { b => | ||||||
|  |       Set(anyAcl, acl2, fooAcl).foreach(acl => | ||||||
|  |         TestUtils.waitAndVerifyRemovedAcl(acl.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl.pattern()) | ||||||
|  |       ) | ||||||
|  |     } | ||||||
|     assertEquals(Set(prefixAcl), getAcls(allTopicAcls)) |     assertEquals(Set(prefixAcl), getAcls(allTopicAcls)) | ||||||
| 
 | 
 | ||||||
|     ensureAcls(deleted) |     ensureAcls(deleted) | ||||||
| 
 | 
 | ||||||
|     // Delete all prefixed ACLs: |     // Delete all prefixed ACLs: | ||||||
|     deleted = client.deleteAcls(List(allPrefixedTopicAcls).asJava).all().get().asScala.toSet |     deleted = client.deleteAcls(List(allPrefixedTopicAcls).asJava).all().get().asScala.toSet | ||||||
|     assertEquals(Set(prefixAcl), deleted) |     brokers.foreach { b => | ||||||
|  |       TestUtils.waitAndVerifyRemovedAcl(prefixAcl.entry(), b.dataPlaneRequestProcessor.authorizer.get, prefixAcl.pattern()) | ||||||
|  |     } | ||||||
|     assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(allTopicAcls)) |     assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(allTopicAcls)) | ||||||
| 
 | 
 | ||||||
|     ensureAcls(deleted) |     ensureAcls(deleted) | ||||||
| 
 | 
 | ||||||
|     // Delete all topic ACLs: |     // Delete all topic ACLs: | ||||||
|     deleted = client.deleteAcls(List(allTopicAcls).asJava).all().get().asScala.toSet |     deleted = client.deleteAcls(List(allTopicAcls).asJava).all().get().asScala.toSet | ||||||
|  |     brokers.foreach { b => | ||||||
|  |       Set(anyAcl, acl2, fooAcl, prefixAcl).foreach(acl => | ||||||
|  |         TestUtils.waitAndVerifyRemovedAcl(acl.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl.pattern()) | ||||||
|  |       ) | ||||||
|  |     } | ||||||
|     assertEquals(Set(), getAcls(allTopicAcls)) |     assertEquals(Set(), getAcls(allTopicAcls)) | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   //noinspection ScalaDeprecation - test explicitly covers clients using legacy / deprecated constructors |   //noinspection ScalaDeprecation - test explicitly covers clients using legacy / deprecated constructors | ||||||
|   @Test |   @ParameterizedTest | ||||||
|   def testLegacyAclOpsNeverAffectOrReturnPrefixed(): Unit = { |   @ValueSource(strings = Array("zk", "kraft")) | ||||||
|  |   def testLegacyAclOpsNeverAffectOrReturnPrefixed(quorum: String): Unit = { | ||||||
|     client = createAdminClient |     client = createAdminClient | ||||||
|     ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl))  // <-- prefixed exists, but should never be returned. |     ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl))  // <-- prefixed exists, but should never be returned. | ||||||
| 
 | 
 | ||||||
|  | @ -258,27 +290,36 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu | ||||||
| 
 | 
 | ||||||
|     // Delete only (legacy) ACLs on 'mytopic2' topic |     // Delete only (legacy) ACLs on 'mytopic2' topic | ||||||
|     var deleted = client.deleteAcls(List(legacyMyTopic2Acls).asJava).all().get().asScala.toSet |     var deleted = client.deleteAcls(List(legacyMyTopic2Acls).asJava).all().get().asScala.toSet | ||||||
|     assertEquals(Set(acl2), deleted) |     brokers.foreach { b => | ||||||
|  |       TestUtils.waitAndVerifyRemovedAcl(acl2.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl2.pattern()) | ||||||
|  |     } | ||||||
|     assertEquals(Set(anyAcl, fooAcl, prefixAcl), getAcls(allTopicAcls)) |     assertEquals(Set(anyAcl, fooAcl, prefixAcl), getAcls(allTopicAcls)) | ||||||
| 
 | 
 | ||||||
|     ensureAcls(deleted) |     ensureAcls(deleted) | ||||||
| 
 | 
 | ||||||
|     // Delete only (legacy) ACLs on '*' topic |     // Delete only (legacy) ACLs on '*' topic | ||||||
|     deleted = client.deleteAcls(List(legacyAnyTopicAcls).asJava).all().get().asScala.toSet |     deleted = client.deleteAcls(List(legacyAnyTopicAcls).asJava).all().get().asScala.toSet | ||||||
|     assertEquals(Set(anyAcl), deleted) |     brokers.foreach { b => | ||||||
|  |       TestUtils.waitAndVerifyRemovedAcl(anyAcl.entry(), b.dataPlaneRequestProcessor.authorizer.get, anyAcl.pattern()) | ||||||
|  |     } | ||||||
|     assertEquals(Set(acl2, fooAcl, prefixAcl), getAcls(allTopicAcls)) |     assertEquals(Set(acl2, fooAcl, prefixAcl), getAcls(allTopicAcls)) | ||||||
| 
 | 
 | ||||||
|     ensureAcls(deleted) |     ensureAcls(deleted) | ||||||
| 
 | 
 | ||||||
|     // Delete all (legacy) topic ACLs: |     // Delete all (legacy) topic ACLs: | ||||||
|     deleted = client.deleteAcls(List(legacyAllTopicAcls).asJava).all().get().asScala.toSet |     deleted = client.deleteAcls(List(legacyAllTopicAcls).asJava).all().get().asScala.toSet | ||||||
|     assertEquals(Set(anyAcl, acl2, fooAcl), deleted) |     brokers.foreach { b => | ||||||
|  |       Set(anyAcl, acl2, fooAcl).foreach(acl => | ||||||
|  |         TestUtils.waitAndVerifyRemovedAcl(acl.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl.pattern()) | ||||||
|  |       ) | ||||||
|  |     } | ||||||
|     assertEquals(Set(), getAcls(legacyAllTopicAcls)) |     assertEquals(Set(), getAcls(legacyAllTopicAcls)) | ||||||
|     assertEquals(Set(prefixAcl), getAcls(allTopicAcls)) |     assertEquals(Set(prefixAcl), getAcls(allTopicAcls)) | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   @Test |   @ParameterizedTest | ||||||
|   def testAttemptToCreateInvalidAcls(): Unit = { |   @ValueSource(strings = Array("zk", "kraft")) | ||||||
|  |   def testAttemptToCreateInvalidAcls(quorum: String): Unit = { | ||||||
|     client = createAdminClient |     client = createAdminClient | ||||||
|     val clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "foobar", PatternType.LITERAL), |     val clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "foobar", PatternType.LITERAL), | ||||||
|       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) |       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) | ||||||
|  | @ -367,8 +408,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu | ||||||
|     }, "timed out waiting for describeAcls to " + (if (expectAuth) "succeed" else "fail")) |     }, "timed out waiting for describeAcls to " + (if (expectAuth) "succeed" else "fail")) | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   @Test |   @ParameterizedTest | ||||||
|   def testAclAuthorizationDenied(): Unit = { |   @ValueSource(strings = Array("zk", "kraft")) | ||||||
|  |   def testAclAuthorizationDenied(quorum: String): Unit = { | ||||||
|     client = createAdminClient |     client = createAdminClient | ||||||
| 
 | 
 | ||||||
|     // Test that we cannot create or delete ACLs when ALTER is denied. |     // Test that we cannot create or delete ACLs when ALTER is denied. | ||||||
|  | @ -416,8 +458,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu | ||||||
|     } |     } | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   @Test |   @ParameterizedTest | ||||||
|   def testCreateTopicsResponseMetadataAndConfig(): Unit = { |   @ValueSource(strings = Array("zk", "kraft")) | ||||||
|  |   def testCreateTopicsResponseMetadataAndConfig(quorum: String): Unit = { | ||||||
|     val topic1 = "mytopic1" |     val topic1 = "mytopic1" | ||||||
|     val topic2 = "mytopic2" |     val topic2 = "mytopic2" | ||||||
|     val denyAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic2, PatternType.LITERAL), |     val denyAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic2, PatternType.LITERAL), | ||||||
|  |  | ||||||
|  | @ -42,6 +42,7 @@ object SslAdminIntegrationTest { | ||||||
|   @volatile var lastUpdateRequestContext: Option[AuthorizableRequestContext] = None |   @volatile var lastUpdateRequestContext: Option[AuthorizableRequestContext] = None | ||||||
|   val superuserCn = "super-user" |   val superuserCn = "super-user" | ||||||
|   val serverUser = "server" |   val serverUser = "server" | ||||||
|  |   val clientCn = "client" | ||||||
| 
 | 
 | ||||||
|   class TestableAclAuthorizer extends AclAuthorizer { |   class TestableAclAuthorizer extends AclAuthorizer { | ||||||
|     override def createAcls(requestContext: AuthorizableRequestContext, |     override def createAcls(requestContext: AuthorizableRequestContext, | ||||||
|  | @ -86,27 +87,34 @@ object SslAdminIntegrationTest { | ||||||
|   class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { |   class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { | ||||||
|     private val Pattern = "O=A (.*?),CN=(.*?)".r |     private val Pattern = "O=A (.*?),CN=(.*?)".r | ||||||
| 
 | 
 | ||||||
|     // Use fields from DN as server principal to grant authorisation for servers and super admin client |     // Use fields from DN as principal to grant appropriate permissions | ||||||
|     override def build(context: AuthenticationContext): KafkaPrincipal = { |     override def build(context: AuthenticationContext): KafkaPrincipal = { | ||||||
|       val peerPrincipal = context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName |       if (context.securityProtocol().equals(SecurityProtocol.PLAINTEXT)) { | ||||||
|       peerPrincipal match { |         KafkaPrincipal.ANONYMOUS | ||||||
|         case Pattern(name, cn) => |       } else { | ||||||
|           val principal = if ((name == "server") || (cn == superuserCn)) "server" else KafkaPrincipal.ANONYMOUS.getName |         val peerPrincipal = context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName | ||||||
|           new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal) |         peerPrincipal match { | ||||||
|         case _ => |           case Pattern(name, cn) => | ||||||
|           KafkaPrincipal.ANONYMOUS |             val principal = | ||||||
|  |               if ((name == serverUser) || (cn == superuserCn)) serverUser | ||||||
|  |               else if (cn == clientCn) clientCn | ||||||
|  |               else KafkaPrincipal.ANONYMOUS.getName | ||||||
|  |             new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal) | ||||||
|  |           case _ => | ||||||
|  |             KafkaPrincipal.ANONYMOUS | ||||||
|  |         } | ||||||
|       } |       } | ||||||
|     } |     } | ||||||
|   } |   } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { | class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { | ||||||
|   override val aclAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName |   override val zkAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName | ||||||
| 
 | 
 | ||||||
|   this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required") |   this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required") | ||||||
|   this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[SslAdminIntegrationTest.TestPrincipalBuilder].getName) |   this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[SslAdminIntegrationTest.TestPrincipalBuilder].getName) | ||||||
|   override protected def securityProtocol = SecurityProtocol.SSL |   override protected def securityProtocol = SecurityProtocol.SSL | ||||||
|   override def kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SslAdminIntegrationTest.serverUser) |   override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SslAdminIntegrationTest.serverUser) | ||||||
| 
 | 
 | ||||||
|   override def setUpSasl(): Unit = { |   override def setUpSasl(): Unit = { | ||||||
|     SslAdminIntegrationTest.semaphore = None |     SslAdminIntegrationTest.semaphore = None | ||||||
|  | @ -213,11 +221,12 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { | ||||||
|   private def verifyAclUpdates(): Unit = { |   private def verifyAclUpdates(): Unit = { | ||||||
|     val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL), |     val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL), | ||||||
|       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) |       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) | ||||||
|  |     val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SslAdminIntegrationTest.clientCn) | ||||||
| 
 | 
 | ||||||
|     def validateRequestContext(context: AuthorizableRequestContext, apiKey: ApiKeys): Unit = { |     def validateRequestContext(context: AuthorizableRequestContext, apiKey: ApiKeys): Unit = { | ||||||
|       assertEquals(SecurityProtocol.SSL, context.securityProtocol) |       assertEquals(SecurityProtocol.SSL, context.securityProtocol) | ||||||
|       assertEquals("SSL", context.listenerName) |       assertEquals("SSL", context.listenerName) | ||||||
|       assertEquals(KafkaPrincipal.ANONYMOUS, context.principal) |       assertEquals(clientPrincipal, context.principal) | ||||||
|       assertEquals(apiKey.id.toInt, context.requestType) |       assertEquals(apiKey.id.toInt, context.requestType) | ||||||
|       assertEquals(apiKey.latestVersion.toInt, context.requestVersion) |       assertEquals(apiKey.latestVersion.toInt, context.requestVersion) | ||||||
|       assertTrue(context.correlationId > 0, s"Invalid correlation id: ${context.correlationId}") |       assertTrue(context.correlationId > 0, s"Invalid correlation id: ${context.correlationId}") | ||||||
|  | @ -280,4 +289,12 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { | ||||||
|     props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG) |     props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG) | ||||||
|     props |     props | ||||||
|   } |   } | ||||||
|  | 
 | ||||||
|  |   // Override the CN to create a principal based on it | ||||||
|  |   override def clientSecurityProps(certAlias: String): Properties = { | ||||||
|  |     val props = TestUtils.securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, certAlias, SslAdminIntegrationTest.clientCn, | ||||||
|  |       clientSaslProperties) | ||||||
|  |     props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG) | ||||||
|  |     props | ||||||
|  |   } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -38,7 +38,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem | ||||||
| import org.apache.kafka.network.SocketServerConfigs | import org.apache.kafka.network.SocketServerConfigs | ||||||
| import org.apache.kafka.raft.QuorumConfig | import org.apache.kafka.raft.QuorumConfig | ||||||
| import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataVersion} | import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataVersion} | ||||||
| import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs} | import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs} | ||||||
| import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler} | import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler} | ||||||
| import org.apache.zookeeper.client.ZKClientConfig | import org.apache.zookeeper.client.ZKClientConfig | ||||||
| import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper} | import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper} | ||||||
|  | @ -366,6 +366,8 @@ abstract class QuorumTestHarness extends Logging { | ||||||
|     props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"CONTROLLER://localhost:0") |     props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"CONTROLLER://localhost:0") | ||||||
|     props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") |     props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") | ||||||
|     props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:0") |     props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:0") | ||||||
|  |     // Setting the configuration to the same value set on the brokers via TestUtils to keep KRaft based and Zk based controller configs are consistent. | ||||||
|  |     props.setProperty(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000") | ||||||
|     val config = new KafkaConfig(props) |     val config = new KafkaConfig(props) | ||||||
|     val controllerQuorumVotersFuture = new CompletableFuture[util.Map[Integer, InetSocketAddress]] |     val controllerQuorumVotersFuture = new CompletableFuture[util.Map[Integer, InetSocketAddress]] | ||||||
|     val metaPropertiesEnsemble = new MetaPropertiesEnsemble.Loader(). |     val metaPropertiesEnsemble = new MetaPropertiesEnsemble.Loader(). | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue