From 583acb60d68795f5e55d7ac5ac3b0f0b1508a611 Mon Sep 17 00:00:00 2001 From: Nick Guo Date: Sun, 22 Jun 2025 22:08:20 +0800 Subject: [PATCH] MINOR: Restore original behavior of GroupAuthorizerIntegrationTest (#19968) this is a follow-up for https://github.com/apache/kafka/pull/19685 The timeout issue in `AsyncConsumer#unsubscribe` was fixed by https://github.com/apache/kafka/pull/19779. As a result, the test `GroupAuthorizerIntegrationTest#testConsumeUnsubscribeWithoutGroupPermission` should now retain its original behavior as expected prior to the issue. Reviewers: Chia-Ping Tsai --- .../GroupAuthorizerIntegrationTest.java | 50 +++++++++++++++++-- 1 file changed, 46 insertions(+), 4 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java index 725c0f53786..a1eb4c4c027 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java @@ -141,7 +141,9 @@ public class GroupAuthorizerIntegrationTest { } private void addAndVerifyAcls(Set acls, ResourcePattern resource, ClusterInstance clusterInstance) throws InterruptedException { - List aclBindings = acls.stream().map(acl -> new AclBinding(resource, acl)).toList(); + List aclBindings = acls.stream() + .map(acl -> new AclBinding(resource, acl)) + .toList(); Authorizer authorizer = getAuthorizer(clusterInstance); authorizer.createAcls(ANONYMOUS_CONTEXT, aclBindings) .forEach(future -> { @@ -155,6 +157,29 @@ public class GroupAuthorizerIntegrationTest { clusterInstance.waitAcls(aclBindingFilter, acls); } + private void removeAndVerifyAcls(Set deleteAcls, ResourcePattern resource, ClusterInstance clusterInstance) throws InterruptedException { + List aclBindingFilters = deleteAcls.stream() + .map(acl -> new AclBindingFilter(resource.toFilter(), acl.toFilter())) + .toList(); + Authorizer authorizer = getAuthorizer(clusterInstance); + authorizer.deleteAcls(ANONYMOUS_CONTEXT, aclBindingFilters) + .forEach(future -> { + try { + future.toCompletableFuture().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Failed to delete ACLs", e); + } + }); + + AclBindingFilter aclBindingFilter = new AclBindingFilter(resource.toFilter(), AccessControlEntryFilter.ANY); + TestUtils.waitForCondition(() -> { + Set remainingAclEntries = new HashSet<>(); + authorizer.acls(aclBindingFilter).forEach(aclBinding -> remainingAclEntries.add(aclBinding.entry())); + return deleteAcls.stream().noneMatch(remainingAclEntries::contains); + }, "Failed to verify ACLs deletion"); + } + + static final AuthorizableRequestContext ANONYMOUS_CONTEXT = new AuthorizableRequestContext() { @Override public String listenerName() { @@ -253,17 +278,27 @@ public class GroupAuthorizerIntegrationTest { } } + @ClusterTest + public void testClassicConsumeUnsubscribeWithGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + testConsumeUnsubscribeWithOrWithoutGroupPermission(clusterInstance, GroupProtocol.CLASSIC, true); + } + + @ClusterTest + public void testAsyncConsumeUnsubscribeWithGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + testConsumeUnsubscribeWithOrWithoutGroupPermission(clusterInstance, GroupProtocol.CONSUMER, true); + } + @ClusterTest public void testClassicConsumeUnsubscribeWithoutGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { - testConsumeUnsubscribeWithGroupPermission(clusterInstance, GroupProtocol.CLASSIC); + testConsumeUnsubscribeWithOrWithoutGroupPermission(clusterInstance, GroupProtocol.CLASSIC, false); } @ClusterTest public void testAsyncConsumeUnsubscribeWithoutGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { - testConsumeUnsubscribeWithGroupPermission(clusterInstance, GroupProtocol.CONSUMER); + testConsumeUnsubscribeWithOrWithoutGroupPermission(clusterInstance, GroupProtocol.CONSUMER, false); } - private void testConsumeUnsubscribeWithGroupPermission(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException, ExecutionException { + private void testConsumeUnsubscribeWithOrWithoutGroupPermission(ClusterInstance clusterInstance, GroupProtocol groupProtocol, boolean withGroupPermission) throws InterruptedException, ExecutionException { setup(clusterInstance); String topic = "topic"; String group = "group"; @@ -297,6 +332,13 @@ public class GroupAuthorizerIntegrationTest { ConsumerRecords records = consumer.poll(Duration.ofSeconds(15)); return records.count() == 1; }, "consumer failed to receive message"); + if (!withGroupPermission) { + removeAndVerifyAcls( + Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL), + clusterInstance + ); + } assertDoesNotThrow(consumer::unsubscribe); } }