mirror of https://github.com/apache/kafka.git
MINOR: Fix some MetadataDelta handling issues during ZK migration (#15327)
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
parent
08b68583fa
commit
c000b1fae2
|
@ -21,14 +21,14 @@ import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString
|
||||||
import kafka.utils.TestUtils
|
import kafka.utils.TestUtils
|
||||||
import org.apache.kafka.common.Uuid
|
import org.apache.kafka.common.Uuid
|
||||||
import org.apache.kafka.common.acl._
|
import org.apache.kafka.common.acl._
|
||||||
import org.apache.kafka.common.metadata.AccessControlEntryRecord
|
import org.apache.kafka.common.metadata.{AccessControlEntryRecord, RemoveAccessControlEntryRecord}
|
||||||
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
|
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
|
||||||
import org.apache.kafka.common.security.auth.KafkaPrincipal
|
import org.apache.kafka.common.security.auth.KafkaPrincipal
|
||||||
import org.apache.kafka.common.utils.SecurityUtils
|
import org.apache.kafka.common.utils.SecurityUtils
|
||||||
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
|
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
|
||||||
import org.apache.kafka.metadata.migration.KRaftMigrationZkWriter
|
import org.apache.kafka.metadata.migration.KRaftMigrationZkWriter
|
||||||
import org.apache.kafka.server.common.ApiMessageAndVersion
|
import org.apache.kafka.server.common.ApiMessageAndVersion
|
||||||
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
|
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
|
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
|
@ -169,7 +169,7 @@ class ZkAclMigrationClientTest extends ZkMigrationTestHarness {
|
||||||
val image = delta.apply(MetadataProvenance.EMPTY)
|
val image = delta.apply(MetadataProvenance.EMPTY)
|
||||||
|
|
||||||
// load snapshot to Zookeeper.
|
// load snapshot to Zookeeper.
|
||||||
val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient)
|
val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient, fail(_))
|
||||||
kraftMigrationZkWriter.handleSnapshot(image, (_, _, operation) => { migrationState = operation.apply(migrationState) })
|
kraftMigrationZkWriter.handleSnapshot(image, (_, _, operation) => { migrationState = operation.apply(migrationState) })
|
||||||
|
|
||||||
// Verify the new ACLs in Zookeeper.
|
// Verify the new ACLs in Zookeeper.
|
||||||
|
@ -189,4 +189,160 @@ class ZkAclMigrationClientTest extends ZkMigrationTestHarness {
|
||||||
AclPermissionType.fromCode(acl1Resource3.permissionType())),
|
AclPermissionType.fromCode(acl1Resource3.permissionType())),
|
||||||
resource3AclsInZk.head.ace)
|
resource3AclsInZk.head.ace)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def user(user: String): String = {
|
||||||
|
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user).toString
|
||||||
|
}
|
||||||
|
|
||||||
|
def acl(resourceName: String,
|
||||||
|
resourceType: ResourceType,
|
||||||
|
resourcePattern: PatternType,
|
||||||
|
principal: String,
|
||||||
|
host: String = "*",
|
||||||
|
operation: AclOperation = AclOperation.READ,
|
||||||
|
permissionType: AclPermissionType = AclPermissionType.ALLOW
|
||||||
|
): AccessControlEntryRecord = {
|
||||||
|
new AccessControlEntryRecord()
|
||||||
|
.setId(Uuid.randomUuid())
|
||||||
|
.setHost(host)
|
||||||
|
.setOperation(operation.code())
|
||||||
|
.setPrincipal(principal)
|
||||||
|
.setPermissionType(permissionType.code())
|
||||||
|
.setPatternType(resourcePattern.code())
|
||||||
|
.setResourceName(resourceName)
|
||||||
|
.setResourceType(resourceType.code())
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testDeleteOneAclOfMany(): Unit = {
|
||||||
|
zkClient.createAclPaths()
|
||||||
|
val topicName = "topic-" + Uuid.randomUuid()
|
||||||
|
val resource = new ResourcePattern(ResourceType.TOPIC, topicName, PatternType.LITERAL)
|
||||||
|
|
||||||
|
// Create a delta with some ACLs
|
||||||
|
val delta = new MetadataDelta(MetadataImage.EMPTY)
|
||||||
|
val acl1 = acl(topicName, ResourceType.TOPIC, PatternType.LITERAL, user("alice"))
|
||||||
|
val acl2 = acl(topicName, ResourceType.TOPIC, PatternType.LITERAL, user("bob"))
|
||||||
|
val acl3 = acl(topicName, ResourceType.TOPIC, PatternType.LITERAL, user("carol"))
|
||||||
|
delta.replay(acl1)
|
||||||
|
delta.replay(acl2)
|
||||||
|
delta.replay(acl3)
|
||||||
|
val image = delta.apply(MetadataProvenance.EMPTY)
|
||||||
|
|
||||||
|
// Sync image to ZK
|
||||||
|
val errorLogs = mutable.Buffer[String]()
|
||||||
|
val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient, errorLogs.append)
|
||||||
|
kraftMigrationZkWriter.handleSnapshot(image, (_, _, operation) => {
|
||||||
|
migrationState = operation.apply(migrationState)
|
||||||
|
})
|
||||||
|
|
||||||
|
// verify 3 ACLs in ZK
|
||||||
|
val aclsInZk = zkClient.getVersionedAclsForResource(resource).acls
|
||||||
|
assertEquals(3, aclsInZk.size)
|
||||||
|
|
||||||
|
// Delete one of the ACLs
|
||||||
|
val delta2 = new MetadataDelta.Builder()
|
||||||
|
.setImage(image)
|
||||||
|
.build()
|
||||||
|
delta2.replay(new RemoveAccessControlEntryRecord().setId(acl3.id()))
|
||||||
|
val image2 = delta2.apply(MetadataProvenance.EMPTY)
|
||||||
|
kraftMigrationZkWriter.handleDelta(image, image2, delta2, (_, _, operation) => {
|
||||||
|
migrationState = operation.apply(migrationState)
|
||||||
|
})
|
||||||
|
|
||||||
|
// verify the other 2 ACLs are still in ZK
|
||||||
|
val aclsInZk2 = zkClient.getVersionedAclsForResource(resource).acls
|
||||||
|
assertEquals(2, aclsInZk2.size)
|
||||||
|
assertEquals(0, errorLogs.size)
|
||||||
|
|
||||||
|
// Add another ACL
|
||||||
|
val acl4 = acl(topicName, ResourceType.TOPIC, PatternType.LITERAL, user("carol"))
|
||||||
|
delta2.replay(acl4)
|
||||||
|
val image3 = delta2.apply(MetadataProvenance.EMPTY)
|
||||||
|
|
||||||
|
// This is a contrived error case. In practice, we will never pass the same image as prev and current.
|
||||||
|
// The point of this is to exercise the case of a deleted ACL missing from the prev image.
|
||||||
|
kraftMigrationZkWriter.handleDelta(image3, image3, delta2, (_, _, operation) => {
|
||||||
|
migrationState = operation.apply(migrationState)
|
||||||
|
})
|
||||||
|
|
||||||
|
val aclsInZk3 = zkClient.getVersionedAclsForResource(resource).acls
|
||||||
|
assertEquals(3, aclsInZk3.size)
|
||||||
|
assertEquals(1, errorLogs.size)
|
||||||
|
assertEquals(s"Cannot delete ACL ${acl3.id()} from ZK since it is missing from previous AclImage", errorLogs.head)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testAclUpdateAndDelete(): Unit = {
|
||||||
|
zkClient.createAclPaths()
|
||||||
|
val errorLogs = mutable.Buffer[String]()
|
||||||
|
val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient, errorLogs.append)
|
||||||
|
|
||||||
|
val topicName = "topic-" + Uuid.randomUuid()
|
||||||
|
val otherName = "other-" + Uuid.randomUuid()
|
||||||
|
val literalResource = new ResourcePattern(ResourceType.TOPIC, topicName, PatternType.LITERAL)
|
||||||
|
val prefixedResource = new ResourcePattern(ResourceType.TOPIC, topicName, PatternType.PREFIXED)
|
||||||
|
val otherResource = new ResourcePattern(ResourceType.TOPIC, otherName, PatternType.LITERAL)
|
||||||
|
|
||||||
|
// Create a delta with some ACLs
|
||||||
|
val acl1 = acl(topicName, ResourceType.TOPIC, PatternType.LITERAL, user("alice"))
|
||||||
|
val acl2 = acl(topicName, ResourceType.TOPIC, PatternType.LITERAL, user("bob"))
|
||||||
|
val acl3 = acl(topicName, ResourceType.TOPIC, PatternType.LITERAL, user("carol"))
|
||||||
|
val acl4 = acl(topicName, ResourceType.TOPIC, PatternType.LITERAL, user("dave"))
|
||||||
|
|
||||||
|
val delta1 = new MetadataDelta(MetadataImage.EMPTY)
|
||||||
|
delta1.replay(acl1)
|
||||||
|
delta1.replay(acl2)
|
||||||
|
delta1.replay(acl3)
|
||||||
|
delta1.replay(acl4)
|
||||||
|
|
||||||
|
val image1 = delta1.apply(MetadataProvenance.EMPTY)
|
||||||
|
kraftMigrationZkWriter.handleDelta(MetadataImage.EMPTY, image1, delta1, (_, _, operation) => {
|
||||||
|
migrationState = operation.apply(migrationState)
|
||||||
|
})
|
||||||
|
assertEquals(4, zkClient.getVersionedAclsForResource(literalResource).acls.size)
|
||||||
|
assertEquals(0, zkClient.getVersionedAclsForResource(prefixedResource).acls.size)
|
||||||
|
assertEquals(0, zkClient.getVersionedAclsForResource(otherResource).acls.size)
|
||||||
|
assertEquals(0, errorLogs.size)
|
||||||
|
|
||||||
|
val acl5 = acl(topicName, ResourceType.TOPIC, PatternType.PREFIXED, user("alice"))
|
||||||
|
val acl6 = acl(topicName, ResourceType.TOPIC, PatternType.PREFIXED, user("bob"))
|
||||||
|
val acl7 = acl(otherName, ResourceType.TOPIC, PatternType.LITERAL, user("carol"))
|
||||||
|
val acl8 = acl(otherName, ResourceType.TOPIC, PatternType.LITERAL, user("dave"))
|
||||||
|
|
||||||
|
// Add two prefixed and two "other" ACLs, delete one of the literal ACLs
|
||||||
|
val delta2 = new MetadataDelta.Builder().setImage(image1).build()
|
||||||
|
delta2.replay(acl5)
|
||||||
|
delta2.replay(acl6)
|
||||||
|
delta2.replay(acl7)
|
||||||
|
delta2.replay(acl8)
|
||||||
|
delta2.replay(new RemoveAccessControlEntryRecord().setId(acl1.id()))
|
||||||
|
|
||||||
|
val image2 = delta2.apply(MetadataProvenance.EMPTY)
|
||||||
|
kraftMigrationZkWriter.handleDelta(image1, image2, delta2, (_, _, operation) => {
|
||||||
|
migrationState = operation.apply(migrationState)
|
||||||
|
})
|
||||||
|
assertEquals(3, zkClient.getVersionedAclsForResource(literalResource).acls.size)
|
||||||
|
assertEquals(2, zkClient.getVersionedAclsForResource(prefixedResource).acls.size)
|
||||||
|
assertEquals(2, zkClient.getVersionedAclsForResource(otherResource).acls.size)
|
||||||
|
assertEquals(0, errorLogs.size)
|
||||||
|
|
||||||
|
// Delete and add ACL for literal resource, remove both prefixed ACLs, add another "other"
|
||||||
|
val acl9 = acl(otherName, ResourceType.TOPIC, PatternType.LITERAL, user("eve"))
|
||||||
|
val delta3 = new MetadataDelta.Builder().setImage(image2).build()
|
||||||
|
delta3.replay(acl1)
|
||||||
|
delta3.replay(new RemoveAccessControlEntryRecord().setId(acl2.id()))
|
||||||
|
delta3.replay(new RemoveAccessControlEntryRecord().setId(acl5.id()))
|
||||||
|
delta3.replay(new RemoveAccessControlEntryRecord().setId(acl6.id()))
|
||||||
|
delta3.replay(acl9)
|
||||||
|
|
||||||
|
val image3 = delta3.apply(MetadataProvenance.EMPTY)
|
||||||
|
kraftMigrationZkWriter.handleDelta(image2, image3, delta3, (_, _, operation) => {
|
||||||
|
migrationState = operation.apply(migrationState)
|
||||||
|
})
|
||||||
|
assertEquals(3, zkClient.getVersionedAclsForResource(literalResource).acls.size)
|
||||||
|
assertEquals(0, zkClient.getVersionedAclsForResource(prefixedResource).acls.size)
|
||||||
|
assertEquals(3, zkClient.getVersionedAclsForResource(otherResource).acls.size)
|
||||||
|
assertEquals(0, errorLogs.size)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,7 +40,7 @@ import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
|
||||||
import org.apache.kafka.server.common.ApiMessageAndVersion
|
import org.apache.kafka.server.common.ApiMessageAndVersion
|
||||||
import org.apache.kafka.server.config.ConfigType
|
import org.apache.kafka.server.config.ConfigType
|
||||||
import org.apache.kafka.server.util.MockRandom
|
import org.apache.kafka.server.util.MockRandom
|
||||||
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
|
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
|
|
||||||
import java.util
|
import java.util
|
||||||
|
@ -326,7 +326,7 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
|
||||||
val image = delta.apply(MetadataProvenance.EMPTY)
|
val image = delta.apply(MetadataProvenance.EMPTY)
|
||||||
|
|
||||||
// load snapshot to Zookeeper.
|
// load snapshot to Zookeeper.
|
||||||
val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient)
|
val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient, fail(_))
|
||||||
kraftMigrationZkWriter.handleSnapshot(image, (_, _, operation) => {
|
kraftMigrationZkWriter.handleSnapshot(image, (_, _, operation) => {
|
||||||
migrationState = operation.apply(migrationState)
|
migrationState = operation.apply(migrationState)
|
||||||
})
|
})
|
||||||
|
|
|
@ -317,7 +317,7 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testTopicAndBrokerConfigsMigrationWithSnapshots(): Unit = {
|
def testTopicAndBrokerConfigsMigrationWithSnapshots(): Unit = {
|
||||||
val kraftWriter = new KRaftMigrationZkWriter(migrationClient)
|
val kraftWriter = new KRaftMigrationZkWriter(migrationClient, fail(_))
|
||||||
|
|
||||||
// Add add some topics and broker configs and create new image.
|
// Add add some topics and broker configs and create new image.
|
||||||
val topicName = "testTopic"
|
val topicName = "testTopic"
|
||||||
|
|
|
@ -25,12 +25,10 @@ import org.apache.kafka.metadata.authorizer.StandardAclWithId;
|
||||||
import org.apache.kafka.server.common.MetadataVersion;
|
import org.apache.kafka.server.common.MetadataVersion;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
|
||||||
|
@ -40,7 +38,6 @@ import java.util.stream.Collectors;
|
||||||
public final class AclsDelta {
|
public final class AclsDelta {
|
||||||
private final AclsImage image;
|
private final AclsImage image;
|
||||||
private final Map<Uuid, Optional<StandardAcl>> changes = new LinkedHashMap<>();
|
private final Map<Uuid, Optional<StandardAcl>> changes = new LinkedHashMap<>();
|
||||||
private final Set<StandardAcl> deleted = new HashSet<>();
|
|
||||||
|
|
||||||
public AclsDelta(AclsImage image) {
|
public AclsDelta(AclsImage image) {
|
||||||
this.image = image;
|
this.image = image;
|
||||||
|
@ -56,15 +53,6 @@ public final class AclsDelta {
|
||||||
return changes;
|
return changes;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Return a Set of the ACLs which were deleted in this delta. This is used by the ZK migration components.
|
|
||||||
*
|
|
||||||
* @return Set of deleted ACLs
|
|
||||||
*/
|
|
||||||
public Set<StandardAcl> deleted() {
|
|
||||||
return deleted;
|
|
||||||
}
|
|
||||||
|
|
||||||
void finishSnapshot() {
|
void finishSnapshot() {
|
||||||
for (Entry<Uuid, StandardAcl> entry : image.acls().entrySet()) {
|
for (Entry<Uuid, StandardAcl> entry : image.acls().entrySet()) {
|
||||||
if (!changes.containsKey(entry.getKey())) {
|
if (!changes.containsKey(entry.getKey())) {
|
||||||
|
@ -93,7 +81,6 @@ public final class AclsDelta {
|
||||||
public void replay(RemoveAccessControlEntryRecord record) {
|
public void replay(RemoveAccessControlEntryRecord record) {
|
||||||
if (image.acls().containsKey(record.id())) {
|
if (image.acls().containsKey(record.id())) {
|
||||||
changes.put(record.id(), Optional.empty());
|
changes.put(record.id(), Optional.empty());
|
||||||
deleted.add(image.acls().get(record.id()));
|
|
||||||
} else if (changes.containsKey(record.id())) {
|
} else if (changes.containsKey(record.id())) {
|
||||||
changes.remove(record.id());
|
changes.remove(record.id());
|
||||||
// No need to track a ACL that was added and deleted within the same delta
|
// No need to track a ACL that was added and deleted within the same delta
|
||||||
|
|
|
@ -134,7 +134,8 @@ public class KRaftMigrationDriver implements MetadataPublisher {
|
||||||
this.time = time;
|
this.time = time;
|
||||||
LogContext logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
|
LogContext logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
|
||||||
this.controllerMetrics = controllerMetrics;
|
this.controllerMetrics = controllerMetrics;
|
||||||
this.log = logContext.logger(KRaftMigrationDriver.class);
|
Logger log = logContext.logger(KRaftMigrationDriver.class);
|
||||||
|
this.log = log;
|
||||||
this.migrationState = MigrationDriverState.UNINITIALIZED;
|
this.migrationState = MigrationDriverState.UNINITIALIZED;
|
||||||
this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
|
this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
|
||||||
this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, "controller-" + nodeId + "-migration-driver-");
|
this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, "controller-" + nodeId + "-migration-driver-");
|
||||||
|
@ -144,7 +145,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
|
||||||
this.initialZkLoadHandler = initialZkLoadHandler;
|
this.initialZkLoadHandler = initialZkLoadHandler;
|
||||||
this.faultHandler = faultHandler;
|
this.faultHandler = faultHandler;
|
||||||
this.quorumFeatures = quorumFeatures;
|
this.quorumFeatures = quorumFeatures;
|
||||||
this.zkMetadataWriter = new KRaftMigrationZkWriter(zkMigrationClient);
|
this.zkMetadataWriter = new KRaftMigrationZkWriter(zkMigrationClient, log::error);
|
||||||
this.recordRedactor = new RecordRedactor(configSchema);
|
this.recordRedactor = new RecordRedactor(configSchema);
|
||||||
this.minBatchSize = minBatchSize;
|
this.minBatchSize = minBatchSize;
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,8 +60,8 @@ import java.util.Map.Entry;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
public class KRaftMigrationZkWriter {
|
public class KRaftMigrationZkWriter {
|
||||||
|
|
||||||
|
@ -82,11 +82,14 @@ public class KRaftMigrationZkWriter {
|
||||||
|
|
||||||
|
|
||||||
private final MigrationClient migrationClient;
|
private final MigrationClient migrationClient;
|
||||||
|
private final Consumer<String> errorLogger;
|
||||||
|
|
||||||
public KRaftMigrationZkWriter(
|
public KRaftMigrationZkWriter(
|
||||||
MigrationClient migrationClient
|
MigrationClient migrationClient,
|
||||||
|
Consumer<String> errorLogger
|
||||||
) {
|
) {
|
||||||
this.migrationClient = migrationClient;
|
this.migrationClient = migrationClient;
|
||||||
|
this.errorLogger = errorLogger;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void handleSnapshot(MetadataImage image, KRaftMigrationOperationConsumer operationConsumer) {
|
public void handleSnapshot(MetadataImage image, KRaftMigrationOperationConsumer operationConsumer) {
|
||||||
|
@ -122,7 +125,7 @@ public class KRaftMigrationZkWriter {
|
||||||
updated = true;
|
updated = true;
|
||||||
}
|
}
|
||||||
if (delta.aclsDelta() != null) {
|
if (delta.aclsDelta() != null) {
|
||||||
handleAclsDelta(image.acls(), delta.aclsDelta(), operationConsumer);
|
handleAclsDelta(previousImage.acls(), image.acls(), delta.aclsDelta(), operationConsumer);
|
||||||
updated = true;
|
updated = true;
|
||||||
}
|
}
|
||||||
if (delta.delegationTokenDelta() != null) {
|
if (delta.delegationTokenDelta() != null) {
|
||||||
|
@ -593,6 +596,7 @@ public class KRaftMigrationZkWriter {
|
||||||
});
|
});
|
||||||
|
|
||||||
newResources.forEach(resourcePattern -> {
|
newResources.forEach(resourcePattern -> {
|
||||||
|
// newResources is generated from allAclsInSnapshot, and we don't remove from that map, so this unguarded .get() is safe
|
||||||
Set<AccessControlEntry> accessControlEntries = allAclsInSnapshot.get(resourcePattern);
|
Set<AccessControlEntry> accessControlEntries = allAclsInSnapshot.get(resourcePattern);
|
||||||
String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
|
String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
|
||||||
operationConsumer.accept(UPDATE_ACL, name, migrationState ->
|
operationConsumer.accept(UPDATE_ACL, name, migrationState ->
|
||||||
|
@ -612,43 +616,45 @@ public class KRaftMigrationZkWriter {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void handleAclsDelta(AclsImage image, AclsDelta delta, KRaftMigrationOperationConsumer operationConsumer) {
|
void handleAclsDelta(AclsImage prevImage, AclsImage image, AclsDelta delta, KRaftMigrationOperationConsumer operationConsumer) {
|
||||||
// Compute the resource patterns that were changed
|
|
||||||
Set<ResourcePattern> resourcesWithChangedAcls = delta.changes().values()
|
|
||||||
.stream()
|
|
||||||
.filter(Optional::isPresent)
|
|
||||||
.map(Optional::get)
|
|
||||||
.map(this::resourcePatternFromAcl)
|
|
||||||
.collect(Collectors.toSet());
|
|
||||||
|
|
||||||
Set<ResourcePattern> resourcesWithDeletedAcls = delta.deleted()
|
|
||||||
.stream()
|
|
||||||
.map(this::resourcePatternFromAcl)
|
|
||||||
.collect(Collectors.toSet());
|
|
||||||
|
|
||||||
// Need to collect all ACLs for any changed resource pattern
|
// Need to collect all ACLs for any changed resource pattern
|
||||||
Map<ResourcePattern, List<AccessControlEntry>> aclsToWrite = new HashMap<>();
|
Map<ResourcePattern, List<AccessControlEntry>> aclsToWrite = new HashMap<>();
|
||||||
image.acls().forEach((uuid, standardAcl) -> {
|
delta.changes().forEach((aclId, aclChange) -> {
|
||||||
ResourcePattern resourcePattern = resourcePatternFromAcl(standardAcl);
|
if (aclChange.isPresent()) {
|
||||||
boolean removed = resourcesWithDeletedAcls.remove(resourcePattern);
|
ResourcePattern resourcePattern = resourcePatternFromAcl(aclChange.get());
|
||||||
// If a resource pattern is present in the delta as a changed or deleted acl, need to include it
|
aclsToWrite.put(resourcePattern, new ArrayList<>());
|
||||||
if (resourcesWithChangedAcls.contains(resourcePattern) || removed) {
|
} else {
|
||||||
aclsToWrite.computeIfAbsent(resourcePattern, __ -> new ArrayList<>()).add(
|
// We need to look in the previous image to get deleted ACLs resource pattern
|
||||||
new AccessControlEntry(standardAcl.principal(), standardAcl.host(), standardAcl.operation(), standardAcl.permissionType())
|
StandardAcl deletedAcl = prevImage.acls().get(aclId);
|
||||||
);
|
if (deletedAcl == null) {
|
||||||
|
errorLogger.accept("Cannot delete ACL " + aclId + " from ZK since it is missing from previous AclImage");
|
||||||
|
} else {
|
||||||
|
ResourcePattern resourcePattern = resourcePatternFromAcl(deletedAcl);
|
||||||
|
aclsToWrite.put(resourcePattern, new ArrayList<>());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
resourcesWithDeletedAcls.forEach(deletedResource -> {
|
// Iterate through the new image to collect any ACLs for these changed resources
|
||||||
String name = "Deleting resource " + deletedResource + " which has no more ACLs";
|
image.acls().forEach((uuid, standardAcl) -> {
|
||||||
operationConsumer.accept(DELETE_ACL, name, migrationState ->
|
ResourcePattern resourcePattern = resourcePatternFromAcl(standardAcl);
|
||||||
migrationClient.aclClient().deleteResource(deletedResource, migrationState));
|
List<AccessControlEntry> entries = aclsToWrite.get(resourcePattern);
|
||||||
|
if (entries != null) {
|
||||||
|
entries.add(new AccessControlEntry(standardAcl.principal(), standardAcl.host(), standardAcl.operation(), standardAcl.permissionType()));
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// If there are no more ACLs for a resource, delete it. Otherwise, update it with the new set of ACLs
|
||||||
aclsToWrite.forEach((resourcePattern, accessControlEntries) -> {
|
aclsToWrite.forEach((resourcePattern, accessControlEntries) -> {
|
||||||
|
if (accessControlEntries.isEmpty()) {
|
||||||
|
String name = "Deleting resource " + resourcePattern + " which has no more ACLs";
|
||||||
|
operationConsumer.accept(DELETE_ACL, name, migrationState ->
|
||||||
|
migrationClient.aclClient().deleteResource(resourcePattern, migrationState));
|
||||||
|
} else {
|
||||||
String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
|
String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
|
||||||
operationConsumer.accept(UPDATE_ACL, name, migrationState ->
|
operationConsumer.accept(UPDATE_ACL, name, migrationState ->
|
||||||
migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState));
|
migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState));
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -80,7 +80,7 @@ public class KRaftMigrationZkWriterTest {
|
||||||
.setConfigMigrationClient(configClient)
|
.setConfigMigrationClient(configClient)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
|
KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient, __ -> { });
|
||||||
|
|
||||||
MetadataImage image = new MetadataImage(
|
MetadataImage image = new MetadataImage(
|
||||||
MetadataProvenance.EMPTY,
|
MetadataProvenance.EMPTY,
|
||||||
|
@ -120,7 +120,7 @@ public class KRaftMigrationZkWriterTest {
|
||||||
.setAclMigrationClient(aclClient)
|
.setAclMigrationClient(aclClient)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
|
KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient, __ -> { });
|
||||||
|
|
||||||
MetadataImage image = new MetadataImage(
|
MetadataImage image = new MetadataImage(
|
||||||
MetadataProvenance.EMPTY,
|
MetadataProvenance.EMPTY,
|
||||||
|
@ -179,7 +179,7 @@ public class KRaftMigrationZkWriterTest {
|
||||||
.setAclMigrationClient(aclClient)
|
.setAclMigrationClient(aclClient)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
|
KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient, __ -> { });
|
||||||
|
|
||||||
MetadataImage image = new MetadataImage(
|
MetadataImage image = new MetadataImage(
|
||||||
MetadataProvenance.EMPTY,
|
MetadataProvenance.EMPTY,
|
||||||
|
|
Loading…
Reference in New Issue