KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads. (#12628)

Fixes an issue with StandardAuthorizer#authorize that allowed inconsistent results. The underlying 
concurrent data structure (ConcurrentSkipListMap) had weak consistency guarantees. This meant
that a concurrent update to the authorizer data could result in the authorize function processing 
ACL updates out of order.

This patch replaces the concurrent data structures with regular non-thread safe equivalents and uses
a read/write lock for thread safety and strong consistency.

Reviewers: David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>, Colin P. McCabe <cmccabe@apache.org>, Luke Chen <showuon@gmail.com>
This commit is contained in:
Akhilesh C 2022-09-20 13:54:18 -07:00 committed by GitHub
parent ae4bb0c6fa
commit 6c6b8e2f96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 129 additions and 86 deletions

View File

@ -39,6 +39,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
@ -58,19 +59,34 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
*/
private final CompletableFuture<Void> initialLoadFuture = new CompletableFuture<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
/**
* The current data. Can be read without a lock. Must be written while holding the object lock.
* The current data. We use a read-write lock to synchronize reads and writes to the data. We
* expect one writer and multiple readers accessing the ACL data, and we use the lock to make
* sure we have consistent reads when writer tries to change the data.
*/
private volatile StandardAuthorizerData data = StandardAuthorizerData.createEmpty();
@Override
public synchronized void setAclMutator(AclMutator aclMutator) {
public void setAclMutator(AclMutator aclMutator) {
lock.writeLock().lock();
try {
this.data = data.copyWithNewAclMutator(aclMutator);
} finally {
lock.writeLock().unlock();
}
}
@Override
public AclMutator aclMutatorOrException() {
AclMutator aclMutator = data.aclMutator;
AclMutator aclMutator;
lock.readLock().lock();
try {
aclMutator = data.aclMutator;
} finally {
lock.readLock().unlock();
}
if (aclMutator == null) {
throw new NotControllerException("The current node is not the active controller.");
}
@ -78,8 +94,13 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
}
@Override
public synchronized void completeInitialLoad() {
public void completeInitialLoad() {
lock.writeLock().lock();
try {
data = data.copyWithNewLoadingComplete(true);
} finally {
lock.writeLock().unlock();
}
data.log.info("Completed initial ACL load process.");
initialLoadFuture.complete(null);
}
@ -97,17 +118,36 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
@Override
public void addAcl(Uuid id, StandardAcl acl) {
lock.writeLock().lock();
try {
data.addAcl(id, acl);
} finally {
lock.writeLock().unlock();
}
}
@Override
public void removeAcl(Uuid id) {
lock.writeLock().lock();
try {
data.removeAcl(id);
} finally {
lock.writeLock().unlock();
}
}
@Override
public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
data = data.copyWithNewAcls(acls.entrySet());
public void loadSnapshot(Map<Uuid, StandardAcl> acls) {
StandardAuthorizerData newData = StandardAuthorizerData.createEmpty();
for (Map.Entry<Uuid, StandardAcl> entry : acls.entrySet()) {
newData.addAcl(entry.getKey(), entry.getValue());
}
lock.writeLock().lock();
try {
data = data.copyWithNewAcls(newData.getAclsByResource(), newData.getAclsById());
} finally {
lock.writeLock().unlock();
}
}
@Override
@ -129,23 +169,40 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
public List<AuthorizationResult> authorize(
AuthorizableRequestContext requestContext,
List<Action> actions) {
StandardAuthorizerData curData = data;
List<AuthorizationResult> results = new ArrayList<>(actions.size());
for (Action action: actions) {
lock.readLock().lock();
try {
StandardAuthorizerData curData = data;
for (Action action : actions) {
AuthorizationResult result = curData.authorize(requestContext, action);
results.add(result);
}
} finally {
lock.readLock().unlock();
}
return results;
}
@Override
public Iterable<AclBinding> acls(AclBindingFilter filter) {
lock.readLock().lock();
try {
// The Iterable returned here is consistent because it is created over a read-only
// copy of ACLs data.
return data.acls(filter);
} finally {
lock.readLock().unlock();
}
}
@Override
public int aclCount() {
lock.readLock().lock();
try {
return data.aclCount();
} finally {
lock.readLock().unlock();
}
}
@Override
@ -156,7 +213,7 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
}
@Override
public synchronized void configure(Map<String, ?> configs) {
public void configure(Map<String, ?> configs) {
Set<String> superUsers = getConfiguredSuperUsers(configs);
AuthorizationResult defaultResult = getDefaultResult(configs);
int nodeId;
@ -165,17 +222,32 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
} catch (Exception e) {
nodeId = -1;
}
this.data = data.copyWithNewConfig(nodeId, superUsers, defaultResult);
lock.writeLock().lock();
try {
data = data.copyWithNewConfig(nodeId, superUsers, defaultResult);
} finally {
lock.writeLock().unlock();
}
this.data.log.info("set super.users={}, default result={}", String.join(",", superUsers), defaultResult);
}
// VisibleForTesting
Set<String> superUsers() {
lock.readLock().lock();
try {
return new HashSet<>(data.superUsers());
} finally {
lock.readLock().unlock();
}
}
AuthorizationResult defaultResult() {
lock.readLock().lock();
try {
return data.defaultResult();
} finally {
lock.readLock().unlock();
}
}
static Set<String> getConfiguredSuperUsers(Map<String, ?> configs) {

View File

@ -36,16 +36,15 @@ import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.List;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.TreeSet;
import static org.apache.kafka.common.acl.AclOperation.ALL;
import static org.apache.kafka.common.acl.AclOperation.ALTER;
@ -64,7 +63,7 @@ import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
/**
* A class which encapsulates the configuration and the ACL data owned by StandardAuthorizer.
*
* The methods in this class support lockless concurrent access.
* The class is not thread-safe.
*/
public class StandardAuthorizerData {
/**
@ -111,12 +110,12 @@ public class StandardAuthorizerData {
/**
* Contains all of the current ACLs sorted by (resource type, resource name).
*/
private final ConcurrentSkipListSet<StandardAcl> aclsByResource;
private final TreeSet<StandardAcl> aclsByResource;
/**
* Contains all of the current ACLs indexed by UUID.
*/
private final ConcurrentHashMap<Uuid, StandardAcl> aclsById;
private final HashMap<Uuid, StandardAcl> aclsById;
private static Logger createLogger(int nodeId) {
return new LogContext("[StandardAuthorizer " + nodeId + "] ").logger(StandardAuthorizerData.class);
@ -132,7 +131,7 @@ public class StandardAuthorizerData {
false,
Collections.emptySet(),
DENIED,
new ConcurrentSkipListSet<>(), new ConcurrentHashMap<>());
new TreeSet<>(), new HashMap<>());
}
private StandardAuthorizerData(Logger log,
@ -140,8 +139,8 @@ public class StandardAuthorizerData {
boolean loadingComplete,
Set<String> superUsers,
AuthorizationResult defaultResult,
ConcurrentSkipListSet<StandardAcl> aclsByResource,
ConcurrentHashMap<Uuid, StandardAcl> aclsById) {
TreeSet<StandardAcl> aclsByResource,
HashMap<Uuid, StandardAcl> aclsById) {
this.log = log;
this.auditLog = auditLogger();
this.aclMutator = aclMutator;
@ -186,19 +185,17 @@ public class StandardAuthorizerData {
aclsById);
}
StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
StandardAuthorizerData copyWithNewAcls(TreeSet<StandardAcl> aclsByResource, HashMap<Uuid,
StandardAcl> aclsById) {
StandardAuthorizerData newData = new StandardAuthorizerData(
log,
aclMutator,
loadingComplete,
superUsers,
defaultRule.result,
new ConcurrentSkipListSet<>(),
new ConcurrentHashMap<>());
for (Entry<Uuid, StandardAcl> entry : aclEntries) {
newData.addAcl(entry.getKey(), entry.getValue());
}
log.info("Applied {} acl(s) from image.", aclEntries.size());
aclsByResource,
aclsById);
log.info("Initialized with {} acl(s).", aclsById.size());
return newData;
}
@ -529,55 +526,21 @@ public class StandardAuthorizerData {
return acl.permissionType().equals(ALLOW) ? ALLOWED : DENIED;
}
/**
* Creates a consistent Iterable on read-only copy of AclBindings data for the given filter.
*
* @param filter The filter constraining the AclBindings to be present in the Iterable.
* @return Iterable over AclBindings matching the filter.
*/
Iterable<AclBinding> acls(AclBindingFilter filter) {
return new AclIterable(filter);
}
class AclIterable implements Iterable<AclBinding> {
private final AclBindingFilter filter;
AclIterable(AclBindingFilter filter) {
this.filter = filter;
}
@Override
public Iterator<AclBinding> iterator() {
return new AclIterator(filter);
}
}
class AclIterator implements Iterator<AclBinding> {
private final AclBindingFilter filter;
private final Iterator<StandardAcl> iterator;
private AclBinding next;
AclIterator(AclBindingFilter filter) {
this.filter = filter;
this.iterator = aclsByResource.iterator();
this.next = null;
}
@Override
public boolean hasNext() {
while (next == null) {
if (!iterator.hasNext()) return false;
AclBinding binding = iterator.next().toBinding();
if (filter.matches(binding)) {
next = binding;
}
}
return true;
}
@Override
public AclBinding next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
AclBinding result = next;
next = null;
return result;
List<AclBinding> aclBindingList = new ArrayList<>();
aclsByResource.forEach(acl -> {
AclBinding aclBinding = acl.toBinding();
if (filter.matches(aclBinding)) {
aclBindingList.add(aclBinding);
}
});
return aclBindingList;
}
private interface MatchingRule {
@ -654,4 +617,12 @@ public class StandardAuthorizerData {
}
}
}
TreeSet<StandardAcl> getAclsByResource() {
return aclsByResource;
}
HashMap<Uuid, StandardAcl> getAclsById() {
return aclsById;
}
}