KAFKA-14796 Migrate ACLs from AclAuthorizer to KRaft (#13368)

This patch refactors the loadCache method in AclAuthorizer to make it reusable by ZkMigrationClient.
The loaded ACLs are converted to AccessControlEntryRecord. I noticed we still have the defunct
AccessControlRecord, so I've deleted it.

Also included here are the methods to write ACL changes back to ZK while in dual-write mode.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
David Arthur 2023-03-27 19:12:02 -04:00 committed by GitHub
parent 31440b00f3
commit f1b3732fa6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 400 additions and 70 deletions

View File

@ -268,12 +268,14 @@
<subpackage name="metadata">
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.acl" />
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.metadata" />

View File

@ -124,6 +124,33 @@ object AclAuthorizer {
if (aclBinding.pattern().name().contains("/"))
throw new IllegalArgumentException(s"ACL binding contains invalid resource name: ${aclBinding.pattern().name()}")
}
/**
 * Read every ACL stored in ZooKeeper and hand each resource's versioned ACLs to the
 * supplied consumer. Shared by AclAuthorizer (cache load) and ZkMigrationClient.
 *
 * @param zkClient    client used to read the ACL znodes
 * @param logger      sink for warnings about unparseable resource types
 * @param aclConsumer invoked once per resource pattern with its VersionedAcls
 */
def loadAllAcls(
  zkClient: KafkaZkClient,
  logger: Logging,
  aclConsumer: (ResourcePattern, VersionedAcls) => Unit
): Unit = {
  // Walk each ACL store (one per pattern type) and every resource type under it.
  for {
    store <- ZkAclStore.stores
    typeName <- zkClient.getResourceTypes(store.patternType)
  } {
    Try(SecurityUtils.resourceType(typeName)) match {
      case Failure(_) =>
        // An unknown type is skipped rather than failing the whole load.
        logger.warn(s"Ignoring unknown ResourceType: $typeName")
      case Success(resType) =>
        zkClient.getResourceNames(store.patternType, resType).foreach { name =>
          val pattern = new ResourcePattern(resType, name, store.patternType)
          aclConsumer(pattern, getAclsFromZk(zkClient, pattern))
        }
    }
  }
}
/**
 * Read the ACLs (and the znode version they were read at) for one resource pattern.
 * Kept on the companion object so ZkMigrationClient can reuse the exact read path
 * the authorizer uses.
 */
def getAclsFromZk(zkClient: KafkaZkClient, resource: ResourcePattern): VersionedAcls = {
  zkClient.getVersionedAclsForResource(resource)
}
}
class AclAuthorizer extends Authorizer with Logging {
@ -549,22 +576,7 @@ class AclAuthorizer extends Authorizer with Logging {
private def loadCache(): Unit = {
lock synchronized {
ZkAclStore.stores.foreach { store =>
val resourceTypes = zkClient.getResourceTypes(store.patternType)
for (rType <- resourceTypes) {
val resourceType = Try(SecurityUtils.resourceType(rType))
resourceType match {
case Success(resourceTypeObj) =>
val resourceNames = zkClient.getResourceNames(store.patternType, resourceTypeObj)
for (resourceName <- resourceNames) {
val resource = new ResourcePattern(resourceTypeObj, resourceName, store.patternType)
val versionedAcls = getAclsFromZk(resource)
updateCache(resource, versionedAcls)
}
case Failure(_) => warn(s"Ignoring unknown ResourceType: $rType")
}
}
}
loadAllAcls(zkClient, this, updateCache)
}
}
@ -634,7 +646,7 @@ class AclAuthorizer extends Authorizer with Logging {
if (aclCache.contains(resource))
getAclsFromCache(resource)
else
getAclsFromZk(resource)
getAclsFromZk(zkClient, resource)
var newVersionedAcls: VersionedAcls = null
var writeComplete = false
var retries = 0
@ -654,7 +666,7 @@ class AclAuthorizer extends Authorizer with Logging {
if (!updateSucceeded) {
trace(s"Failed to update ACLs for $resource. Used version ${currentVersionedAcls.zkVersion}. Reading data and retrying update.")
Thread.sleep(backoffTime)
currentVersionedAcls = getAclsFromZk(resource)
currentVersionedAcls = getAclsFromZk(zkClient, resource)
retries += 1
} else {
newVersionedAcls = VersionedAcls(newAcls, updateVersion)
@ -682,9 +694,6 @@ class AclAuthorizer extends Authorizer with Logging {
aclCache.getOrElse(resource, throw new IllegalArgumentException(s"ACLs do not exist in the cache for resource $resource"))
}
private def getAclsFromZk(resource: ResourcePattern): VersionedAcls = {
zkClient.getVersionedAclsForResource(resource)
}
// Visible for benchmark
def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = {
@ -738,7 +747,7 @@ class AclAuthorizer extends Authorizer with Logging {
private[authorizer] def processAclChangeNotification(resource: ResourcePattern): Unit = {
lock synchronized {
val versionedAcls = getAclsFromZk(resource)
val versionedAcls = getAclsFromZk(zkClient, resource)
info(s"Processing Acl change notification for $resource, versionedAcls : ${versionedAcls.acls}, zkVersion : ${versionedAcls.zkVersion}")
updateCache(resource, versionedAcls)
}

View File

@ -18,15 +18,19 @@ package kafka.zk
import kafka.api.LeaderAndIsr
import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
import kafka.security.authorizer.{AclAuthorizer, AclEntry}
import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering, VersionedAcls}
import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, ZkAdminManager}
import kafka.utils.{Logging, PasswordEncoder}
import kafka.zk.TopicZNode.TopicIdReplicaAssignment
import kafka.zookeeper._
import org.apache.kafka.common.acl.AccessControlEntry
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
import org.apache.kafka.common.metadata._
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
import org.apache.kafka.metadata.migration.{MigrationClient, MigrationClientAuthException, MigrationClientException, ZkMigrationLeadershipState}
@ -36,10 +40,14 @@ import org.apache.zookeeper.{CreateMode, KeeperException}
import java.util
import java.util.Properties
import java.util.function.Consumer
import java.util.function.{BiConsumer, Consumer}
import scala.collection.Seq
import scala.jdk.CollectionConverters._
object ZkMigrationClient {
val MaxBatchSize = 100
}
/**
* Migration client in KRaft controller responsible for handling communication to Zookeeper and
* the ZkBrokers present in the cluster. Methods that directly use KafkaZkClient should use the wrapZkException
@ -72,6 +80,7 @@ class ZkMigrationClient(
initialState: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = wrapZkException {
zkClient.createTopLevelPaths()
zkClient.createAclPaths()
zkClient.getOrCreateMigrationState(initialState)
}
@ -252,6 +261,44 @@ class ZkMigrationClient(
}
}
/**
 * Iterate all ZK-stored ACLs in the sorted resource order AclAuthorizer uses,
 * presenting each resource's entries as a Java Set.
 */
override def iterateAcls(aclConsumer: BiConsumer[ResourcePattern, util.Set[AccessControlEntry]]): Unit = {
  // Materialize everything into a TreeMap first so the consumer observes resources in
  // AclAuthorizer's ordering (its iteration semantics are non-trivial); this trades
  // memory for fidelity and is probably fairly inefficient.
  var sortedAcls = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering)
  AclAuthorizer.loadAllAcls(zkClient, this, (pattern, versioned) => sortedAcls += pattern -> versioned)
  sortedAcls.foreach { entry =>
    aclConsumer.accept(entry._1, entry._2.acls.map(_.ace).asJava)
  }
}
/**
 * Convert every ZK-stored ACL into AccessControlEntryRecord batches of at most
 * ZkMigrationClient.MaxBatchSize records and hand each batch to recordConsumer.
 */
def migrateAcls(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
  iterateAcls(new util.function.BiConsumer[ResourcePattern, util.Set[AccessControlEntry]]() {
    override def accept(resourcePattern: ResourcePattern, acls: util.Set[AccessControlEntry]): Unit = {
      // BUGFIX: hand ownership of each full batch to the consumer and start a *new* list
      // instead of calling batch.clear() on the list we already passed. Consumers may
      // retain the list reference (e.g. ZkMigrationClientTest collects batch.asScala),
      // so clearing and refilling it would corrupt previously delivered batches when a
      // resource has more than MaxBatchSize ACLs.
      var batch = new util.ArrayList[ApiMessageAndVersion]()
      acls.asScala.foreach { entry =>
        batch.add(new ApiMessageAndVersion(new AccessControlEntryRecord()
          .setId(Uuid.randomUuid())
          .setResourceType(resourcePattern.resourceType().code())
          .setResourceName(resourcePattern.name())
          .setPatternType(resourcePattern.patternType().code())
          .setPrincipal(entry.principal())
          .setHost(entry.host())
          .setOperation(entry.operation().code())
          .setPermissionType(entry.permissionType().code()), AccessControlEntryRecord.HIGHEST_SUPPORTED_VERSION))
        if (batch.size() == ZkMigrationClient.MaxBatchSize) {
          recordConsumer.accept(batch)
          batch = new util.ArrayList[ApiMessageAndVersion]()
        }
      }
      // Flush any partial final batch.
      if (!batch.isEmpty) {
        recordConsumer.accept(batch)
      }
    }
  })
}
override def readAllMetadata(
batchConsumer: Consumer[util.List[ApiMessageAndVersion]],
brokerIdConsumer: Consumer[Integer]
@ -260,6 +307,7 @@ class ZkMigrationClient(
migrateBrokerConfigs(batchConsumer)
migrateClientQuotas(batchConsumer)
migrateProducerId(batchConsumer)
migrateAcls(batchConsumer)
}
override def readBrokerIds(): util.Set[Integer] = wrapZkException {
@ -517,4 +565,82 @@ class ZkMigrationClient(
state
}
}
/**
 * Build the CreateRequest for an ACL change-notification znode for the given resource.
 * ZK brokers watch these sequential nodes; without the notification they would not
 * re-read the ACLs we just wrote during dual-write mode.
 */
private def aclChangeNotificationRequest(resourcePattern: ResourcePattern): CreateRequest = {
  // ZK broker needs the ACL change notification znode to be updated in order to process the new ACLs
  val aclChange = ZkAclStore(resourcePattern.patternType).changeStore.createChangeNode(resourcePattern)
  CreateRequest(aclChange.path, aclChange.bytes, zkClient.defaultAcls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL)
}
/**
 * Persist the complete ACL set for one resource, either creating its znode or
 * overwriting the existing one.
 *
 * @return Some(updated state) on success; None when the znode was missing
 *         (Code.NONODE), in which case the caller should retry with create = true.
 */
private def tryWriteAcls(
  resourcePattern: ResourcePattern,
  aclEntries: Set[AclEntry],
  create: Boolean,
  state: ZkMigrationLeadershipState
): Option[ZkMigrationLeadershipState] = wrapZkException {
  val path = ResourceZNode.path(resourcePattern)
  val encoded = ResourceZNode.encode(aclEntries)
  val request =
    if (create) CreateRequest(path, encoded, zkClient.defaultAcls(path), CreateMode.PERSISTENT)
    else SetDataRequest(path, encoded, ZkVersion.MatchAnyVersion)
  val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(request), state)
  responses.head.resultCode match {
    case Code.NONODE =>
      // Znode absent: signal the caller to call this method again with create = true.
      None
    case _ =>
      // The ACL change notification must be written outside of the metadata multi-op.
      zkClient.retryRequestUntilConnected(aclChangeNotificationRequest(resourcePattern))
      Some(state.withMigrationZkVersion(migrationZkVersion))
  }
}
/**
 * Dual-write path: merge newly added ACLs into the set already stored in ZK for
 * this resource and write the combined set back.
 *
 * @throws MigrationClientException if neither the update nor the create succeeds
 */
override def writeAddedAcls(
  resourcePattern: ResourcePattern,
  newAcls: util.List[AccessControlEntry],
  state: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = {
  val currentAcls = AclAuthorizer.getAclsFromZk(zkClient, resourcePattern)
  val combined = currentAcls.acls ++ newAcls.asScala.map(new AclEntry(_)).toSet
  // First try an in-place update; fall back to creating the znode if it didn't exist.
  tryWriteAcls(resourcePattern, combined, create = false, state)
    .orElse(tryWriteAcls(resourcePattern, combined, create = true, state))
    .getOrElse(throw new MigrationClientException(s"Could not write ACLs for resource pattern $resourcePattern"))
}
/**
 * Dual-write path: remove the given ACLs from the set stored in ZK for this
 * resource. If no ACLs remain the resource znode itself is deleted; otherwise the
 * remaining set is written back. A NONODE result is treated as success so that a
 * delete of an already-missing znode is idempotent.
 *
 * @throws MigrationClientException if the ZK write fails with any other result code
 */
override def removeDeletedAcls(
  resourcePattern: ResourcePattern,
  deletedAcls: util.List[AccessControlEntry],
  state: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = wrapZkException {
  val existingAcls = AclAuthorizer.getAclsFromZk(zkClient, resourcePattern)
  val removedAcls = deletedAcls.asScala.map(new AclEntry(_)).toSet
  val remainingAcls = existingAcls.acls -- removedAcls
  // Delete the whole znode when nothing is left; otherwise overwrite with the survivors.
  val request = if (remainingAcls.isEmpty) {
    DeleteRequest(ResourceZNode.path(resourcePattern), ZkVersion.MatchAnyVersion)
  } else {
    val aclData = ResourceZNode.encode(remainingAcls)
    SetDataRequest(ResourceZNode.path(resourcePattern), aclData, ZkVersion.MatchAnyVersion)
  }
  val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(request), state)
  if (responses.head.resultCode.equals(Code.OK) || responses.head.resultCode.equals(Code.NONODE)) {
    // Write the ACL notification outside of a metadata multi-op
    zkClient.retryRequestUntilConnected(aclChangeNotificationRequest(resourcePattern))
    state.withMigrationZkVersion(migrationZkVersion)
  } else {
    throw new MigrationClientException(s"Could not delete ACL for resource pattern $resourcePattern")
  }
}
}

View File

@ -16,20 +16,30 @@
*/
package kafka.zk
import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
import kafka.server.{ConfigType, KafkaConfig}
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import kafka.utils.{PasswordEncoder, TestUtils}
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasResult, AlterConfigOp, AlterConfigsResult, ConfigEntry, NewTopic}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.acl.AclOperation.{DESCRIBE, READ, WRITE}
import org.apache.kafka.common.acl.AclPermissionType.ALLOW
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.resource.ResourceType.TOPIC
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.utils.SecurityUtils
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.authorizer.StandardAcl
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
@ -39,8 +49,8 @@ import org.junit.jupiter.api.extension.ExtendWith
import org.slf4j.LoggerFactory
import java.util
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.{Properties, UUID}
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@ -67,6 +77,49 @@ class ZkMigrationIntegrationTest {
}
}
// Integration test: create ACLs through the admin client against a ZK cluster running
// AclAuthorizer, then run a full metadata migration and verify every ACL binding shows
// up in the resulting KRaft AclsImage.
@ClusterTest(
  brokers = 3, clusterType = Type.ZK, autoStart = AutoStart.YES,
  metadataVersion = MetadataVersion.IBP_3_4_IV0,
  serverProperties = Array(
    new ClusterConfigProperty(key="authorizer.class.name", value="kafka.security.authorizer.AclAuthorizer"),
    new ClusterConfigProperty(key="super.users", value="User:ANONYMOUS")
  )
)
def testMigrateAcls(clusterInstance: ClusterInstance): Unit = {
  val admin = clusterInstance.createAdminClient()
  // Random suffixes keep resources unique across test runs against the same cluster.
  val resource1 = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
  val resource2 = new ResourcePattern(TOPIC, "bar-" + UUID.randomUUID(), LITERAL)
  val prefixedResource = new ResourcePattern(TOPIC, "bar-", PREFIXED)
  val username = "alice"
  val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
  val wildcardPrincipal = SecurityUtils.parseKafkaPrincipal(WildcardPrincipalString)

  // Cover literal, host-restricted, multi-resource, prefixed, and wildcard-principal ACLs.
  val acl1 = new AclBinding(resource1, new AccessControlEntry(principal.toString, WildcardHost, READ, ALLOW))
  val acl2 = new AclBinding(resource1, new AccessControlEntry(principal.toString, "192.168.0.1", WRITE, ALLOW))
  val acl3 = new AclBinding(resource2, new AccessControlEntry(principal.toString, WildcardHost, DESCRIBE, ALLOW))
  val acl4 = new AclBinding(prefixedResource, new AccessControlEntry(wildcardPrincipal.toString, WildcardHost, READ, ALLOW))
  val result = admin.createAcls(List(acl1, acl2, acl3, acl4).asJava)
  // Block until the ACLs are actually written to ZK before migrating.
  result.all().get

  val underlying = clusterInstance.asInstanceOf[ZkClusterInstance].getUnderlying()
  val zkClient = underlying.zkClient
  val migrationClient = new ZkMigrationClient(zkClient, PasswordEncoder.noop())
  val verifier = new MetadataDeltaVerifier()
  // Broker IDs are irrelevant here, so the brokerIdConsumer is a no-op.
  migrationClient.readAllMetadata(batch => verifier.accept(batch), _ => { })
  verifier.verify { image =>
    val aclMap = image.acls().acls()
    assertEquals(4, aclMap.size())
    assertTrue(aclMap.values().containsAll(Seq(
      StandardAcl.fromAclBinding(acl1),
      StandardAcl.fromAclBinding(acl2),
      StandardAcl.fromAclBinding(acl3),
      StandardAcl.fromAclBinding(acl4)
    ).asJava))
  }
}
@ClusterTest(brokers = 3, clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_4_IV0)
def testMigrate(clusterInstance: ClusterInstance): Unit = {
val admin = clusterInstance.createAdminClient()

View File

@ -19,24 +19,30 @@ package kafka.zk
import kafka.api.LeaderAndIsr
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.coordinator.transaction.ProducerIdManager
import kafka.security.authorizer.AclAuthorizer
import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
import kafka.server.{ConfigType, KafkaConfig, QuorumTestHarness, ZkAdminManager}
import kafka.utils.PasswordEncoder
import kafka.utils.{PasswordEncoder, TestUtils}
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType, ProducerIdsRecord}
import org.apache.kafka.common.metadata.{AccessControlEntryRecord, ConfigRecord, MetadataRecordType, ProducerIdsRecord}
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{SecurityUtils, Time}
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import java.util.Properties
import scala.collection.Map
import java.util.{Properties, UUID}
import scala.collection.{Map, mutable}
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
/**
@ -423,4 +429,89 @@ class ZkMigrationClientTest extends QuorumTestHarness {
assertEquals(1, newProps.size())
assertEquals("100000", newProps.getProperty(TopicConfig.SEGMENT_MS_CONFIG))
}
/**
 * Seed ZK with the given bindings through the authorizer, run the ACL migration,
 * and assert exactly one AccessControlEntryRecord is produced per binding.
 */
def migrateAclsAndVerify(authorizer: AclAuthorizer, acls: Seq[AclBinding]): Unit = {
  authorizer.createAcls(null, acls.asJava)
  val collected = new ArrayBuffer[mutable.Buffer[ApiMessageAndVersion]]()
  migrationClient.migrateAcls(batch => collected.addOne(batch.asScala))
  // The cast doubles as a type check: any non-ACL record would fail loudly here.
  val aceRecords = for {
    batch <- collected
    msg <- batch
  } yield msg.message().asInstanceOf[AccessControlEntryRecord]
  assertEquals(acls.size, aceRecords.size, "Expected one record for each ACLBinding")
}
/**
 * Dual-write one ACL through the migration client, then poll the ZK-backed
 * authorizer until the change notification has been processed and `pred` holds.
 * Returns the bindings the authorizer reported for the resource.
 */
def writeAclAndReadWithAuthorizer(
  authorizer: AclAuthorizer,
  resourcePattern: ResourcePattern,
  ace: AccessControlEntry,
  pred: Seq[AclBinding] => Boolean
): Seq[AclBinding] = {
  migrationState = migrationClient.writeAddedAcls(resourcePattern, List(ace).asJava, migrationState)
  // Match exactly this resource pattern, any entry.
  val filter = new AclBindingFilter(
    new ResourcePatternFilter(resourcePattern.resourceType(), resourcePattern.name(), resourcePattern.patternType()),
    AclBindingFilter.ANY.entryFilter()
  )
  val (found, satisfied) = TestUtils.computeUntilTrue(authorizer.acls(filter).asScala.toSeq)(pred)
  assertTrue(satisfied)
  found
}
/**
 * Dual-delete one ACL through the migration client, then poll the ZK-backed
 * authorizer until the change notification has been processed and `pred` holds.
 * Returns the bindings the authorizer reported for the resource.
 */
def deleteAclAndReadWithAuthorizer(
  authorizer: AclAuthorizer,
  resourcePattern: ResourcePattern,
  ace: AccessControlEntry,
  pred: Seq[AclBinding] => Boolean
): Seq[AclBinding] = {
  migrationState = migrationClient.removeDeletedAcls(resourcePattern, List(ace).asJava, migrationState)
  // Match exactly this resource pattern, any entry.
  val filter = new AclBindingFilter(
    new ResourcePatternFilter(resourcePattern.resourceType(), resourcePattern.name(), resourcePattern.patternType()),
    AclBindingFilter.ANY.entryFilter()
  )
  val (found, satisfied) = TestUtils.computeUntilTrue(authorizer.acls(filter).asScala.toSeq)(pred)
  assertTrue(satisfied)
  found
}
/**
 * End-to-end dual-write test: migrate an initial set of ACLs from ZK, then exercise
 * writeAddedAcls/removeDeletedAcls and verify each change becomes visible to a live
 * AclAuthorizer via the ACL change-notification mechanism.
 */
@Test
def testAclsMigrateAndDualWrite(): Unit = {
  val resource1 = new ResourcePattern(ResourceType.TOPIC, "foo-" + UUID.randomUUID(), PatternType.LITERAL)
  val resource2 = new ResourcePattern(ResourceType.TOPIC, "bar-" + UUID.randomUUID(), PatternType.LITERAL)
  val prefixedResource = new ResourcePattern(ResourceType.TOPIC, "bar-", PatternType.PREFIXED)
  val username = "alice"
  val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
  val wildcardPrincipal = SecurityUtils.parseKafkaPrincipal(WildcardPrincipalString)
  // ace1/ace2 are kept in named vals because they are deleted individually below.
  val ace1 = new AccessControlEntry(principal.toString, WildcardHost, AclOperation.READ, AclPermissionType.ALLOW)
  val acl1 = new AclBinding(resource1, ace1)
  val ace2 = new AccessControlEntry(principal.toString, "192.168.0.1", AclOperation.WRITE, AclPermissionType.ALLOW)
  val acl2 = new AclBinding(resource1, ace2)
  val acl3 = new AclBinding(resource2, new AccessControlEntry(principal.toString, WildcardHost, AclOperation.DESCRIBE, AclPermissionType.ALLOW))
  val acl4 = new AclBinding(prefixedResource, new AccessControlEntry(wildcardPrincipal.toString, WildcardHost, AclOperation.READ, AclPermissionType.ALLOW))

  val authorizer = new AclAuthorizer()
  try {
    authorizer.configure(Map("zookeeper.connect" -> this.zkConnect).asJava)

    // Migrate ACLs
    migrateAclsAndVerify(authorizer, Seq(acl1, acl2, acl3, acl4))

    // Delete one of resource1's ACLs
    var resource1Acls = deleteAclAndReadWithAuthorizer(authorizer, resource1, ace2, acls => acls.size == 1)
    assertEquals(acl1, resource1Acls.head)

    // Delete the other ACL from resource1 (the znode itself should be removed)
    deleteAclAndReadWithAuthorizer(authorizer, resource1, ace1, acls => acls.isEmpty)

    // Add a new ACL for resource1 (exercises the create-znode path of writeAddedAcls)
    val newAce1 = new AccessControlEntry(principal.toString, "10.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW)
    resource1Acls = writeAclAndReadWithAuthorizer(authorizer, resource1, newAce1, acls => acls.size == 1)
    assertEquals(newAce1, resource1Acls.head.entry())

    // Add a new ACL for resource2 (exercises the update-existing-znode path)
    val newAce2 = new AccessControlEntry(principal.toString, "10.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW)
    val resource2Acls = writeAclAndReadWithAuthorizer(authorizer, resource2, newAce2, acls => acls.size == 2)
    assertEquals(acl3, resource2Acls.head)
    assertEquals(newAce2, resource2Acls.last.entry())
  } finally {
    authorizer.close()
  }
}
}

View File

@ -16,8 +16,10 @@
*/
package org.apache.kafka.metadata.migration;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
@ -27,6 +29,7 @@ import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.loader.LoaderManifestType;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.authorizer.StandardAcl;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.LeaderAndEpoch;
@ -35,9 +38,12 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -529,6 +535,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
@Override
public void run() throws Exception {
MetadataImage prevImage = KRaftMigrationDriver.this.image;
KRaftMigrationDriver.this.image = image;
String metadataType = isSnapshot ? "snapshot" : "delta";
@ -582,12 +589,42 @@ public class KRaftMigrationDriver implements MetadataPublisher {
zkMigrationClient.writeProducerId(delta.producerIdsDelta().nextProducerId(), migrationState));
}
if (delta.aclsDelta() != null) {
Map<ResourcePattern, List<AccessControlEntry>> deletedAcls = new HashMap<>();
Map<ResourcePattern, List<AccessControlEntry>> addedAcls = new HashMap<>();
delta.aclsDelta().changes().forEach((uuid, standardAclOpt) -> {
if (!standardAclOpt.isPresent()) {
StandardAcl acl = prevImage.acls().acls().get(uuid);
if (acl != null) {
addStandardAclToMap(deletedAcls, acl);
} else {
throw new RuntimeException("Cannot remove deleted ACL " + uuid + " from ZK since it is " +
"not present in the previous AclsImage");
}
} else {
StandardAcl acl = standardAclOpt.get();
addStandardAclToMap(addedAcls, acl);
}
});
deletedAcls.forEach((resourcePattern, accessControlEntries) -> {
String name = "Deleting " + accessControlEntries.size() + " for resource " + resourcePattern;
apply(name, migrationState ->
zkMigrationClient.removeDeletedAcls(resourcePattern, accessControlEntries, migrationState));
});
addedAcls.forEach((resourcePattern, accessControlEntries) -> {
String name = "Adding " + accessControlEntries.size() + " for resource " + resourcePattern;
apply(name, migrationState ->
zkMigrationClient.writeAddedAcls(resourcePattern, accessControlEntries, migrationState));
});
}
// TODO: Unhappy path: Probably relinquish leadership and let new controller
// retry the write?
if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
log.trace("Sending RPCs to brokers for metadata {}.", metadataType);
propagator.sendRPCsToBrokersFromMetadataDelta(delta, image,
migrationLeadershipState.zkControllerEpoch());
migrationLeadershipState.zkControllerEpoch());
} else {
log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType);
}
@ -597,6 +634,13 @@ public class KRaftMigrationDriver implements MetadataPublisher {
completionHandler.accept(null);
}
/**
 * Group a StandardAcl under its resource pattern in {@code aclMap}, converting it to
 * the client-side AccessControlEntry representation used by the ZK dual-write calls.
 */
private void addStandardAclToMap(Map<ResourcePattern, List<AccessControlEntry>> aclMap, StandardAcl acl) {
    ResourcePattern pattern = new ResourcePattern(acl.resourceType(), acl.resourceName(), acl.patternType());
    AccessControlEntry entry = new AccessControlEntry(acl.principal(), acl.host(), acl.operation(), acl.permissionType());
    List<AccessControlEntry> entries = aclMap.get(pattern);
    if (entries == null) {
        entries = new ArrayList<>();
        aclMap.put(pattern, entries);
    }
    entries.add(entry);
}
@Override
public void handleException(Throwable e) {
completionHandler.accept(e);

View File

@ -17,13 +17,16 @@
package org.apache.kafka.metadata.migration;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
/**
@ -101,6 +104,20 @@ public interface MigrationClient {
ZkMigrationLeadershipState state
);
/**
 * Remove the given ACLs for one resource pattern from the migrated store,
 * returning the updated migration leadership state.
 */
ZkMigrationLeadershipState removeDeletedAcls(
    ResourcePattern resourcePattern,
    List<AccessControlEntry> deletedAcls,
    ZkMigrationLeadershipState state
);

/**
 * Add the given ACLs for one resource pattern to the migrated store,
 * returning the updated migration leadership state.
 */
ZkMigrationLeadershipState writeAddedAcls(
    ResourcePattern resourcePattern,
    List<AccessControlEntry> addedAcls,
    ZkMigrationLeadershipState state
);

/** Invoke the consumer once per resource pattern with that resource's full ACL set. */
void iterateAcls(BiConsumer<ResourcePattern, Set<AccessControlEntry>> aclConsumer);
void readAllMetadata(Consumer<List<ApiMessageAndVersion>> batchConsumer, Consumer<Integer> brokerIdConsumer);
Set<Integer> readBrokerIds();

View File

@ -1,38 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 6,
"type": "metadata",
"name": "AccessControlRecord",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ResourceType", "type": "int8", "versions": "0+",
"about": "The resource type" },
{ "name": "ResourceName", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The resource name, or null if this is for the default resource." },
{ "name": "PatternType", "type": "int8", "versions": "0+",
"about": "The pattern type (literal, prefixed, etc.)" },
{ "name": "Principal", "type": "string", "versions": "0+",
"about": "The principal name." },
{ "name": "Host", "type": "string", "versions": "0+",
"about": "The host." },
{ "name": "Operation", "type": "int8", "versions": "0+",
"about": "The operation type." },
{ "name": "PermissionType", "type": "int8", "versions": "0+",
"about": "The permission type (allow, deny)." }
]
}

View File

@ -17,10 +17,12 @@
package org.apache.kafka.metadata.migration;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
@ -49,6 +51,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class KRaftMigrationDriverTest {
@ -148,6 +151,29 @@ public class KRaftMigrationDriverTest {
return state;
}
// Test-double stubs: the ACL dual-write calls are no-ops that return the state
// unchanged, and iterateAcls reports no ACLs.
@Override
public ZkMigrationLeadershipState removeDeletedAcls(
    ResourcePattern resourcePattern,
    List<AccessControlEntry> deletedAcls,
    ZkMigrationLeadershipState state
) {
    return state;
}

@Override
public ZkMigrationLeadershipState writeAddedAcls(
    ResourcePattern resourcePattern,
    List<AccessControlEntry> addedAcls,
    ZkMigrationLeadershipState state
) {
    return state;
}

@Override
public void iterateAcls(BiConsumer<ResourcePattern, Set<AccessControlEntry>> aclConsumer) {
}
@Override
public void readAllMetadata(
Consumer<List<ApiMessageAndVersion>> batchConsumer,