diff --git a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java index ca324ab5788..08ed04d5301 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java @@ -106,10 +106,11 @@ public class AclControlManager { continue; } StandardAcl standardAcl = StandardAcl.fromAclBinding(acl); - if (existingAcls.add(standardAcl)) { + if (!existingAcls.contains(standardAcl)) { StandardAclWithId standardAclWithId = new StandardAclWithId(newAclId(), standardAcl); - idToAcl.put(standardAclWithId.id(), standardAcl); records.add(new ApiMessageAndVersion(standardAclWithId.toRecord(), (short) 0)); + } else { + log.debug("Not creating ACL since it already exists: {}", standardAcl); } results.add(AclCreateResult.SUCCESS); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java index 33a9333d475..64f193b5ef8 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java @@ -346,6 +346,7 @@ public class AclControlManagerTest { new AccessControlEntry("User:user", "10.0.0.1", AclOperation.ALL, ALLOW)); ControllerResult> createResult = manager.createAcls(List.of(aclBinding)); + RecordTestUtils.replayAll(manager, createResult.records()); Uuid id = ((AccessControlEntryRecord) createResult.records().get(0).message()).id(); assertEquals(1, createResult.records().size());