KAFKA-13983: Fail the creation with "/" in resource name in zk ACL (#12359)

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
Aman Singh 2022-07-08 15:47:48 +05:30 committed by GitHub
parent 63a6130af3
commit dc6f555492
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 12 deletions

View File

@ -121,6 +121,8 @@ object AclAuthorizer {
private def validateAclBinding(aclBinding: AclBinding): Unit = {
if (aclBinding.isUnknown)
throw new IllegalArgumentException("ACL binding contains unknown elements")
if (aclBinding.pattern().name().contains("/"))
throw new IllegalArgumentException(s"ACL binding contains invalid resource name: ${aclBinding.pattern().name()}")
}
}

View File

@ -16,40 +16,39 @@
*/
package kafka.security.authorizer
import java.io.File
import java.net.InetAddress
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
import java.util.{Collections, UUID}
import java.util.concurrent.{Executors, Semaphore, TimeUnit}
import kafka.Kafka
import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
import kafka.server.{KafkaConfig, QuorumTestHarness}
import kafka.utils.TestUtils
import kafka.zk.ZkAclStore
import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.errors.{ApiException, UnsupportedVersionException}
import org.apache.kafka.common.requests.RequestContext
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.resource.PatternType.{LITERAL, MATCH, PREFIXED}
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE
import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.resource.PatternType.{LITERAL, MATCH, PREFIXED}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.server.authorizer._
import org.apache.kafka.common.utils.{Time, SecurityUtils => JSecurityUtils}
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_2_0_IV0, IBP_2_0_IV1}
import org.apache.zookeeper.client.ZKClientConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
import java.io.File
import java.net.InetAddress
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
import java.util.concurrent.{Executors, Semaphore, TimeUnit}
import java.util.{Collections, UUID}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
@ -722,6 +721,12 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
assertTrue(e.getCause.isInstanceOf[UnsupportedVersionException], s"Unexpected exception $e")
}
@Test
def testCreateAclWithInvalidResourceName(): Unit = {
assertThrows(classOf[ApiException],
() => addAcls(aclAuthorizer, Set(allowReadAcl), new ResourcePattern(TOPIC, "test/1", LITERAL)))
}
@Test
def testWritesExtendedAclChangeEventIfInterBrokerProtocolNotSet(): Unit = {
givenAuthorizerWithProtocolVersion(Option.empty)