MINOR: Restore original behavior of GroupAuthorizerIntegrationTest (#19968)
CI / build (push) Waiting to run Details

this is a follow-up for https://github.com/apache/kafka/pull/19685

The timeout issue in `AsyncConsumer#unsubscribe` was fixed by
https://github.com/apache/kafka/pull/19779. As a result, the test

`GroupAuthorizerIntegrationTest#testConsumeUnsubscribeWithoutGroupPermission`
should now retain its original behavior as expected prior to the issue.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Nick Guo 2025-06-22 22:08:20 +08:00 committed by GitHub
parent 22bef988d4
commit 583acb60d6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 46 additions and 4 deletions

View File

@ -141,7 +141,9 @@ public class GroupAuthorizerIntegrationTest {
} }
private void addAndVerifyAcls(Set<AccessControlEntry> acls, ResourcePattern resource, ClusterInstance clusterInstance) throws InterruptedException { private void addAndVerifyAcls(Set<AccessControlEntry> acls, ResourcePattern resource, ClusterInstance clusterInstance) throws InterruptedException {
List<AclBinding> aclBindings = acls.stream().map(acl -> new AclBinding(resource, acl)).toList(); List<AclBinding> aclBindings = acls.stream()
.map(acl -> new AclBinding(resource, acl))
.toList();
Authorizer authorizer = getAuthorizer(clusterInstance); Authorizer authorizer = getAuthorizer(clusterInstance);
authorizer.createAcls(ANONYMOUS_CONTEXT, aclBindings) authorizer.createAcls(ANONYMOUS_CONTEXT, aclBindings)
.forEach(future -> { .forEach(future -> {
@ -155,6 +157,29 @@ public class GroupAuthorizerIntegrationTest {
clusterInstance.waitAcls(aclBindingFilter, acls); clusterInstance.waitAcls(aclBindingFilter, acls);
} }
private void removeAndVerifyAcls(Set<AccessControlEntry> deleteAcls, ResourcePattern resource, ClusterInstance clusterInstance) throws InterruptedException {
List<AclBindingFilter> aclBindingFilters = deleteAcls.stream()
.map(acl -> new AclBindingFilter(resource.toFilter(), acl.toFilter()))
.toList();
Authorizer authorizer = getAuthorizer(clusterInstance);
authorizer.deleteAcls(ANONYMOUS_CONTEXT, aclBindingFilters)
.forEach(future -> {
try {
future.toCompletableFuture().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Failed to delete ACLs", e);
}
});
AclBindingFilter aclBindingFilter = new AclBindingFilter(resource.toFilter(), AccessControlEntryFilter.ANY);
TestUtils.waitForCondition(() -> {
Set<AccessControlEntry> remainingAclEntries = new HashSet<>();
authorizer.acls(aclBindingFilter).forEach(aclBinding -> remainingAclEntries.add(aclBinding.entry()));
return deleteAcls.stream().noneMatch(remainingAclEntries::contains);
}, "Failed to verify ACLs deletion");
}
static final AuthorizableRequestContext ANONYMOUS_CONTEXT = new AuthorizableRequestContext() { static final AuthorizableRequestContext ANONYMOUS_CONTEXT = new AuthorizableRequestContext() {
@Override @Override
public String listenerName() { public String listenerName() {
@ -253,17 +278,27 @@ public class GroupAuthorizerIntegrationTest {
} }
} }
@ClusterTest
public void testClassicConsumeUnsubscribeWithGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
testConsumeUnsubscribeWithOrWithoutGroupPermission(clusterInstance, GroupProtocol.CLASSIC, true);
}
@ClusterTest
public void testAsyncConsumeUnsubscribeWithGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
testConsumeUnsubscribeWithOrWithoutGroupPermission(clusterInstance, GroupProtocol.CONSUMER, true);
}
@ClusterTest @ClusterTest
public void testClassicConsumeUnsubscribeWithoutGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { public void testClassicConsumeUnsubscribeWithoutGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
testConsumeUnsubscribeWithGroupPermission(clusterInstance, GroupProtocol.CLASSIC); testConsumeUnsubscribeWithOrWithoutGroupPermission(clusterInstance, GroupProtocol.CLASSIC, false);
} }
@ClusterTest @ClusterTest
public void testAsyncConsumeUnsubscribeWithoutGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { public void testAsyncConsumeUnsubscribeWithoutGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
testConsumeUnsubscribeWithGroupPermission(clusterInstance, GroupProtocol.CONSUMER); testConsumeUnsubscribeWithOrWithoutGroupPermission(clusterInstance, GroupProtocol.CONSUMER, false);
} }
private void testConsumeUnsubscribeWithGroupPermission(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException, ExecutionException { private void testConsumeUnsubscribeWithOrWithoutGroupPermission(ClusterInstance clusterInstance, GroupProtocol groupProtocol, boolean withGroupPermission) throws InterruptedException, ExecutionException {
setup(clusterInstance); setup(clusterInstance);
String topic = "topic"; String topic = "topic";
String group = "group"; String group = "group";
@ -297,6 +332,13 @@ public class GroupAuthorizerIntegrationTest {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(15)); ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(15));
return records.count() == 1; return records.count() == 1;
}, "consumer failed to receive message"); }, "consumer failed to receive message");
if (!withGroupPermission) {
removeAndVerifyAcls(
Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL),
clusterInstance
);
}
assertDoesNotThrow(consumer::unsubscribe); assertDoesNotThrow(consumer::unsubscribe);
} }
} }