KAFKA-15009: Handle new ACLs in KRaft snapshot during migration (#13741)

When loading a snapshot during dual-write mode, we were missing the logic to detect new ACLs that 
had been added on the KRaft side. This patch adds support for finding those new ACLs as well as tests
to verify the correct behavior.

Reviewers: David Arthur <mumrah@gmail.com>
This commit is contained in:
Akhilesh C 2023-05-23 07:43:02 -07:00 committed by GitHub
parent 15f8705246
commit ea6ce3bf82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 90 additions and 5 deletions

View File

@ -16,19 +16,21 @@
*/ */
package kafka.zk.migration package kafka.zk.migration
import kafka.security.authorizer.AclAuthorizer import kafka.security.authorizer.{AclAuthorizer, AclEntry}
import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString} import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.acl._ import org.apache.kafka.common.acl._
import org.apache.kafka.common.metadata.AccessControlEntryRecord import org.apache.kafka.common.metadata.AccessControlEntryRecord
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType} import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.SecurityUtils import org.apache.kafka.common.utils.SecurityUtils
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.migration.KRaftMigrationZkWriter
import org.apache.kafka.server.common.ApiMessageAndVersion import org.apache.kafka.server.common.ApiMessageAndVersion
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.util.UUID
import scala.collection.mutable import scala.collection.mutable
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
@ -73,9 +75,9 @@ class ZkAclMigrationClientTest extends ZkMigrationTestHarness {
@Test @Test
def testAclsMigrateAndDualWrite(): Unit = { def testAclsMigrateAndDualWrite(): Unit = {
val resource1 = new ResourcePattern(ResourceType.TOPIC, "foo-" + UUID.randomUUID(), PatternType.LITERAL) val resource1 = new ResourcePattern(ResourceType.TOPIC, "foo-" + Uuid.randomUuid(), PatternType.LITERAL)
val resource2 = new ResourcePattern(ResourceType.TOPIC, "bar-" + UUID.randomUUID(), PatternType.LITERAL) val resource2 = new ResourcePattern(ResourceType.TOPIC, "bar-" + Uuid.randomUuid(), PatternType.LITERAL)
val prefixedResource = new ResourcePattern(ResourceType.TOPIC, "bar-", PatternType.PREFIXED) val prefixedResource = new ResourcePattern(ResourceType.TOPIC, "bar-" + Uuid.randomUuid(), PatternType.PREFIXED)
val username = "alice" val username = "alice"
val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val wildcardPrincipal = SecurityUtils.parseKafkaPrincipal(WildcardPrincipalString) val wildcardPrincipal = SecurityUtils.parseKafkaPrincipal(WildcardPrincipalString)
@ -115,4 +117,78 @@ class ZkAclMigrationClientTest extends ZkMigrationTestHarness {
authorizer.close() authorizer.close()
} }
} }
@Test
def testAclsChangesInSnapshot(): Unit = {
// Create some ACLs in Zookeeper.
val resource1 = new ResourcePattern(ResourceType.TOPIC, "foo-" + Uuid.randomUuid(), PatternType.LITERAL)
val resource2 = new ResourcePattern(ResourceType.TOPIC, "bar-" + Uuid.randomUuid(), PatternType.LITERAL)
val resource3 = new ResourcePattern(ResourceType.TOPIC, "baz-" + Uuid.randomUuid(), PatternType.LITERAL)
val username1 = "alice"
val username2 = "blah"
val principal1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username1)
val principal2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username2)
val acl1Resource1 = new AclEntry(new AccessControlEntry(principal1.toString, WildcardHost, AclOperation.WRITE, AclPermissionType.ALLOW))
val acl1Resource2 = new AclEntry(new AccessControlEntry(principal2.toString, WildcardHost, AclOperation.READ, AclPermissionType.ALLOW))
zkClient.createAclPaths()
zkClient.createAclsForResourceIfNotExists(resource1, Set(acl1Resource1))
zkClient.createAclsForResourceIfNotExists(resource2, Set(acl1Resource2))
// Create a metadata image such that ACLs for one resource are update, one resource is deleted
// one new resource is created in Zookeeper.
// Create an ACL for a new resource.
val delta = new MetadataDelta(MetadataImage.EMPTY)
val acl1Resource3 = new AccessControlEntryRecord()
.setId(Uuid.randomUuid())
.setHost("192.168.10.1")
.setOperation(AclOperation.READ.code())
.setPrincipal(WildcardPrincipalString)
.setPermissionType(AclPermissionType.ALLOW.code())
.setPatternType(resource3.patternType().code())
.setResourceName(resource3.name())
.setResourceType(resource3.resourceType().code()
)
delta.replay(acl1Resource3)
// Change an ACL for existing resource.
val acl2Resource1 = new AccessControlEntryRecord()
.setId(Uuid.randomUuid())
.setHost("192.168.15.1")
.setOperation(AclOperation.WRITE.code())
.setPrincipal(principal1.toString)
.setPermissionType(AclPermissionType.ALLOW.code())
.setPatternType(resource1.patternType().code())
.setResourceName(resource1.name())
.setResourceType(resource1.resourceType().code()
)
delta.replay(acl2Resource1)
// Do not add anything for resource 2 in the delta.
val image = delta.apply(MetadataProvenance.EMPTY)
// load snapshot to Zookeeper.
val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient,
(_, operation) => { migrationState = operation.apply(migrationState) })
kraftMigrationZkWriter.handleLoadSnapshot(image)
// Verify the new ACLs in Zookeeper.
val resource1AclsInZk = zkClient.getVersionedAclsForResource(resource1).acls
assertEquals(1, resource1AclsInZk.size)
assertEquals(
new AccessControlEntry(acl2Resource1.principal(), acl2Resource1.host(),
AclOperation.fromCode(acl2Resource1.operation()),
AclPermissionType.fromCode(acl2Resource1.permissionType())),
resource1AclsInZk.head.ace)
val resource2AclsInZk = zkClient.getVersionedAclsForResource(resource2).acls
assertTrue(resource2AclsInZk.isEmpty)
val resource3AclsInZk = zkClient.getVersionedAclsForResource(resource3).acls
assertEquals(
new AccessControlEntry(acl1Resource3.principal(), acl1Resource3.host(),
AclOperation.fromCode(acl1Resource3.operation()),
AclPermissionType.fromCode(acl1Resource3.permissionType())),
resource3AclsInZk.head.ace)
}
} }

View File

@ -350,9 +350,11 @@ public class KRaftMigrationZkWriter {
); );
}); });
Set<ResourcePattern> newResources = new HashSet<>(allAclsInSnapshot.keySet());
Set<ResourcePattern> resourcesToDelete = new HashSet<>(); Set<ResourcePattern> resourcesToDelete = new HashSet<>();
Map<ResourcePattern, Set<AccessControlEntry>> changedResources = new HashMap<>(); Map<ResourcePattern, Set<AccessControlEntry>> changedResources = new HashMap<>();
migrationClient.aclClient().iterateAcls((resourcePattern, accessControlEntries) -> { migrationClient.aclClient().iterateAcls((resourcePattern, accessControlEntries) -> {
newResources.remove(resourcePattern);
if (!allAclsInSnapshot.containsKey(resourcePattern)) { if (!allAclsInSnapshot.containsKey(resourcePattern)) {
resourcesToDelete.add(resourcePattern); resourcesToDelete.add(resourcePattern);
} else { } else {
@ -363,6 +365,13 @@ public class KRaftMigrationZkWriter {
} }
}); });
newResources.forEach(resourcePattern -> {
Set<AccessControlEntry> accessControlEntries = allAclsInSnapshot.get(resourcePattern);
String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
operationConsumer.accept(name, migrationState ->
migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState));
});
resourcesToDelete.forEach(deletedResource -> { resourcesToDelete.forEach(deletedResource -> {
String name = "Deleting resource " + deletedResource + " which has no ACLs in snapshot"; String name = "Deleting resource " + deletedResource + " which has no ACLs in snapshot";
operationConsumer.accept(name, migrationState -> operationConsumer.accept(name, migrationState ->