diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java index 42f03367c22..197272a3e66 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java @@ -39,6 +39,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED; import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED; @@ -58,19 +59,34 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer { */ private final CompletableFuture initialLoadFuture = new CompletableFuture<>(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + /** - * The current data. Can be read without a lock. Must be written while holding the object lock. + * The current data. We use a read-write lock to synchronize reads and writes to the data. We + * expect one writer and multiple readers accessing the ACL data, and we use the lock to make + * sure we have consistent reads when writer tries to change the data. */ private volatile StandardAuthorizerData data = StandardAuthorizerData.createEmpty(); @Override - public synchronized void setAclMutator(AclMutator aclMutator) { - this.data = data.copyWithNewAclMutator(aclMutator); + public void setAclMutator(AclMutator aclMutator) { + lock.writeLock().lock(); + try { + this.data = data.copyWithNewAclMutator(aclMutator); + } finally { + lock.writeLock().unlock(); + } } @Override public AclMutator aclMutatorOrException() { - AclMutator aclMutator = data.aclMutator; + AclMutator aclMutator; + lock.readLock().lock(); + try { + aclMutator = data.aclMutator; + } finally { + lock.readLock().unlock(); + } if (aclMutator == null) { throw new NotControllerException("The current node is not the active controller."); } @@ -78,8 +94,13 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer { } @Override - public synchronized void completeInitialLoad() { - data = data.copyWithNewLoadingComplete(true); + public void completeInitialLoad() { + lock.writeLock().lock(); + try { + data = data.copyWithNewLoadingComplete(true); + } finally { + lock.writeLock().unlock(); + } data.log.info("Completed initial ACL load process."); initialLoadFuture.complete(null); } @@ -97,17 +118,36 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer { @Override public void addAcl(Uuid id, StandardAcl acl) { - data.addAcl(id, acl); + lock.writeLock().lock(); + try { + data.addAcl(id, acl); + } finally { + lock.writeLock().unlock(); + } } @Override public void removeAcl(Uuid id) { - data.removeAcl(id); + lock.writeLock().lock(); + try { + data.removeAcl(id); + } finally { + lock.writeLock().unlock(); + } } @Override - public synchronized void loadSnapshot(Map acls) { - data = data.copyWithNewAcls(acls.entrySet()); + public void loadSnapshot(Map acls) { + StandardAuthorizerData newData = StandardAuthorizerData.createEmpty(); + for (Map.Entry entry : acls.entrySet()) { + newData.addAcl(entry.getKey(), entry.getValue()); + } + lock.writeLock().lock(); + try { + data = data.copyWithNewAcls(newData.getAclsByResource(), newData.getAclsById()); + } finally { + lock.writeLock().unlock(); + } } @Override @@ -129,23 +169,40 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer { public List authorize( AuthorizableRequestContext requestContext, List actions) { - StandardAuthorizerData curData = data; List results = new ArrayList<>(actions.size()); - for (Action action: actions) { - AuthorizationResult result = curData.authorize(requestContext, action); - results.add(result); + lock.readLock().lock(); + try { + StandardAuthorizerData curData = data; + for (Action action : actions) { + AuthorizationResult result = curData.authorize(requestContext, action); + results.add(result); + } + } finally { + lock.readLock().unlock(); } return results; } @Override public Iterable acls(AclBindingFilter filter) { - return data.acls(filter); + lock.readLock().lock(); + try { + // The Iterable returned here is consistent because it is created over a read-only + // copy of ACLs data. + return data.acls(filter); + } finally { + lock.readLock().unlock(); + } } @Override public int aclCount() { - return data.aclCount(); + lock.readLock().lock(); + try { + return data.aclCount(); + } finally { + lock.readLock().unlock(); + } } @Override @@ -156,7 +213,7 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer { } @Override - public synchronized void configure(Map configs) { + public void configure(Map configs) { Set superUsers = getConfiguredSuperUsers(configs); AuthorizationResult defaultResult = getDefaultResult(configs); int nodeId; @@ -165,17 +222,32 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer { } catch (Exception e) { nodeId = -1; } - this.data = data.copyWithNewConfig(nodeId, superUsers, defaultResult); + lock.writeLock().lock(); + try { + data = data.copyWithNewConfig(nodeId, superUsers, defaultResult); + } finally { + lock.writeLock().unlock(); + } this.data.log.info("set super.users={}, default result={}", String.join(",", superUsers), defaultResult); } // VisibleForTesting Set superUsers() { - return new HashSet<>(data.superUsers()); + lock.readLock().lock(); + try { + return new HashSet<>(data.superUsers()); + } finally { + lock.readLock().unlock(); + } } AuthorizationResult defaultResult() { - return data.defaultResult(); + lock.readLock().lock(); + try { + return data.defaultResult(); + } finally { + lock.readLock().unlock(); + } } static Set getConfiguredSuperUsers(Map configs) { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java index d9ffd17562f..c6e3b74a2ab 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java @@ -36,16 +36,15 @@ import org.apache.kafka.server.authorizer.AuthorizationResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; +import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.Iterator; -import java.util.Map.Entry; +import java.util.List; import java.util.NavigableSet; -import java.util.NoSuchElementException; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; +import java.util.TreeSet; import static org.apache.kafka.common.acl.AclOperation.ALL; import static org.apache.kafka.common.acl.AclOperation.ALTER; @@ -64,7 +63,7 @@ import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED; /** * A class which encapsulates the configuration and the ACL data owned by StandardAuthorizer. * - * The methods in this class support lockless concurrent access. + * The class is not thread-safe. */ public class StandardAuthorizerData { /** @@ -111,12 +110,12 @@ public class StandardAuthorizerData { /** * Contains all of the current ACLs sorted by (resource type, resource name). */ - private final ConcurrentSkipListSet aclsByResource; + private final TreeSet aclsByResource; /** * Contains all of the current ACLs indexed by UUID. */ - private final ConcurrentHashMap aclsById; + private final HashMap aclsById; private static Logger createLogger(int nodeId) { return new LogContext("[StandardAuthorizer " + nodeId + "] ").logger(StandardAuthorizerData.class); @@ -132,7 +131,7 @@ public class StandardAuthorizerData { false, Collections.emptySet(), DENIED, - new ConcurrentSkipListSet<>(), new ConcurrentHashMap<>()); + new TreeSet<>(), new HashMap<>()); } private StandardAuthorizerData(Logger log, @@ -140,8 +139,8 @@ public class StandardAuthorizerData { boolean loadingComplete, Set superUsers, AuthorizationResult defaultResult, - ConcurrentSkipListSet aclsByResource, - ConcurrentHashMap aclsById) { + TreeSet aclsByResource, + HashMap aclsById) { this.log = log; this.auditLog = auditLogger(); this.aclMutator = aclMutator; @@ -186,19 +185,17 @@ public class StandardAuthorizerData { aclsById); } - StandardAuthorizerData copyWithNewAcls(Collection> aclEntries) { - StandardAuthorizerData newData = new StandardAuthorizerData( + StandardAuthorizerData copyWithNewAcls(TreeSet aclsByResource, HashMap aclsById) { + StandardAuthorizerData newData = new StandardAuthorizerData( log, aclMutator, loadingComplete, superUsers, defaultRule.result, - new ConcurrentSkipListSet<>(), - new ConcurrentHashMap<>()); - for (Entry entry : aclEntries) { - newData.addAcl(entry.getKey(), entry.getValue()); - } - log.info("Applied {} acl(s) from image.", aclEntries.size()); + aclsByResource, + aclsById); + log.info("Initialized with {} acl(s).", aclsById.size()); return newData; } @@ -529,55 +526,21 @@ public class StandardAuthorizerData { return acl.permissionType().equals(ALLOW) ? ALLOWED : DENIED; } + /** + * Creates a consistent Iterable on read-only copy of AclBindings data for the given filter. + * + * @param filter The filter constraining the AclBindings to be present in the Iterable. + * @return Iterable over AclBindings matching the filter. + */ Iterable acls(AclBindingFilter filter) { - return new AclIterable(filter); - } - - class AclIterable implements Iterable { - private final AclBindingFilter filter; - - AclIterable(AclBindingFilter filter) { - this.filter = filter; - } - - @Override - public Iterator iterator() { - return new AclIterator(filter); - } - } - - class AclIterator implements Iterator { - private final AclBindingFilter filter; - private final Iterator iterator; - private AclBinding next; - - AclIterator(AclBindingFilter filter) { - this.filter = filter; - this.iterator = aclsByResource.iterator(); - this.next = null; - } - - @Override - public boolean hasNext() { - while (next == null) { - if (!iterator.hasNext()) return false; - AclBinding binding = iterator.next().toBinding(); - if (filter.matches(binding)) { - next = binding; - } + List aclBindingList = new ArrayList<>(); + aclsByResource.forEach(acl -> { + AclBinding aclBinding = acl.toBinding(); + if (filter.matches(aclBinding)) { + aclBindingList.add(aclBinding); } - return true; - } - - @Override - public AclBinding next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - AclBinding result = next; - next = null; - return result; - } + }); + return aclBindingList; } private interface MatchingRule { @@ -654,4 +617,12 @@ public class StandardAuthorizerData { } } } + + TreeSet getAclsByResource() { + return aclsByResource; + } + + HashMap getAclsById() { + return aclsById; + } }