diff --git a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java index d7c5dd1a3e5..ca324ab5788 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java @@ -160,6 +160,12 @@ public class AclControlManager { if (binding.pattern().name() == null || binding.pattern().name().isEmpty()) { throw new InvalidRequestException("Resource name should not be empty"); } + int colonIndex = binding.entry().principal().indexOf(":"); + if (colonIndex == -1) { + throw new InvalidRequestException("Could not parse principal from `" + + binding.entry().principal() + "` " + "(no colon is present separating the " + + "principal type from the principal name)"); + } } ControllerResult> deleteAcls(List filters) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java index 84143c8b3e1..30210fe0157 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java @@ -114,6 +114,34 @@ public class AclControlManagerTest { getMessage()); } + /** + * Verify that validateNewAcl catches invalid ACLs with principals that do not contain a colon. + */ + @Test + public void testValidateAclWithBadPrincipal() { + assertEquals("Could not parse principal from `invalid` (no colon is present " + + "separating the principal type from the principal name)", + assertThrows(InvalidRequestException.class, () -> + AclControlManager.validateNewAcl(new AclBinding( + new ResourcePattern(TOPIC, "*", LITERAL), + new AccessControlEntry("invalid", "*", ALTER, ALLOW)))). + getMessage()); + } + + /** + * Verify that validateNewAcl catches invalid ACLs with principals that do not contain a colon. + */ + @Test + public void testValidateAclWithEmptyPrincipal() { + assertEquals("Could not parse principal from `` (no colon is present " + + "separating the principal type from the principal name)", + assertThrows(InvalidRequestException.class, () -> + AclControlManager.validateNewAcl(new AclBinding( + new ResourcePattern(TOPIC, "*", LITERAL), + new AccessControlEntry("", "*", ALTER, ALLOW)))). + getMessage()); + } + /** * Verify that validateFilter catches invalid filters. */