From 5372e626d9472b72552a9dca14d2f7e4cdf106ff Mon Sep 17 00:00:00 2001 From: Alyssa Huang Date: Tue, 24 Jun 2025 11:24:50 -0700 Subject: [PATCH] KAFKA-19411: Fix deleteAcls bug which allows more deletions than max records per user op (#19974) If there are more deletion filters after we initially hit the `MAX_RECORDS_PER_USER_OP` bound, we will add an additional deletion record ontop of that for each additional filter. The current error message returned to the client is not useful either, adding logic so client doesn't just get `UNKNOWN_SERVER_EXCEPTION` with no details returned. Conflicts: - In AclControlManagerTest.java, use !isPresent instead of isEmpty so that this will work with java 8. --- .../kafka/controller/AclControlManager.java | 13 +++-- .../controller/AclControlManagerTest.java | 58 +++++++++++++++++++ 2 files changed, 67 insertions(+), 4 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java index f849b540a57..a6945fde652 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java @@ -175,6 +175,10 @@ public class AclControlManager { validateFilter(filter); AclDeleteResult result = deleteAclsForFilter(filter, records); results.add(result); + } catch (BoundedListTooLongException e) { + // we do not return partial results here because the fact that only a portion of the deletions + // succeeded can be easily missed due to response size. instead fail the entire response + throw new InvalidRequestException(e.getMessage(), e); } catch (Throwable e) { results.add(new AclDeleteResult(ApiError.fromThrowable(e).exception())); } @@ -190,13 +194,14 @@ public class AclControlManager { StandardAcl acl = entry.getValue(); AclBinding binding = acl.toBinding(); if (filter.matches(binding)) { - deleted.add(new AclBindingDeleteResult(binding)); - records.add(new ApiMessageAndVersion( - new RemoveAccessControlEntryRecord().setId(id), (short) 0)); - if (records.size() > MAX_RECORDS_PER_USER_OP) { + // check size limitation first before adding additional records + if (records.size() >= MAX_RECORDS_PER_USER_OP) { throw new BoundedListTooLongException("Cannot remove more than " + MAX_RECORDS_PER_USER_OP + " acls in a single delete operation."); } + deleted.add(new AclBindingDeleteResult(binding)); + records.add(new ApiMessageAndVersion( + new RemoveAccessControlEntryRecord().setId(id), (short) 0)); } } return new AclDeleteResult(deleted); diff --git a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java index a62b9f00d6d..ecf957333f6 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java @@ -48,6 +48,7 @@ import org.apache.kafka.server.authorizer.AuthorizableRequestContext; import org.apache.kafka.server.authorizer.AuthorizationResult; import org.apache.kafka.server.authorizer.AuthorizerServerInfo; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.mutable.BoundedListTooLongException; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -70,6 +71,7 @@ import static org.apache.kafka.common.acl.AclPermissionType.ALLOW; import static org.apache.kafka.common.resource.PatternType.LITERAL; import static org.apache.kafka.common.resource.PatternType.MATCH; import static org.apache.kafka.common.resource.ResourceType.TOPIC; +import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP; import static org.apache.kafka.metadata.authorizer.StandardAclWithIdTest.TEST_ACLS; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -365,4 +367,60 @@ public class AclControlManagerTest { assertEquals(id, ((RemoveAccessControlEntryRecord) deleteAclResultsBothFilters.records().get(0).message()).id()); assertEquals(2, deleteAclResultsBothFilters.response().size()); } + + @Test + public void testDeleteExceedsMaxRecords() { + AclControlManager manager = new AclControlManager.Builder().build(); + MockClusterMetadataAuthorizer authorizer = new MockClusterMetadataAuthorizer(); + authorizer.loadSnapshot(manager.idToAcl()); + + List firstCreate = new ArrayList<>(); + List secondCreate = new ArrayList<>(); + + // create MAX_RECORDS_PER_USER_OP + 2 ACLs + for (int i = 0; i < MAX_RECORDS_PER_USER_OP + 2; i++) { + StandardAclWithId acl = new StandardAclWithId(Uuid.randomUuid(), + new StandardAcl( + ResourceType.TOPIC, + "mytopic_" + i, + PatternType.LITERAL, + "User:alice", + "127.0.0.1", + AclOperation.READ, + AclPermissionType.ALLOW)); + + // split acl creations between two create requests + if (i % 2 == 0) { + firstCreate.add(acl.toBinding()); + } else { + secondCreate.add(acl.toBinding()); + } + } + ControllerResult> firstCreateResult = manager.createAcls(firstCreate); + assertEquals((MAX_RECORDS_PER_USER_OP / 2) + 1, firstCreateResult.response().size()); + for (AclCreateResult result : firstCreateResult.response()) { + assertTrue(!result.exception().isPresent()); + } + + ControllerResult> secondCreateResult = manager.createAcls(secondCreate); + assertEquals((MAX_RECORDS_PER_USER_OP / 2) + 1, secondCreateResult.response().size()); + for (AclCreateResult result : secondCreateResult.response()) { + assertTrue(!result.exception().isPresent()); + } + + RecordTestUtils.replayAll(manager, firstCreateResult.records()); + RecordTestUtils.replayAll(manager, secondCreateResult.records()); + assertFalse(manager.idToAcl().isEmpty()); + + ArrayList filters = new ArrayList<>(); + for (int i = 0; i < MAX_RECORDS_PER_USER_OP + 2; i++) { + filters.add(new AclBindingFilter( + new ResourcePatternFilter(ResourceType.TOPIC, "mytopic_" + i, PatternType.LITERAL), + AccessControlEntryFilter.ANY)); + } + + Exception exception = assertThrows(InvalidRequestException.class, () -> manager.deleteAcls(filters)); + assertEquals(BoundedListTooLongException.class, exception.getCause().getClass()); + assertEquals("Cannot remove more than " + MAX_RECORDS_PER_USER_OP + " acls in a single delete operation.", exception.getCause().getMessage()); + } }