KAFKA-18519: Remove Json.scala, cleanup AclEntry.scala (#18614)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ken Huang 2025-01-22 23:12:06 +08:00 committed by Mickael Maison
parent ac87155ba4
commit 7fc59b0ce9
7 changed files with 17 additions and 637 deletions

View File

@ -1,92 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import com.fasterxml.jackson.core.{JsonParseException, JsonProcessingException}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.MissingNode
import kafka.utils.json.JsonValue
import scala.reflect.ClassTag
/**
* Provides methods for parsing JSON with Jackson and encoding to JSON with a simple and naive custom implementation.
*/
object Json {
private val mapper = new ObjectMapper()
/**
* Parse a JSON string into a JsonValue if possible. `None` is returned if `input` is not valid JSON.
*/
def parseFull(input: String): Option[JsonValue] = tryParseFull(input).toOption
/**
* Parse a JSON string into either a generic type T, or a JsonProcessingException in the case of
* exception.
*/
def parseStringAs[T](input: String)(implicit tag: ClassTag[T]): Either[JsonProcessingException, T] = {
try Right(mapper.readValue(input, tag.runtimeClass).asInstanceOf[T])
catch { case e: JsonProcessingException => Left(e) }
}
/**
* Parse a JSON byte array into a JsonValue if possible. `None` is returned if `input` is not valid JSON.
*/
def parseBytes(input: Array[Byte]): Option[JsonValue] =
try Option(mapper.readTree(input)).map(JsonValue(_))
catch { case _: JsonProcessingException => None }
def tryParseBytes(input: Array[Byte]): Either[JsonProcessingException, JsonValue] =
try Right(mapper.readTree(input)).map(JsonValue(_))
catch { case e: JsonProcessingException => Left(e) }
/**
* Parse a JSON byte array into either a generic type T, or a JsonProcessingException in the case of exception.
*/
def parseBytesAs[T](input: Array[Byte])(implicit tag: ClassTag[T]): Either[JsonProcessingException, T] = {
try Right(mapper.readValue(input, tag.runtimeClass).asInstanceOf[T])
catch { case e: JsonProcessingException => Left(e) }
}
/**
* Parse a JSON string into a JsonValue if possible. It returns an `Either` where `Left` will be an exception and
* `Right` is the `JsonValue`.
* @param input a JSON string to parse
* @return An `Either` which in case of `Left` means an exception and `Right` is the actual return value.
*/
def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
if (input == null || input.isEmpty)
Left(new JsonParseException(MissingNode.getInstance().traverse(), "The input string shouldn't be empty"))
else
try Right(mapper.readTree(input)).map(JsonValue(_))
catch { case e: JsonProcessingException => Left(e) }
/**
* Encode an object into a JSON string. This method accepts any type supported by Jackson's ObjectMapper in
* the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid
* a jackson-scala dependency).
*/
def encodeAsString(obj: Any): String = mapper.writeValueAsString(obj)
/**
* Encode an object into a JSON value in bytes. This method accepts any type supported by Jackson's ObjectMapper in
* the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid
* a jackson-scala dependency).
*/
def encodeAsBytes(obj: Any): Array[Byte] = mapper.writeValueAsBytes(obj)
}

View File

@ -1,48 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.security.authorizer
import java.nio.charset.StandardCharsets.UTF_8
import kafka.utils.Json
import org.apache.kafka.common.acl.AccessControlEntry
import org.apache.kafka.common.acl.AclOperation.READ
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.security.authorizer.AclEntry
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import java.util
class AclEntryTest {
val AclJson = """{"version": 1, "acls": [{"host": "host1","permissionType": "Deny","operation": "READ", "principal": "User:alice" },
{ "host": "*" , "permissionType": "Allow", "operation": "Read", "principal": "User:bob" },
{ "host": "host1", "permissionType": "Deny", "operation": "Read" , "principal": "User:bob"}]}"""
@Test
def testAclJsonConversion(): Unit = {
val acl1 = new AclEntry(new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice").toString, "host1", READ, DENY))
val acl2 = new AclEntry(new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob").toString, "*", READ, ALLOW))
val acl3 = new AclEntry(new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob").toString, "host1", READ, DENY))
val acls = new util.HashSet[AclEntry](util.Arrays.asList(acl1, acl2, acl3))
assertEquals(acls, AclEntry.fromBytes(Json.encodeAsBytes(AclEntry.toJsonCompatibleMap(acls))))
assertEquals(acls, AclEntry.fromBytes(AclJson.getBytes(UTF_8)))
}
}

View File

@ -1,134 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import java.nio.charset.StandardCharsets
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.core.{JsonParseException, JsonProcessingException}
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node._
import kafka.utils.JsonTest.TestObject
import kafka.utils.json.JsonValue
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import scala.jdk.CollectionConverters._
import scala.collection.Map
object JsonTest {
case class TestObject(@JsonProperty("foo") foo: String, @JsonProperty("bar") bar: Int)
}
class JsonTest {
@Test
def testJsonParse(): Unit = {
val jnf = JsonNodeFactory.instance
assertEquals(Some(JsonValue(new ObjectNode(jnf))), Json.parseFull("{}"))
assertEquals(Right(JsonValue(new ObjectNode(jnf))), Json.tryParseFull("{}"))
assertEquals(classOf[Left[JsonProcessingException, JsonValue]], Json.tryParseFull(null).getClass)
assertThrows(classOf[IllegalArgumentException], () => Json.tryParseBytes(null))
assertEquals(None, Json.parseFull(""))
assertEquals(classOf[Left[JsonProcessingException, JsonValue]], Json.tryParseFull("").getClass)
assertEquals(None, Json.parseFull("""{"foo":"bar"s}"""))
val tryRes = Json.tryParseFull("""{"foo":"bar"s}""")
assertTrue(tryRes.isInstanceOf[Left[_, JsonValue]])
val objectNode = new ObjectNode(
jnf,
Map[String, JsonNode]("foo" -> new TextNode("bar"), "is_enabled" -> BooleanNode.TRUE).asJava
)
assertEquals(Some(JsonValue(objectNode)), Json.parseFull("""{"foo":"bar", "is_enabled":true}"""))
assertEquals(Right(JsonValue(objectNode)), Json.tryParseFull("""{"foo":"bar", "is_enabled":true}"""))
val arrayNode = new ArrayNode(jnf)
Vector(1, 2, 3).map(new IntNode(_)).foreach(arrayNode.add)
assertEquals(Some(JsonValue(arrayNode)), Json.parseFull("[1, 2, 3]"))
// Test with encoder that properly escapes backslash and quotes
val map = Map("foo1" -> """bar1\,bar2""", "foo2" -> """\bar""").asJava
val encoded = Json.encodeAsString(map)
val decoded = Json.parseFull(encoded)
assertEquals(decoded, Json.parseFull("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}"""))
}
@Test
def testEncodeAsString(): Unit = {
assertEquals("null", Json.encodeAsString(null))
assertEquals("1", Json.encodeAsString(1))
assertEquals("1", Json.encodeAsString(1L))
assertEquals("1", Json.encodeAsString(1.toByte))
assertEquals("1", Json.encodeAsString(1.toShort))
assertEquals("1.0", Json.encodeAsString(1.0))
assertEquals(""""str"""", Json.encodeAsString("str"))
assertEquals("true", Json.encodeAsString(true))
assertEquals("false", Json.encodeAsString(false))
assertEquals("[]", Json.encodeAsString(Seq().asJava))
assertEquals("[null]", Json.encodeAsString(Seq(null).asJava))
assertEquals("[1,2,3]", Json.encodeAsString(Seq(1,2,3).asJava))
assertEquals("""[1,"2",[3],null]""", Json.encodeAsString(Seq(1,"2",Seq(3).asJava,null).asJava))
assertEquals("{}", Json.encodeAsString(Map().asJava))
assertEquals("""{"a":1,"b":2,"c":null}""", Json.encodeAsString(Map("a" -> 1, "b" -> 2, "c" -> null).asJava))
assertEquals("""{"a":[1,2],"c":[3,4]}""", Json.encodeAsString(Map("a" -> Seq(1,2).asJava, "c" -> Seq(3,4).asJava).asJava))
assertEquals("""{"a":[1,2],"b":[3,4],"c":null}""", Json.encodeAsString(Map("a" -> Seq(1,2).asJava, "b" -> Seq(3,4).asJava, "c" -> null).asJava))
assertEquals(""""str1\\,str2"""", Json.encodeAsString("""str1\,str2"""))
assertEquals(""""\"quoted\""""", Json.encodeAsString(""""quoted""""))
}
@Test
def testEncodeAsBytes(): Unit = {
assertEquals("null", new String(Json.encodeAsBytes(null), StandardCharsets.UTF_8))
assertEquals("1", new String(Json.encodeAsBytes(1), StandardCharsets.UTF_8))
assertEquals("1", new String(Json.encodeAsBytes(1L), StandardCharsets.UTF_8))
assertEquals("1", new String(Json.encodeAsBytes(1.toByte), StandardCharsets.UTF_8))
assertEquals("1", new String(Json.encodeAsBytes(1.toShort), StandardCharsets.UTF_8))
assertEquals("1.0", new String(Json.encodeAsBytes(1.0), StandardCharsets.UTF_8))
assertEquals(""""str"""", new String(Json.encodeAsBytes("str"), StandardCharsets.UTF_8))
assertEquals("true", new String(Json.encodeAsBytes(true), StandardCharsets.UTF_8))
assertEquals("false", new String(Json.encodeAsBytes(false), StandardCharsets.UTF_8))
assertEquals("[]", new String(Json.encodeAsBytes(Seq().asJava), StandardCharsets.UTF_8))
assertEquals("[null]", new String(Json.encodeAsBytes(Seq(null).asJava), StandardCharsets.UTF_8))
assertEquals("[1,2,3]", new String(Json.encodeAsBytes(Seq(1,2,3).asJava), StandardCharsets.UTF_8))
assertEquals("""[1,"2",[3],null]""", new String(Json.encodeAsBytes(Seq(1,"2",Seq(3).asJava,null).asJava), StandardCharsets.UTF_8))
assertEquals("{}", new String(Json.encodeAsBytes(Map().asJava), StandardCharsets.UTF_8))
assertEquals("""{"a":1,"b":2,"c":null}""", new String(Json.encodeAsBytes(Map("a" -> 1, "b" -> 2, "c" -> null).asJava), StandardCharsets.UTF_8))
assertEquals("""{"a":[1,2],"c":[3,4]}""", new String(Json.encodeAsBytes(Map("a" -> Seq(1,2).asJava, "c" -> Seq(3,4).asJava).asJava), StandardCharsets.UTF_8))
assertEquals("""{"a":[1,2],"b":[3,4],"c":null}""", new String(Json.encodeAsBytes(Map("a" -> Seq(1,2).asJava, "b" -> Seq(3,4).asJava, "c" -> null).asJava), StandardCharsets.UTF_8))
assertEquals(""""str1\\,str2"""", new String(Json.encodeAsBytes("""str1\,str2"""), StandardCharsets.UTF_8))
assertEquals(""""\"quoted\""""", new String(Json.encodeAsBytes(""""quoted""""), StandardCharsets.UTF_8))
}
@Test
def testParseTo(): Unit = {
val foo = "baz"
val bar = 1
val result = Json.parseStringAs[TestObject](s"""{"foo": "$foo", "bar": $bar}""")
assertEquals(Right(TestObject(foo, bar)), result)
}
@Test
def testParseToWithInvalidJson(): Unit = {
val result = Json.parseStringAs[TestObject]("{invalid json}")
assertEquals(Left(classOf[JsonParseException]), result.left.map(_.getClass))
}
}

View File

@ -1,212 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils.json
import scala.collection.Seq
import com.fasterxml.jackson.databind.{ObjectMapper, JsonMappingException}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions._
import kafka.utils.Json
class JsonValueTest {
private val json = """
|{
| "boolean": false,
| "int": 1234,
| "long": 3000000000,
| "double": 16.244355,
| "string": "string",
| "number_as_string": "123",
| "array": [4.0, 11.1, 44.5],
| "object": {
| "a": true,
| "b": false
| },
| "null": null
|}
""".stripMargin
private def parse(s: String): JsonValue =
Json.parseFull(s).getOrElse(sys.error("Failed to parse json: " + s))
private def assertTo[T: DecodeJson](expected: T, jsonValue: JsonObject => JsonValue): Unit = {
val parsed = jsonValue(parse(json).asJsonObject)
assertEquals(Right(expected), parsed.toEither[T])
assertEquals(expected, parsed.to[T])
}
private def assertToFails[T: DecodeJson](jsonValue: JsonObject => JsonValue): Unit = {
val parsed = jsonValue(parse(json).asJsonObject)
assertTrue(parsed.toEither[T].isLeft)
assertThrow[JsonMappingException](parsed.to[T])
}
def assertThrow[E <: Throwable : Manifest](body: => Unit): Unit = {
import scala.util.control.Exception._
val klass = manifest[E].runtimeClass
catchingPromiscuously(klass).opt(body).foreach { _ =>
fail("Expected `" + klass + "` to be thrown, but no exception was thrown")
}
}
@Test
def testAsJsonObject(): Unit = {
val parsed = parse(json).asJsonObject
val obj = parsed("object")
assertEquals(obj, obj.asJsonObject)
assertThrow[JsonMappingException](parsed("array").asJsonObject)
}
@Test
def testAsJsonObjectOption(): Unit = {
val parsed = parse(json).asJsonObject
assertTrue(parsed("object").asJsonObjectOption.isDefined)
assertEquals(None, parsed("array").asJsonObjectOption)
}
@Test
def testAsJsonArray(): Unit = {
val parsed = parse(json).asJsonObject
val array = parsed("array")
assertEquals(array, array.asJsonArray)
assertThrow[JsonMappingException](parsed("object").asJsonArray)
}
@Test
def testAsJsonArrayOption(): Unit = {
val parsed = parse(json).asJsonObject
assertTrue(parsed("array").asJsonArrayOption.isDefined)
assertEquals(None, parsed("object").asJsonArrayOption)
}
@Test
def testJsonObjectGet(): Unit = {
val parsed = parse(json).asJsonObject
assertEquals(Some(parse("""{"a":true,"b":false}""")), parsed.get("object"))
assertEquals(None, parsed.get("aaaaa"))
}
@Test
def testJsonObjectApply(): Unit = {
val parsed = parse(json).asJsonObject
assertEquals(parse("""{"a":true,"b":false}"""), parsed("object"))
assertThrow[JsonMappingException](parsed("aaaaaaaa"))
}
@Test
def testJsonObjectIterator(): Unit = {
assertEquals(
Vector("a" -> parse("true"), "b" -> parse("false")),
parse(json).asJsonObject("object").asJsonObject.iterator.toVector
)
}
@Test
def testJsonArrayIterator(): Unit = {
assertEquals(Vector("4.0", "11.1", "44.5").map(parse), parse(json).asJsonObject("array").asJsonArray.iterator.toVector)
}
@Test
def testJsonValueEquals(): Unit = {
assertEquals(parse(json), parse(json))
assertEquals(parse("""{"blue": true, "red": false}"""), parse("""{"red": false, "blue": true}"""))
assertNotEquals(parse("""{"blue": true, "red": true}"""), parse("""{"red": false, "blue": true}"""))
assertEquals(parse("""[1, 2, 3]"""), parse("""[1, 2, 3]"""))
assertNotEquals(parse("""[1, 2, 3]"""), parse("""[2, 1, 3]"""))
assertEquals(parse("1344"), parse("1344"))
assertNotEquals(parse("1344"), parse("144"))
}
@Test
def testJsonValueHashCode(): Unit = {
assertEquals(new ObjectMapper().readTree(json).hashCode, parse(json).hashCode)
}
@Test
def testJsonValueToString(): Unit = {
val js = """{"boolean":false,"int":1234,"array":[4.0,11.1,44.5],"object":{"a":true,"b":false}}"""
assertEquals(js, parse(js).toString)
}
@Test
def testDecodeBoolean(): Unit = {
assertTo[Boolean](false, _("boolean"))
assertToFails[Boolean](_("int"))
}
@Test
def testDecodeString(): Unit = {
assertTo[String]("string", _("string"))
assertTo[String]("123", _("number_as_string"))
assertToFails[String](_("int"))
assertToFails[String](_("array"))
}
@Test
def testDecodeInt(): Unit = {
assertTo[Int](1234, _("int"))
assertToFails[Int](_("long"))
}
@Test
def testDecodeLong(): Unit = {
assertTo[Long](3000000000L, _("long"))
assertTo[Long](1234, _("int"))
assertToFails[Long](_("string"))
}
@Test
def testDecodeDouble(): Unit = {
assertTo[Double](16.244355, _("double"))
assertTo[Double](1234.0, _("int"))
assertTo[Double](3000000000L, _("long"))
assertToFails[Double](_("string"))
}
@Test
def testDecodeSeq(): Unit = {
assertTo[Seq[Double]](Seq(4.0, 11.1, 44.5), _("array"))
assertToFails[Seq[Double]](_("string"))
assertToFails[Seq[Double]](_("object"))
assertToFails[Seq[String]](_("array"))
}
@Test
def testDecodeMap(): Unit = {
assertTo[Map[String, Boolean]](Map("a" -> true, "b" -> false), _("object"))
assertToFails[Map[String, Int]](_("object"))
assertToFails[Map[String, String]](_("object"))
assertToFails[Map[String, Double]](_("array"))
}
@Test
def testDecodeOption(): Unit = {
assertTo[Option[Int]](None, _("null"))
assertTo[Option[Int]](Some(1234), _("int"))
assertToFails[Option[String]](_("int"))
}
}

View File

@ -151,13 +151,6 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="EQ_UNUSUAL"/>
</Match>
<Match>
<!-- Add a suppression for auto-generated calls to instanceof in kafka.utils.Json -->
<Source name="Json.scala"/>
<Package name="kafka.utils"/>
<Bug pattern="BC_VACUOUS_INSTANCEOF"/>
</Match>
<Match>
<!-- A spurious null check after inlining by the scalac optimizer confuses spotBugs -->
<Class name="kafka.log.Log"/>

View File

@ -35,7 +35,6 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.metadata.authorizer.StandardAcl;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.security.authorizer.AclEntry;
import org.apache.kafka.server.authorizer.Action;
import org.openjdk.jmh.annotations.Benchmark;
@ -113,14 +112,14 @@ public class AuthorizerBenchmark {
}
private void prepareAclCache() {
Map<ResourcePattern, Set<AclEntry>> aclEntries = new HashMap<>();
Map<ResourcePattern, Set<AccessControlEntry>> aclEntries = new HashMap<>();
for (int resourceId = 0; resourceId < resourceCount; resourceId++) {
ResourcePattern resource = new ResourcePattern(
(resourceId % 10 == 0) ? ResourceType.GROUP : ResourceType.TOPIC,
resourceNamePrefix + resourceId,
(resourceId % 5 == 0) ? PatternType.PREFIXED : PatternType.LITERAL);
Set<AclEntry> entries = aclEntries.computeIfAbsent(resource, k -> new HashSet<>());
Set<AccessControlEntry> entries = aclEntries.computeIfAbsent(resource, k -> new HashSet<>());
for (int aclId = 0; aclId < aclCount; aclId++) {
// The principal in the request context we are using
@ -129,36 +128,31 @@ public class AuthorizerBenchmark {
AccessControlEntry allowAce = new AccessControlEntry(
principalName, "*", AclOperation.READ, AclPermissionType.ALLOW);
entries.add(new AclEntry(allowAce));
entries.add(new AccessControlEntry(allowAce.principal(), allowAce.host(), allowAce.operation(), allowAce.permissionType()));
if (shouldDeny()) {
// dominantly deny the resource
AccessControlEntry denyAce = new AccessControlEntry(
principalName, "*", AclOperation.READ, AclPermissionType.DENY);
entries.add(new AclEntry(denyAce));
entries.add(new AccessControlEntry(principalName, "*", AclOperation.READ, AclPermissionType.DENY));
}
}
}
ResourcePattern resourcePrefix = new ResourcePattern(ResourceType.TOPIC, resourceNamePrefix,
PatternType.PREFIXED);
Set<AclEntry> entriesPrefix = aclEntries.computeIfAbsent(resourcePrefix, k -> new HashSet<>());
Set<AccessControlEntry> entriesPrefix = aclEntries.computeIfAbsent(resourcePrefix, k -> new HashSet<>());
for (int hostId = 0; hostId < hostPreCount; hostId++) {
AccessControlEntry allowAce = new AccessControlEntry(principal.toString(), "127.0.0." + hostId,
AclOperation.READ, AclPermissionType.ALLOW);
entriesPrefix.add(new AclEntry(allowAce));
entriesPrefix.add(new AccessControlEntry(allowAce.principal(), allowAce.host(), allowAce.operation(), allowAce.permissionType()));
if (shouldDeny()) {
// dominantly deny the resource
AccessControlEntry denyAce = new AccessControlEntry(principal.toString(), "127.0.0." + hostId,
AclOperation.READ, AclPermissionType.DENY);
entriesPrefix.add(new AclEntry(denyAce));
entriesPrefix.add(new AccessControlEntry(principal.toString(), "127.0.0." + hostId,
AclOperation.READ, AclPermissionType.DENY));
}
}
ResourcePattern resourceWildcard = new ResourcePattern(ResourceType.TOPIC, ResourcePattern.WILDCARD_RESOURCE,
PatternType.LITERAL);
Set<AclEntry> entriesWildcard = aclEntries.computeIfAbsent(resourceWildcard, k -> new HashSet<>());
Set<AccessControlEntry> entriesWildcard = aclEntries.computeIfAbsent(resourceWildcard, k -> new HashSet<>());
// get dynamic entries number for wildcard acl
for (int hostId = 0; hostId < resourceCount / 10; hostId++) {
String hostName = "127.0.0" + hostId;
@ -170,23 +164,22 @@ public class AuthorizerBenchmark {
AccessControlEntry allowAce = new AccessControlEntry(principal.toString(), hostName,
AclOperation.READ, AclPermissionType.ALLOW);
entriesWildcard.add(new AclEntry(allowAce));
entriesWildcard.add(new AccessControlEntry(allowAce.principal(), allowAce.host(), allowAce.operation(), allowAce.permissionType()));
if (shouldDeny()) {
AccessControlEntry denyAce = new AccessControlEntry(principal.toString(), hostName,
AclOperation.READ, AclPermissionType.DENY);
entriesWildcard.add(new AclEntry(denyAce));
entriesWildcard.add(new AccessControlEntry(principal.toString(), hostName,
AclOperation.READ, AclPermissionType.DENY));
}
}
setupAcls(aclEntries);
}
private void setupAcls(Map<ResourcePattern, Set<AclEntry>> aclEntries) {
for (Map.Entry<ResourcePattern, Set<AclEntry>> entryMap : aclEntries.entrySet()) {
private void setupAcls(Map<ResourcePattern, Set<AccessControlEntry>> aclEntries) {
for (Map.Entry<ResourcePattern, Set<AccessControlEntry>> entryMap : aclEntries.entrySet()) {
ResourcePattern resourcePattern = entryMap.getKey();
for (AclEntry aclEntry : entryMap.getValue()) {
StandardAcl standardAcl = StandardAcl.fromAclBinding(new AclBinding(resourcePattern, aclEntry));
for (AccessControlEntry accessControlEntry : entryMap.getValue()) {
StandardAcl standardAcl = StandardAcl.fromAclBinding(new AclBinding(resourcePattern, accessControlEntry));
authorizer.addAcl(Uuid.randomUuid(), standardAcl);
}
authorizer.completeInitialLoad();

View File

@ -16,28 +16,15 @@
*/
package org.apache.kafka.security.authorizer;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.Json;
import org.apache.kafka.server.util.json.DecodeJson;
import org.apache.kafka.server.util.json.JsonObject;
import org.apache.kafka.server.util.json.JsonValue;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -54,99 +41,16 @@ import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
import static org.apache.kafka.common.acl.AclOperation.READ;
import static org.apache.kafka.common.acl.AclOperation.WRITE;
public class AclEntry extends AccessControlEntry {
private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
public class AclEntry {
public static final KafkaPrincipal WILDCARD_PRINCIPAL = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*");
public static final String WILDCARD_PRINCIPAL_STRING = WILDCARD_PRINCIPAL.toString();
public static final String WILDCARD_HOST = "*";
public static final String WILDCARD_RESOURCE = ResourcePattern.WILDCARD_RESOURCE;
public static final String RESOURCE_SEPARATOR = ":";
public static final Set<ResourceType> RESOURCE_TYPES = Arrays.stream(ResourceType.values())
.filter(t -> !(t == ResourceType.UNKNOWN || t == ResourceType.ANY))
.collect(Collectors.toSet());
public static final Set<AclOperation> ACL_OPERATIONS = Arrays.stream(AclOperation.values())
.filter(t -> !(t == AclOperation.UNKNOWN || t == AclOperation.ANY))
.collect(Collectors.toSet());
private static final String PRINCIPAL_KEY = "principal";
private static final String PERMISSION_TYPE_KEY = "permissionType";
private static final String OPERATION_KEY = "operation";
private static final String HOSTS_KEY = "host";
public static final String VERSION_KEY = "version";
public static final int CURRENT_VERSION = 1;
private static final String ACLS_KEY = "acls";
public final AccessControlEntry ace;
public final KafkaPrincipal kafkaPrincipal;
public AclEntry(AccessControlEntry ace) {
super(ace.principal(), ace.host(), ace.operation(), ace.permissionType());
this.ace = ace;
kafkaPrincipal = ace.principal() == null
? null
: SecurityUtils.parseKafkaPrincipal(ace.principal());
}
/**
* Parse JSON representation of ACLs
* @param bytes of acls json string
*
* <p>
{
"version": 1,
"acls": [
{
"host":"host1",
"permissionType": "Deny",
"operation": "Read",
"principal": "User:alice"
}
]
}
* </p>
*
* @return set of AclEntry objects from the JSON string
*/
public static Set<AclEntry> fromBytes(byte[] bytes) throws IOException {
if (bytes == null || bytes.length == 0)
return Collections.emptySet();
Optional<JsonValue> jsonValue = Json.parseBytes(bytes);
if (!jsonValue.isPresent())
return Collections.emptySet();
JsonObject js = jsonValue.get().asJsonObject();
//the acl json version.
Utils.require(js.apply(VERSION_KEY).to(INT) == CURRENT_VERSION);
Set<AclEntry> res = new HashSet<>();
Iterator<JsonValue> aclsIter = js.apply(ACLS_KEY).asJsonArray().iterator();
while (aclsIter.hasNext()) {
JsonObject itemJs = aclsIter.next().asJsonObject();
KafkaPrincipal principal = SecurityUtils.parseKafkaPrincipal(itemJs.apply(PRINCIPAL_KEY).to(STRING));
AclPermissionType permissionType = SecurityUtils.permissionType(itemJs.apply(PERMISSION_TYPE_KEY).to(STRING));
String host = itemJs.apply(HOSTS_KEY).to(STRING);
AclOperation operation = SecurityUtils.operation(itemJs.apply(OPERATION_KEY).to(STRING));
res.add(new AclEntry(new AccessControlEntry(principal.toString(),
host, operation, permissionType)));
}
return res;
}
public static Map<String, Object> toJsonCompatibleMap(Set<AclEntry> acls) {
Map<String, Object> res = new HashMap<>();
res.put(AclEntry.VERSION_KEY, AclEntry.CURRENT_VERSION);
res.put(AclEntry.ACLS_KEY, acls.stream().map(AclEntry::toMap).collect(Collectors.toList()));
return res;
}
public static Set<AclOperation> supportedOperations(ResourceType resourceType) {
switch (resourceType) {
case TOPIC:
@ -182,28 +86,4 @@ public class AclEntry extends AccessControlEntry {
throw new IllegalArgumentException("Authorization error type not known");
}
}
public Map<String, Object> toMap() {
Map<String, Object> res = new HashMap<>();
res.put(AclEntry.PRINCIPAL_KEY, principal());
res.put(AclEntry.PERMISSION_TYPE_KEY, SecurityUtils.permissionTypeName(permissionType()));
res.put(AclEntry.OPERATION_KEY, SecurityUtils.operationName(operation()));
res.put(AclEntry.HOSTS_KEY, host());
return res;
}
@Override
public int hashCode() {
return ace.hashCode();
}
@Override
public boolean equals(Object o) {
return super.equals(o); // to keep spotbugs happy
}
@Override
public String toString() {
return String.format("%s has %s permission for operations: %s from hosts: %s", principal(), permissionType().name(), operation(), host());
}
}