KAFKA-19042 Move GroupAuthorizerIntegrationTest to clients-integration-tests module (#19685)
CI / build (push) Waiting to run Details

move GroupAuthorizerIntegrationTest to clients-integration-tests module

Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
 <payang@apache.org>, keemsisi <keemsisi@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
This commit is contained in:
Nick Guo 2025-05-31 02:34:56 +08:00 committed by GitHub
parent 8323168b57
commit cc0f06554b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 402 additions and 239 deletions

View File

@ -0,0 +1,402 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.security;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.config.ServerConfigs;
import java.net.InetAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
import static org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ClusterTestDefaults(serverProperties = {
@ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value = "Group:broker"),
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
@ClusterConfigProperty(key = ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, value = "org.apache.kafka.metadata.authorizer.StandardAuthorizer"),
@ClusterConfigProperty(key = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, value = "org.apache.kafka.clients.security.GroupAuthorizerIntegrationTest$GroupPrincipalBuilder"),
})
public class GroupAuthorizerIntegrationTest {
private static final KafkaPrincipal BROKER_PRINCIPAL = new KafkaPrincipal("Group", "broker");
private static final KafkaPrincipal CLIENT_PRINCIPAL = new KafkaPrincipal("Group", "client");
private static final String BROKER_LISTENER_NAME = "BROKER";
private static final String CLIENT_LISTENER_NAME = "EXTERNAL";
private static final String CONTROLLER_LISTENER_NAME = "CONTROLLER";
private Authorizer getAuthorizer(ClusterInstance clusterInstance) {
return clusterInstance.controllers().values().stream()
.filter(server -> server.authorizerPlugin().isDefined())
.map(server -> server.authorizerPlugin().get().get()).findFirst().get();
}
private void setup(ClusterInstance clusterInstance) throws InterruptedException {
// Allow inter-broker communication
addAndVerifyAcls(
Set.of(createAcl(AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW, BROKER_PRINCIPAL)),
new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL),
clusterInstance
);
addAndVerifyAcls(
Set.of(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)),
new ResourcePattern(ResourceType.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME, PatternType.LITERAL),
clusterInstance
);
NewTopic offsetTopic = new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, 1, (short) 1);
try (Admin admin = clusterInstance.admin(Map.of(
AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true))
) {
admin.createTopics(Collections.singleton(offsetTopic));
clusterInstance.waitForTopic(Topic.GROUP_METADATA_TOPIC_NAME, 1);
}
}
public static class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder {
public GroupPrincipalBuilder() {
super(null, null);
}
@Override
public KafkaPrincipal build(AuthenticationContext context) {
String listenerName = context.listenerName();
return switch (listenerName) {
case BROKER_LISTENER_NAME, CONTROLLER_LISTENER_NAME -> BROKER_PRINCIPAL;
case CLIENT_LISTENER_NAME -> CLIENT_PRINCIPAL;
default -> throw new IllegalArgumentException("No principal mapped to listener " + listenerName);
};
}
}
private AccessControlEntry createAcl(AclOperation aclOperation, AclPermissionType aclPermissionType, KafkaPrincipal principal) {
return new AccessControlEntry(
principal.toString(),
WILDCARD_HOST,
aclOperation,
aclPermissionType
);
}
private void addAndVerifyAcls(Set<AccessControlEntry> acls, ResourcePattern resource, ClusterInstance clusterInstance) throws InterruptedException {
List<AclBinding> aclBindings = acls.stream().map(acl -> new AclBinding(resource, acl)).toList();
Authorizer authorizer = getAuthorizer(clusterInstance);
authorizer.createAcls(ANONYMOUS_CONTEXT, aclBindings)
.forEach(future -> {
try {
future.toCompletableFuture().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Failed to create ACLs", e);
}
});
AclBindingFilter aclBindingFilter = new AclBindingFilter(resource.toFilter(), AccessControlEntryFilter.ANY);
clusterInstance.waitAcls(aclBindingFilter, acls);
}
static final AuthorizableRequestContext ANONYMOUS_CONTEXT = new AuthorizableRequestContext() {
@Override
public String listenerName() {
return "";
}
@Override
public SecurityProtocol securityProtocol() {
return SecurityProtocol.PLAINTEXT;
}
@Override
public KafkaPrincipal principal() {
return KafkaPrincipal.ANONYMOUS;
}
@Override
public InetAddress clientAddress() {
return null;
}
@Override
public int requestType() {
return 0;
}
@Override
public int requestVersion() {
return 0;
}
@Override
public String clientId() {
return "";
}
@Override
public int correlationId() {
return 0;
}
};
@ClusterTest
public void testUnauthorizedProduceAndConsumeWithClassicConsumer(ClusterInstance clusterInstance) throws InterruptedException {
testUnauthorizedProduceAndConsume(clusterInstance, GroupProtocol.CLASSIC);
}
@ClusterTest
public void testUnauthorizedProduceAndConsumeWithAsyncConsumer(ClusterInstance clusterInstance) throws InterruptedException {
testUnauthorizedProduceAndConsume(clusterInstance, GroupProtocol.CONSUMER);
}
public void testUnauthorizedProduceAndConsume(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException {
setup(clusterInstance);
String topic = "topic";
String group = "group";
addAndVerifyAcls(
Set.of(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)),
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL),
clusterInstance
);
addAndVerifyAcls(
Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL),
clusterInstance
);
Producer<byte[], byte[]> producer = clusterInstance.producer();
Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(
GROUP_PROTOCOL_CONFIG, groupProtocol.name.toLowerCase(Locale.ROOT),
ConsumerConfig.GROUP_ID_CONFIG, group
));
try {
clusterInstance.createTopic(topic, 1, (short) 1);
ExecutionException produceException = assertThrows(
ExecutionException.class,
() -> producer.send(new ProducerRecord<>(topic, "message".getBytes())).get()
);
Throwable cause = produceException.getCause();
assertInstanceOf(TopicAuthorizationException.class, cause);
TopicAuthorizationException topicAuthException = (TopicAuthorizationException) cause;
assertEquals(Set.of(topic), topicAuthException.unauthorizedTopics());
TopicPartition topicPartition = new TopicPartition(topic, 0);
consumer.assign(Collections.singletonList(topicPartition));
TopicAuthorizationException consumeException = assertThrows(
TopicAuthorizationException.class,
() -> consumer.poll(Duration.ofSeconds(15))
);
assertEquals(consumeException.unauthorizedTopics(), topicAuthException.unauthorizedTopics());
} finally {
producer.close(Duration.ZERO);
consumer.close();
}
}
@ClusterTest
public void testClassicConsumeUnsubscribeWithoutGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
testConsumeUnsubscribeWithGroupPermission(clusterInstance, GroupProtocol.CLASSIC);
}
@ClusterTest
public void testAsyncConsumeUnsubscribeWithoutGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
testConsumeUnsubscribeWithGroupPermission(clusterInstance, GroupProtocol.CONSUMER);
}
private void testConsumeUnsubscribeWithGroupPermission(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
setup(clusterInstance);
String topic = "topic";
String group = "group";
// allow topic read/write permission to poll/send record
Set<AccessControlEntry> acls = new HashSet<>();
acls.add(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
acls.add(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
acls.add(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
addAndVerifyAcls(
acls,
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL),
clusterInstance
);
addAndVerifyAcls(
Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL),
clusterInstance
);
try (Producer<byte[], byte[]> producer = clusterInstance.producer();
Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(
ConsumerConfig.GROUP_ID_CONFIG, group,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
GROUP_PROTOCOL_CONFIG, groupProtocol.name.toLowerCase(Locale.ROOT)))
) {
clusterInstance.createTopic(topic, 1, (short) 1);
producer.send(new ProducerRecord<>(topic, "message".getBytes())).get();
consumer.subscribe(Collections.singletonList(topic));
TestUtils.waitForCondition(() -> {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(15));
return records.count() == 1;
}, "consumer failed to receive message");
assertDoesNotThrow(consumer::unsubscribe);
}
}
@ClusterTest
public void testClassicConsumeCloseWithGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
testConsumeCloseWithGroupPermission(clusterInstance, GroupProtocol.CLASSIC);
}
@ClusterTest
public void testAsyncConsumeCloseWithGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
testConsumeCloseWithGroupPermission(clusterInstance, GroupProtocol.CONSUMER);
}
private void testConsumeCloseWithGroupPermission(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
setup(clusterInstance);
String topic = "topic";
String group = "group";
// allow topic read/write permission to poll/send record
Set<AccessControlEntry> acls = new HashSet<>();
acls.add(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
acls.add(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
acls.add(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
addAndVerifyAcls(
acls,
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL),
clusterInstance
);
addAndVerifyAcls(
Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL),
clusterInstance
);
Producer<Object, Object> producer = clusterInstance.producer();
Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(
ConsumerConfig.GROUP_ID_CONFIG, group,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
GROUP_PROTOCOL_CONFIG, groupProtocol.name.toLowerCase(Locale.ROOT)));
try {
clusterInstance.createTopic(topic, 1, (short) 1);
producer.send(new ProducerRecord<>(topic, "message".getBytes())).get();
consumer.subscribe(List.of(topic));
TestUtils.waitForCondition(() -> {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(15));
return records.count() == 1;
}, "consumer failed to receive message");
} finally {
producer.close();
assertDoesNotThrow(() -> consumer.close());
}
}
@ClusterTest
public void testAuthorizedProduceAndConsumeWithClassic(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
testAuthorizedProduceAndConsume(clusterInstance, GroupProtocol.CLASSIC);
}
@ClusterTest
public void testAuthorizedProduceAndConsumeWithAsync(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
testAuthorizedProduceAndConsume(clusterInstance, GroupProtocol.CONSUMER);
}
private void testAuthorizedProduceAndConsume(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
setup(clusterInstance);
String topic = "topic";
String group = "group";
Set<AccessControlEntry> acls = new HashSet<>();
acls.add(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
acls.add(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
acls.add(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
addAndVerifyAcls(
acls,
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL),
clusterInstance
);
addAndVerifyAcls(
Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL),
clusterInstance
);
try (Producer<byte[], byte[]> producer = clusterInstance.producer();
Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(
ConsumerConfig.GROUP_ID_CONFIG, group,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
GROUP_PROTOCOL_CONFIG, groupProtocol.name.toLowerCase(Locale.ROOT)))
) {
clusterInstance.createTopic(topic, 1, (short) 1);
producer.send(new ProducerRecord<>(topic, "message".getBytes())).get();
TopicPartition topicPartition = new TopicPartition(topic, 0);
consumer.assign(List.of(topicPartition));
TestUtils.waitForCondition(() -> {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(15));
return records.count() == 1;
}, "consumer failed to receive message");
}
}
}

View File

@ -1,239 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package kafka.api
import java.util.Properties
import java.util.concurrent.ExecutionException
import kafka.api.GroupAuthorizerIntegrationTest._
import kafka.server.BaseRequestTest
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.acl.{AccessControlEntry, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.errors.TopicAuthorizationException
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import scala.jdk.CollectionConverters._
object GroupAuthorizerIntegrationTest {
val BrokerPrincipal = new KafkaPrincipal("Group", "broker")
val ClientPrincipal = new KafkaPrincipal("Group", "client")
val BrokerListenerName = "BROKER"
val ClientListenerName = "CLIENT"
val ControllerListenerName = "CONTROLLER"
class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
context.listenerName match {
case BrokerListenerName | ControllerListenerName => BrokerPrincipal
case ClientListenerName => ClientPrincipal
case listenerName => throw new IllegalArgumentException(s"No principal mapped to listener $listenerName")
}
}
}
}
class GroupAuthorizerIntegrationTest extends BaseRequestTest {
val brokerId: Integer = 0
override def brokerCount: Int = 1
override def interBrokerListenerName: ListenerName = new ListenerName(BrokerListenerName)
override def listenerName: ListenerName = new ListenerName(ClientListenerName)
def brokerPrincipal: KafkaPrincipal = BrokerPrincipal
def clientPrincipal: KafkaPrincipal = ClientPrincipal
override def kraftControllerConfigs(testInfo: TestInfo): collection.Seq[Properties] = {
val controllerConfigs = super.kraftControllerConfigs(testInfo)
controllerConfigs.foreach(addNodeProperties)
controllerConfigs
}
override def brokerPropertyOverrides(properties: Properties): Unit = {
properties.put(ServerConfigs.BROKER_ID_CONFIG, brokerId.toString)
addNodeProperties(properties)
}
private def addNodeProperties(properties: Properties): Unit = {
properties.put(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName)
properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString)
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1")
properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[GroupPrincipalBuilder].getName)
}
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
doSetup(testInfo, createOffsetsTopic = false)
// Allow inter-broker communication
addAndVerifyAcls(
Set(createAcl(AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW, principal = BrokerPrincipal)),
new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
)
createOffsetsTopic(interBrokerListenerName)
}
private def createAcl(aclOperation: AclOperation,
aclPermissionType: AclPermissionType,
principal: KafkaPrincipal = ClientPrincipal): AccessControlEntry = {
new AccessControlEntry(principal.toString, WILDCARD_HOST, aclOperation, aclPermissionType)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testUnauthorizedProduceAndConsume(groupProtocol: String): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition("topic", 0)
createTopic(topic, listenerName = interBrokerListenerName)
val producer = createProducer()
val produceException = assertThrows(classOf[ExecutionException],
() => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()).getCause
assertTrue(produceException.isInstanceOf[TopicAuthorizationException])
assertEquals(Set(topic), produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala)
val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
consumer.assign(java.util.List.of(topicPartition))
val consumeException = assertThrows(classOf[TopicAuthorizationException],
() => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
@Timeout(60)
def testConsumeUnsubscribeWithoutGroupPermission(groupProtocol: String): Unit = {
val topic = "topic"
createTopic(topic, listenerName = interBrokerListenerName)
// allow topic read/write permission to poll/send record
addAndVerifyAcls(
Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
)
val producer = createProducer()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()
producer.close()
// allow group read permission to join group
val group = "group"
addAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)
val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val consumer = createConsumer(configOverrides = props)
consumer.subscribe(java.util.List.of(topic))
TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
removeAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)
assertDoesNotThrow(new Executable {
override def execute(): Unit = consumer.unsubscribe()
})
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testConsumeCloseWithoutGroupPermission(groupProtocol: String): Unit = {
val topic = "topic"
createTopic(topic, listenerName = interBrokerListenerName)
// allow topic read/write permission to poll/send record
addAndVerifyAcls(
Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
)
val producer = createProducer()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()
// allow group read permission to join group
val group = "group"
addAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)
val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val consumer = createConsumer(configOverrides = props)
consumer.subscribe(java.util.List.of(topic))
TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
removeAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)
assertDoesNotThrow(new Executable {
override def execute(): Unit = consumer.close()
})
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testAuthorizedProduceAndConsume(groupProtocol: String): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition("topic", 0)
createTopic(topic, listenerName = interBrokerListenerName)
addAndVerifyAcls(
Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
)
val producer = createProducer()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()
addAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
)
val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
consumer.assign(java.util.List.of(topicPartition))
TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
}
}