KAFKA-19411: Fix deleteAcls bug which allows more deletions than max records per user op (#19974)

If there are more deletion filters after we initially hit the
`MAX_RECORDS_PER_USER_OP` bound, we will add an additional deletion
record ontop of that for each additional filter.

The current error message returned to the client is not useful either,
adding logic so client doesn't just get `UNKNOWN_SERVER_EXCEPTION` with
no details returned.
This commit is contained in:
Alyssa Huang 2025-06-24 11:24:50 -07:00 committed by Colin P. McCabe
parent 019ab2cb11
commit 3f0ae7fd53
2 changed files with 67 additions and 4 deletions

View File

@ -184,6 +184,10 @@ public class AclControlManager {
validateFilter(filter); validateFilter(filter);
AclDeleteResult result = deleteAclsForFilter(filter, records); AclDeleteResult result = deleteAclsForFilter(filter, records);
results.add(result); results.add(result);
} catch (BoundedListTooLongException e) {
// we do not return partial results here because the fact that only a portion of the deletions
// succeeded can be easily missed due to response size. instead fail the entire response
throw new InvalidRequestException(e.getMessage(), e);
} catch (Throwable e) { } catch (Throwable e) {
results.add(new AclDeleteResult(ApiError.fromThrowable(e).exception())); results.add(new AclDeleteResult(ApiError.fromThrowable(e).exception()));
} }
@ -199,13 +203,14 @@ public class AclControlManager {
StandardAcl acl = entry.getValue(); StandardAcl acl = entry.getValue();
AclBinding binding = acl.toBinding(); AclBinding binding = acl.toBinding();
if (filter.matches(binding)) { if (filter.matches(binding)) {
deleted.add(new AclBindingDeleteResult(binding)); // check size limitation first before adding additional records
records.add(new ApiMessageAndVersion( if (records.size() >= MAX_RECORDS_PER_USER_OP) {
new RemoveAccessControlEntryRecord().setId(id), (short) 0));
if (records.size() > MAX_RECORDS_PER_USER_OP) {
throw new BoundedListTooLongException("Cannot remove more than " + throw new BoundedListTooLongException("Cannot remove more than " +
MAX_RECORDS_PER_USER_OP + " acls in a single delete operation."); MAX_RECORDS_PER_USER_OP + " acls in a single delete operation.");
} }
deleted.add(new AclBindingDeleteResult(binding));
records.add(new ApiMessageAndVersion(
new RemoveAccessControlEntryRecord().setId(id), (short) 0));
} }
} }
return new AclDeleteResult(deleted); return new AclDeleteResult(deleted);

View File

@ -48,6 +48,7 @@ import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult; import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo; import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.mutable.BoundedListTooLongException;
import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -69,6 +70,7 @@ import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
import static org.apache.kafka.common.resource.PatternType.LITERAL; import static org.apache.kafka.common.resource.PatternType.LITERAL;
import static org.apache.kafka.common.resource.PatternType.MATCH; import static org.apache.kafka.common.resource.PatternType.MATCH;
import static org.apache.kafka.common.resource.ResourceType.TOPIC; import static org.apache.kafka.common.resource.ResourceType.TOPIC;
import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
import static org.apache.kafka.metadata.authorizer.StandardAclWithIdTest.TEST_ACLS; import static org.apache.kafka.metadata.authorizer.StandardAclWithIdTest.TEST_ACLS;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
@ -380,4 +382,60 @@ public class AclControlManagerTest {
assertEquals(id, ((RemoveAccessControlEntryRecord) deleteAclResultsBothFilters.records().get(0).message()).id()); assertEquals(id, ((RemoveAccessControlEntryRecord) deleteAclResultsBothFilters.records().get(0).message()).id());
assertEquals(2, deleteAclResultsBothFilters.response().size()); assertEquals(2, deleteAclResultsBothFilters.response().size());
} }
@Test
public void testDeleteExceedsMaxRecords() {
AclControlManager manager = new AclControlManager.Builder().build();
MockClusterMetadataAuthorizer authorizer = new MockClusterMetadataAuthorizer();
authorizer.loadSnapshot(manager.idToAcl());
List<AclBinding> firstCreate = new ArrayList<>();
List<AclBinding> secondCreate = new ArrayList<>();
// create MAX_RECORDS_PER_USER_OP + 2 ACLs
for (int i = 0; i < MAX_RECORDS_PER_USER_OP + 2; i++) {
StandardAclWithId acl = new StandardAclWithId(Uuid.randomUuid(),
new StandardAcl(
ResourceType.TOPIC,
"mytopic_" + i,
PatternType.LITERAL,
"User:alice",
"127.0.0.1",
AclOperation.READ,
AclPermissionType.ALLOW));
// split acl creations between two create requests
if (i % 2 == 0) {
firstCreate.add(acl.toBinding());
} else {
secondCreate.add(acl.toBinding());
}
}
ControllerResult<List<AclCreateResult>> firstCreateResult = manager.createAcls(firstCreate);
assertEquals((MAX_RECORDS_PER_USER_OP / 2) + 1, firstCreateResult.response().size());
for (AclCreateResult result : firstCreateResult.response()) {
assertTrue(result.exception().isEmpty());
}
ControllerResult<List<AclCreateResult>> secondCreateResult = manager.createAcls(secondCreate);
assertEquals((MAX_RECORDS_PER_USER_OP / 2) + 1, secondCreateResult.response().size());
for (AclCreateResult result : secondCreateResult.response()) {
assertTrue(result.exception().isEmpty());
}
RecordTestUtils.replayAll(manager, firstCreateResult.records());
RecordTestUtils.replayAll(manager, secondCreateResult.records());
assertFalse(manager.idToAcl().isEmpty());
ArrayList<AclBindingFilter> filters = new ArrayList<>();
for (int i = 0; i < MAX_RECORDS_PER_USER_OP + 2; i++) {
filters.add(new AclBindingFilter(
new ResourcePatternFilter(ResourceType.TOPIC, "mytopic_" + i, PatternType.LITERAL),
AccessControlEntryFilter.ANY));
}
Exception exception = assertThrows(InvalidRequestException.class, () -> manager.deleteAcls(filters));
assertEquals(BoundedListTooLongException.class, exception.getCause().getClass());
assertEquals("Cannot remove more than " + MAX_RECORDS_PER_USER_OP + " acls in a single delete operation.", exception.getCause().getMessage());
}
} }