From cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b Mon Sep 17 00:00:00 2001 From: Nick Guo Date: Sat, 31 May 2025 02:34:56 +0800 Subject: [PATCH] KAFKA-19042 Move GroupAuthorizerIntegrationTest to clients-integration-tests module (#19685) move GroupAuthorizerIntegrationTest to clients-integration-tests module Reviewers: Ken Huang , PoAn Yang , keemsisi , Chia-Ping Tsai --- .../GroupAuthorizerIntegrationTest.java | 402 ++++++++++++++++++ .../api/GroupAuthorizerIntegrationTest.scala | 239 ----------- 2 files changed, 402 insertions(+), 239 deletions(-) create mode 100644 clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java delete mode 100644 core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java new file mode 100644 index 00000000000..725c0f53786 --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.security; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.Resource; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.security.auth.AuthenticationContext; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.metadata.authorizer.StandardAuthorizer; +import org.apache.kafka.server.authorizer.AuthorizableRequestContext; +import org.apache.kafka.server.authorizer.Authorizer; +import org.apache.kafka.server.config.ServerConfigs; + +import java.net.InetAddress; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + + +@ClusterTestDefaults(serverProperties = { + @ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value = "Group:broker"), + @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + @ClusterConfigProperty(key = ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, value = "org.apache.kafka.metadata.authorizer.StandardAuthorizer"), + @ClusterConfigProperty(key = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, value = "org.apache.kafka.clients.security.GroupAuthorizerIntegrationTest$GroupPrincipalBuilder"), +}) +public class GroupAuthorizerIntegrationTest { + private static final KafkaPrincipal BROKER_PRINCIPAL = new KafkaPrincipal("Group", "broker"); + private static final KafkaPrincipal CLIENT_PRINCIPAL = new KafkaPrincipal("Group", "client"); + + private static final String BROKER_LISTENER_NAME = "BROKER"; + private static final String CLIENT_LISTENER_NAME = "EXTERNAL"; + private static final String CONTROLLER_LISTENER_NAME = "CONTROLLER"; + + private Authorizer getAuthorizer(ClusterInstance clusterInstance) { + return clusterInstance.controllers().values().stream() + .filter(server -> server.authorizerPlugin().isDefined()) + .map(server -> server.authorizerPlugin().get().get()).findFirst().get(); + } + + private void setup(ClusterInstance clusterInstance) throws InterruptedException { + // Allow inter-broker communication + addAndVerifyAcls( + Set.of(createAcl(AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW, BROKER_PRINCIPAL)), + new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL), + clusterInstance + ); + addAndVerifyAcls( + Set.of(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)), + new ResourcePattern(ResourceType.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME, PatternType.LITERAL), + clusterInstance + ); + + NewTopic offsetTopic = new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, 1, (short) 1); + try (Admin admin = clusterInstance.admin(Map.of( + AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true)) + ) { + admin.createTopics(Collections.singleton(offsetTopic)); + clusterInstance.waitForTopic(Topic.GROUP_METADATA_TOPIC_NAME, 1); + } + } + + public static class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder { + public GroupPrincipalBuilder() { + super(null, null); + } + + @Override + public KafkaPrincipal build(AuthenticationContext context) { + String listenerName = context.listenerName(); + return switch (listenerName) { + case BROKER_LISTENER_NAME, CONTROLLER_LISTENER_NAME -> BROKER_PRINCIPAL; + case CLIENT_LISTENER_NAME -> CLIENT_PRINCIPAL; + default -> throw new IllegalArgumentException("No principal mapped to listener " + listenerName); + }; + } + } + + private AccessControlEntry createAcl(AclOperation aclOperation, AclPermissionType aclPermissionType, KafkaPrincipal principal) { + return new AccessControlEntry( + principal.toString(), + WILDCARD_HOST, + aclOperation, + aclPermissionType + ); + } + + private void addAndVerifyAcls(Set acls, ResourcePattern resource, ClusterInstance clusterInstance) throws InterruptedException { + List aclBindings = acls.stream().map(acl -> new AclBinding(resource, acl)).toList(); + Authorizer authorizer = getAuthorizer(clusterInstance); + authorizer.createAcls(ANONYMOUS_CONTEXT, aclBindings) + .forEach(future -> { + try { + future.toCompletableFuture().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Failed to create ACLs", e); + } + }); + AclBindingFilter aclBindingFilter = new AclBindingFilter(resource.toFilter(), AccessControlEntryFilter.ANY); + clusterInstance.waitAcls(aclBindingFilter, acls); + } + + static final AuthorizableRequestContext ANONYMOUS_CONTEXT = new AuthorizableRequestContext() { + @Override + public String listenerName() { + return ""; + } + + @Override + public SecurityProtocol securityProtocol() { + return SecurityProtocol.PLAINTEXT; + } + + @Override + public KafkaPrincipal principal() { + return KafkaPrincipal.ANONYMOUS; + } + + @Override + public InetAddress clientAddress() { + return null; + } + + @Override + public int requestType() { + return 0; + } + + @Override + public int requestVersion() { + return 0; + } + + @Override + public String clientId() { + return ""; + } + + @Override + public int correlationId() { + return 0; + } + }; + + @ClusterTest + public void testUnauthorizedProduceAndConsumeWithClassicConsumer(ClusterInstance clusterInstance) throws InterruptedException { + testUnauthorizedProduceAndConsume(clusterInstance, GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testUnauthorizedProduceAndConsumeWithAsyncConsumer(ClusterInstance clusterInstance) throws InterruptedException { + testUnauthorizedProduceAndConsume(clusterInstance, GroupProtocol.CONSUMER); + } + + public void testUnauthorizedProduceAndConsume(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException { + setup(clusterInstance); + String topic = "topic"; + String group = "group"; + + addAndVerifyAcls( + Set.of(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)), + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL), + clusterInstance + ); + addAndVerifyAcls( + Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL), + clusterInstance + ); + + Producer producer = clusterInstance.producer(); + Consumer consumer = clusterInstance.consumer(Map.of( + GROUP_PROTOCOL_CONFIG, groupProtocol.name.toLowerCase(Locale.ROOT), + ConsumerConfig.GROUP_ID_CONFIG, group + )); + + try { + clusterInstance.createTopic(topic, 1, (short) 1); + ExecutionException produceException = assertThrows( + ExecutionException.class, + () -> producer.send(new ProducerRecord<>(topic, "message".getBytes())).get() + ); + Throwable cause = produceException.getCause(); + assertInstanceOf(TopicAuthorizationException.class, cause); + TopicAuthorizationException topicAuthException = (TopicAuthorizationException) cause; + assertEquals(Set.of(topic), topicAuthException.unauthorizedTopics()); + + TopicPartition topicPartition = new TopicPartition(topic, 0); + consumer.assign(Collections.singletonList(topicPartition)); + TopicAuthorizationException consumeException = assertThrows( + TopicAuthorizationException.class, + () -> consumer.poll(Duration.ofSeconds(15)) + ); + assertEquals(consumeException.unauthorizedTopics(), topicAuthException.unauthorizedTopics()); + } finally { + producer.close(Duration.ZERO); + consumer.close(); + } + } + + @ClusterTest + public void testClassicConsumeUnsubscribeWithoutGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + testConsumeUnsubscribeWithGroupPermission(clusterInstance, GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumeUnsubscribeWithoutGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + testConsumeUnsubscribeWithGroupPermission(clusterInstance, GroupProtocol.CONSUMER); + } + + private void testConsumeUnsubscribeWithGroupPermission(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException, ExecutionException { + setup(clusterInstance); + String topic = "topic"; + String group = "group"; + + // allow topic read/write permission to poll/send record + Set acls = new HashSet<>(); + acls.add(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)); + acls.add(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)); + acls.add(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)); + addAndVerifyAcls( + acls, + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL), + clusterInstance + ); + addAndVerifyAcls( + Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL), + clusterInstance + ); + + try (Producer producer = clusterInstance.producer(); + Consumer consumer = clusterInstance.consumer(Map.of( + ConsumerConfig.GROUP_ID_CONFIG, group, + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", + GROUP_PROTOCOL_CONFIG, groupProtocol.name.toLowerCase(Locale.ROOT))) + ) { + clusterInstance.createTopic(topic, 1, (short) 1); + producer.send(new ProducerRecord<>(topic, "message".getBytes())).get(); + consumer.subscribe(Collections.singletonList(topic)); + TestUtils.waitForCondition(() -> { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(15)); + return records.count() == 1; + }, "consumer failed to receive message"); + assertDoesNotThrow(consumer::unsubscribe); + } + } + + @ClusterTest + public void testClassicConsumeCloseWithGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + testConsumeCloseWithGroupPermission(clusterInstance, GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumeCloseWithGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + testConsumeCloseWithGroupPermission(clusterInstance, GroupProtocol.CONSUMER); + } + + private void testConsumeCloseWithGroupPermission(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException, ExecutionException { + setup(clusterInstance); + String topic = "topic"; + String group = "group"; + + // allow topic read/write permission to poll/send record + Set acls = new HashSet<>(); + acls.add(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)); + acls.add(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)); + acls.add(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)); + addAndVerifyAcls( + acls, + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL), + clusterInstance + ); + addAndVerifyAcls( + Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL), + clusterInstance + ); + + Producer producer = clusterInstance.producer(); + Consumer consumer = clusterInstance.consumer(Map.of( + ConsumerConfig.GROUP_ID_CONFIG, group, + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", + GROUP_PROTOCOL_CONFIG, groupProtocol.name.toLowerCase(Locale.ROOT))); + + try { + clusterInstance.createTopic(topic, 1, (short) 1); + producer.send(new ProducerRecord<>(topic, "message".getBytes())).get(); + consumer.subscribe(List.of(topic)); + TestUtils.waitForCondition(() -> { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(15)); + return records.count() == 1; + }, "consumer failed to receive message"); + } finally { + producer.close(); + assertDoesNotThrow(() -> consumer.close()); + } + } + + @ClusterTest + public void testAuthorizedProduceAndConsumeWithClassic(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + testAuthorizedProduceAndConsume(clusterInstance, GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAuthorizedProduceAndConsumeWithAsync(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + testAuthorizedProduceAndConsume(clusterInstance, GroupProtocol.CONSUMER); + } + + private void testAuthorizedProduceAndConsume(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException, ExecutionException { + setup(clusterInstance); + String topic = "topic"; + String group = "group"; + + Set acls = new HashSet<>(); + acls.add(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)); + acls.add(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)); + acls.add(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)); + addAndVerifyAcls( + acls, + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL), + clusterInstance + ); + addAndVerifyAcls( + Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL), + clusterInstance + ); + + try (Producer producer = clusterInstance.producer(); + Consumer consumer = clusterInstance.consumer(Map.of( + ConsumerConfig.GROUP_ID_CONFIG, group, + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", + GROUP_PROTOCOL_CONFIG, groupProtocol.name.toLowerCase(Locale.ROOT))) + ) { + clusterInstance.createTopic(topic, 1, (short) 1); + producer.send(new ProducerRecord<>(topic, "message".getBytes())).get(); + TopicPartition topicPartition = new TopicPartition(topic, 0); + consumer.assign(List.of(topicPartition)); + TestUtils.waitForCondition(() -> { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(15)); + return records.count() == 1; + }, "consumer failed to receive message"); + } + } + +} diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala deleted file mode 100644 index 01d18114a04..00000000000 --- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala +++ /dev/null @@ -1,239 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package kafka.api - -import java.util.Properties -import java.util.concurrent.ExecutionException -import kafka.api.GroupAuthorizerIntegrationTest._ -import kafka.server.BaseRequestTest -import kafka.utils.{TestInfoUtils, TestUtils} -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.acl.{AccessControlEntry, AclOperation, AclPermissionType} -import org.apache.kafka.common.config.internals.BrokerSecurityConfigs -import org.apache.kafka.common.errors.TopicAuthorizationException -import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType} -import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal} -import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder -import org.apache.kafka.coordinator.group.GroupCoordinatorConfig -import org.apache.kafka.coordinator.transaction.TransactionLogConfig -import org.apache.kafka.metadata.authorizer.StandardAuthorizer -import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST -import org.apache.kafka.server.config.ServerConfigs -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.function.Executable -import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout} -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.MethodSource - -import scala.jdk.CollectionConverters._ - -object GroupAuthorizerIntegrationTest { - val BrokerPrincipal = new KafkaPrincipal("Group", "broker") - val ClientPrincipal = new KafkaPrincipal("Group", "client") - - val BrokerListenerName = "BROKER" - val ClientListenerName = "CLIENT" - val ControllerListenerName = "CONTROLLER" - - class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { - override def build(context: AuthenticationContext): KafkaPrincipal = { - context.listenerName match { - case BrokerListenerName | ControllerListenerName => BrokerPrincipal - case ClientListenerName => ClientPrincipal - case listenerName => throw new IllegalArgumentException(s"No principal mapped to listener $listenerName") - } - } - } -} - -class GroupAuthorizerIntegrationTest extends BaseRequestTest { - - val brokerId: Integer = 0 - - override def brokerCount: Int = 1 - override def interBrokerListenerName: ListenerName = new ListenerName(BrokerListenerName) - override def listenerName: ListenerName = new ListenerName(ClientListenerName) - - def brokerPrincipal: KafkaPrincipal = BrokerPrincipal - def clientPrincipal: KafkaPrincipal = ClientPrincipal - - override def kraftControllerConfigs(testInfo: TestInfo): collection.Seq[Properties] = { - val controllerConfigs = super.kraftControllerConfigs(testInfo) - controllerConfigs.foreach(addNodeProperties) - controllerConfigs - } - - override def brokerPropertyOverrides(properties: Properties): Unit = { - properties.put(ServerConfigs.BROKER_ID_CONFIG, brokerId.toString) - addNodeProperties(properties) - } - - private def addNodeProperties(properties: Properties): Unit = { - properties.put(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName) - properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString) - - properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1") - properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") - properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, "1") - properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") - properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1") - properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[GroupPrincipalBuilder].getName) - } - - @BeforeEach - override def setUp(testInfo: TestInfo): Unit = { - doSetup(testInfo, createOffsetsTopic = false) - - // Allow inter-broker communication - addAndVerifyAcls( - Set(createAcl(AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW, principal = BrokerPrincipal)), - new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL) - ) - - createOffsetsTopic(interBrokerListenerName) - } - - private def createAcl(aclOperation: AclOperation, - aclPermissionType: AclPermissionType, - principal: KafkaPrincipal = ClientPrincipal): AccessControlEntry = { - new AccessControlEntry(principal.toString, WILDCARD_HOST, aclOperation, aclPermissionType) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testUnauthorizedProduceAndConsume(groupProtocol: String): Unit = { - val topic = "topic" - val topicPartition = new TopicPartition("topic", 0) - - createTopic(topic, listenerName = interBrokerListenerName) - - val producer = createProducer() - val produceException = assertThrows(classOf[ExecutionException], - () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()).getCause - assertTrue(produceException.isInstanceOf[TopicAuthorizationException]) - assertEquals(Set(topic), produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala) - - val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) - consumer.assign(java.util.List.of(topicPartition)) - val consumeException = assertThrows(classOf[TopicAuthorizationException], - () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)) - assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - @Timeout(60) - def testConsumeUnsubscribeWithoutGroupPermission(groupProtocol: String): Unit = { - val topic = "topic" - - createTopic(topic, listenerName = interBrokerListenerName) - - // allow topic read/write permission to poll/send record - addAndVerifyAcls( - Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)), - new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) - ) - val producer = createProducer() - producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get() - producer.close() - - // allow group read permission to join group - val group = "group" - addAndVerifyAcls( - Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), - new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) - ) - - val props = new Properties() - props.put(ConsumerConfig.GROUP_ID_CONFIG, group) - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") - val consumer = createConsumer(configOverrides = props) - consumer.subscribe(java.util.List.of(topic)) - TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1) - - removeAndVerifyAcls( - Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), - new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) - ) - - assertDoesNotThrow(new Executable { - override def execute(): Unit = consumer.unsubscribe() - }) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testConsumeCloseWithoutGroupPermission(groupProtocol: String): Unit = { - val topic = "topic" - createTopic(topic, listenerName = interBrokerListenerName) - - // allow topic read/write permission to poll/send record - addAndVerifyAcls( - Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)), - new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) - ) - val producer = createProducer() - producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get() - - // allow group read permission to join group - val group = "group" - addAndVerifyAcls( - Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), - new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) - ) - - val props = new Properties() - props.put(ConsumerConfig.GROUP_ID_CONFIG, group) - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") - val consumer = createConsumer(configOverrides = props) - consumer.subscribe(java.util.List.of(topic)) - TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1) - - removeAndVerifyAcls( - Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), - new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) - ) - - assertDoesNotThrow(new Executable { - override def execute(): Unit = consumer.close() - }) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAuthorizedProduceAndConsume(groupProtocol: String): Unit = { - val topic = "topic" - val topicPartition = new TopicPartition("topic", 0) - - createTopic(topic, listenerName = interBrokerListenerName) - - addAndVerifyAcls( - Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW)), - new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) - ) - val producer = createProducer() - producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get() - - addAndVerifyAcls( - Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), - new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) - ) - val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) - consumer.assign(java.util.List.of(topicPartition)) - TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1) - } - -}