KAFKA-13651; Add audit logging to `StandardAuthorizer` (#12031)

This patch adds audit support through the kafka.authorizer.logger logger to StandardAuthorizer. It
follows the same conventions as AclAuthorizer with a similarly formatted log message. When
logIfAllowed is set in the Action, then the log message is at DEBUG level; otherwise, we log at
trace. When logIfDenied is set, then the log message is at INFO level; otherwise, we again log at
TRACE.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
Jason Gustafson 2022-04-13 10:33:15 -07:00 committed by GitHub
parent a6d86b9998
commit f97646488c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 347 additions and 69 deletions

View File

@ -1136,6 +1136,8 @@ project(':metadata') {
compileOnly libs.log4j
testImplementation libs.junitJupiter
testImplementation libs.hamcrest
testImplementation libs.mockitoCore
testImplementation libs.mockitoInline
testImplementation libs.slf4jlog4j
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':raft').sourceSets.test.output

View File

@ -89,7 +89,7 @@ public class ResourcePattern {
@Override
public String toString() {
return "ResourcePattern(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ", patternType=" + patternType + ")";
return "ResourcePattern(resourceType=" + resourceType + ", name=" + name + ", patternType=" + patternType + ")";
}
/**

View File

@ -25,6 +25,7 @@ import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import java.util.Objects;
@ -96,6 +97,17 @@ final public class StandardAcl implements Comparable<StandardAcl> {
return principal;
}
public KafkaPrincipal kafkaPrincipal() {
int colonIndex = principal.indexOf(":");
if (colonIndex == -1) {
throw new IllegalStateException("Could not parse principal from `" + principal + "` " +
"(no colon is present separating the principal type from the principal name)");
}
String principalType = principal.substring(0, colonIndex);
String principalName = principal.substring(colonIndex + 1);
return new KafkaPrincipal(principalType, principalName);
}
public String host() {
return host;
}

View File

@ -22,12 +22,18 @@ import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
@ -69,12 +75,18 @@ public class StandardAuthorizerData {
* The principal entry used in ACLs that match any principal.
*/
public static final String WILDCARD_PRINCIPAL = "User:*";
public static final KafkaPrincipal WILDCARD_KAFKA_PRINCIPAL = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*");
/**
* The logger to use.
*/
final Logger log;
/**
* Logger to use for auditing.
*/
final Logger auditLog;
/**
* The current AclMutator.
*/
@ -88,7 +100,7 @@ public class StandardAuthorizerData {
/**
* The result to return if no ACLs match.
*/
private final AuthorizationResult defaultResult;
private final DefaultRule defaultRule;
/**
* Contains all of the current ACLs sorted by (resource type, resource name).
@ -104,6 +116,10 @@ public class StandardAuthorizerData {
return new LogContext("[StandardAuthorizer " + nodeId + "] ").logger(StandardAuthorizerData.class);
}
private static Logger auditLogger() {
return LoggerFactory.getLogger("kafka.authorizer.logger");
}
static StandardAuthorizerData createEmpty() {
return new StandardAuthorizerData(createLogger(-1),
null,
@ -119,18 +135,20 @@ public class StandardAuthorizerData {
ConcurrentSkipListSet<StandardAcl> aclsByResource,
ConcurrentHashMap<Uuid, StandardAcl> aclsById) {
this.log = log;
this.auditLog = auditLogger();
this.aclMutator = aclMutator;
this.superUsers = superUsers;
this.defaultResult = defaultResult;
this.defaultRule = new DefaultRule(defaultResult);
this.aclsByResource = aclsByResource;
this.aclsById = aclsById;
}
StandardAuthorizerData copyWithNewAclMutator(AclMutator newAclMutator) {
return new StandardAuthorizerData(log,
return new StandardAuthorizerData(
log,
newAclMutator,
superUsers,
defaultResult,
defaultRule.result,
aclsByResource,
aclsById);
}
@ -152,7 +170,7 @@ public class StandardAuthorizerData {
log,
aclMutator,
superUsers,
defaultResult,
defaultRule.result,
new ConcurrentSkipListSet<>(),
new ConcurrentHashMap<>());
for (Entry<Uuid, StandardAcl> entry : aclEntries) {
@ -206,18 +224,13 @@ public class StandardAuthorizerData {
}
AuthorizationResult defaultResult() {
return defaultResult;
return defaultRule.result;
}
int aclCount() {
return aclsById.size();
}
static class AuthorizationResultBuilder {
boolean foundDeny = false;
boolean foundAllow = false;
}
/**
* Authorize an action based on the current set of ACLs.
*
@ -227,18 +240,98 @@ public class StandardAuthorizerData {
* result. In general it makes more sense to configure the default result to be
* DENY, but some people (and unit tests) configure it as ALLOW.
*/
AuthorizationResult authorize(AuthorizableRequestContext requestContext,
Action action) {
public AuthorizationResult authorize(
AuthorizableRequestContext requestContext,
Action action
) {
KafkaPrincipal principal = baseKafkaPrincipal(requestContext);
final MatchingRule rule;
// Superusers are authorized to do anything.
if (superUsers.contains(requestContext.principal().toString())) {
if (log.isTraceEnabled()) {
log.trace("authorize(requestContext=" + requestContext + ", action=" + action +
"): ALLOWED because " + requestContext.principal().toString() +
" is a superuser");
if (superUsers.contains(principal.toString())) {
rule = SuperUserRule.INSTANCE;
} else {
MatchingAclRule aclRule = findAclRule(
matchingPrincipals(requestContext),
requestContext.clientAddress().getHostAddress(),
action
);
if (aclRule != null) {
rule = aclRule;
} else {
// If nothing matched, we return the default result.
rule = defaultRule;
}
return ALLOWED;
}
logAuditMessage(principal, requestContext, action, rule);
return rule.result();
}
private String buildAuditMessage(
KafkaPrincipal principal,
AuthorizableRequestContext context,
Action action,
MatchingRule rule
) {
StringBuilder bldr = new StringBuilder();
bldr.append("Principal = ").append(principal);
bldr.append(" is ").append(rule.result() == ALLOWED ? "Allowed" : "Denied");
bldr.append(" operation = ").append(action.operation());
bldr.append(" from host = ").append(context.clientAddress().getHostAddress());
bldr.append(" on resource = ");
appendResourcePattern(action.resourcePattern(), bldr);
bldr.append(" for request = ").append(ApiKeys.forId(context.requestType()).name);
bldr.append(" with resourceRefCount = ").append(action.resourceReferenceCount());
bldr.append(" based on rule ").append(rule);
return bldr.toString();
}
private void appendResourcePattern(ResourcePattern resourcePattern, StringBuilder bldr) {
bldr.append(SecurityUtils.resourceTypeName(resourcePattern.resourceType()))
.append(":")
.append(resourcePattern.patternType())
.append(":")
.append(resourcePattern.name());
}
private void logAuditMessage(
KafkaPrincipal principal,
AuthorizableRequestContext requestContext,
Action action,
MatchingRule rule
) {
switch (rule.result()) {
case ALLOWED:
// logIfAllowed is true if access is granted to the resource as a result of this authorization.
// In this case, log at debug level. If false, no access is actually granted, the result is used
// only to determine authorized operations. So log only at trace level.
if (action.logIfAllowed() && auditLog.isDebugEnabled()) {
auditLog.debug(buildAuditMessage(principal, requestContext, action, rule));
} else if (auditLog.isTraceEnabled()) {
auditLog.trace(buildAuditMessage(principal, requestContext, action, rule));
}
return;
case DENIED:
// logIfDenied is true if access to the resource was explicitly requested. Since this is an attempt
// to access unauthorized resources, log at info level. If false, this is either a request to determine
// authorized operations or a filter (e.g for regex subscriptions) to filter out authorized resources.
// In this case, log only at trace level.
if (action.logIfDenied()) {
auditLog.info(buildAuditMessage(principal, requestContext, action, rule));
} else if (auditLog.isTraceEnabled()) {
auditLog.trace(buildAuditMessage(principal, requestContext, action, rule));
}
}
}
private MatchingAclRule findAclRule(
Set<KafkaPrincipal> matchingPrincipals,
String host,
Action action
) {
// This code relies on the ordering of StandardAcl within the NavigableMap.
// Entries are sorted by resource type first, then REVERSE resource name.
// Therefore, we can find all the applicable ACLs by starting at
@ -255,7 +348,7 @@ public class StandardAuthorizerData {
// 5. rs=TOPIC rn=eeee pt=LITERAL
//
// Once we reached element 5, we would stop scanning.
AuthorizationResultBuilder builder = new AuthorizationResultBuilder();
MatchingAclBuilder matchingAclBuilder = new MatchingAclBuilder();
StandardAcl exemplar = new StandardAcl(
action.resourcePattern().resourceType(),
action.resourcePattern().name(),
@ -264,8 +357,10 @@ public class StandardAuthorizerData {
"",
AclOperation.UNKNOWN,
AclPermissionType.UNKNOWN);
checkSection(action, exemplar, requestContext, builder);
if (builder.foundDeny) return DENIED;
checkSection(action, exemplar, matchingPrincipals, host, matchingAclBuilder);
if (matchingAclBuilder.foundDeny()) {
return matchingAclBuilder.build();
}
// In addition to ACLs for this specific resource name, there can also be wildcard
// ACLs that match any resource name. These are stored as type = LITERAL,
@ -278,30 +373,17 @@ public class StandardAuthorizerData {
"",
AclOperation.UNKNOWN,
AclPermissionType.UNKNOWN);
checkSection(action, exemplar, requestContext, builder);
if (builder.foundDeny) return DENIED;
// If we found ALLOW ACLs, the action is allowed.
if (builder.foundAllow) {
if (log.isTraceEnabled()) {
log.trace("authorize(requestContext=" + requestContext + ", action=" +
action + "): ALLOWED");
}
return ALLOWED;
}
// If nothing matched, we return the default result.
if (log.isTraceEnabled()) {
log.trace("authorize(requestContext=" + requestContext + ", action=" +
action + "): returning default result " + defaultResult);
}
return defaultResult;
checkSection(action, exemplar, matchingPrincipals, host, matchingAclBuilder);
return matchingAclBuilder.build();
}
void checkSection(Action action,
StandardAcl exemplar,
AuthorizableRequestContext requestContext,
AuthorizationResultBuilder builder) {
private void checkSection(
Action action,
StandardAcl exemplar,
Set<KafkaPrincipal> matchingPrincipals,
String host,
MatchingAclBuilder matchingAclBuilder
) {
NavigableSet<StandardAcl> tailSet = aclsByResource.tailSet(exemplar, true);
String resourceName = action.resourcePattern().name();
for (Iterator<StandardAcl> iterator = tailSet.iterator();
@ -325,15 +407,11 @@ public class StandardAuthorizerData {
// stepped outside of the section we care about and should stop scanning.
break;
}
AuthorizationResult result = findResult(action, requestContext, acl);
AuthorizationResult result = findResult(action, matchingPrincipals, host, acl);
if (ALLOWED == result) {
builder.foundAllow = true;
matchingAclBuilder.allowAcl = acl;
} else if (DENIED == result) {
if (log.isTraceEnabled()) {
log.trace("authorize(requestContext=" + requestContext + ", action=" +
action + "): DENIED because of " + acl);
}
builder.foundDeny = true;
matchingAclBuilder.denyAcl = acl;
return;
}
}
@ -351,30 +429,55 @@ public class StandardAuthorizerData {
private static final Set<AclOperation> IMPLIES_DESCRIBE_CONFIGS = Collections.unmodifiableSet(
EnumSet.of(DESCRIBE_CONFIGS, ALTER_CONFIGS));
static AuthorizationResult findResult(Action action,
AuthorizableRequestContext requestContext,
StandardAcl acl) {
return findResult(
action,
matchingPrincipals(requestContext),
requestContext.clientAddress().getHostAddress(),
acl
);
}
static KafkaPrincipal baseKafkaPrincipal(AuthorizableRequestContext context) {
KafkaPrincipal sessionPrincipal = context.principal();
return sessionPrincipal.getClass().equals(KafkaPrincipal.class)
? sessionPrincipal
: new KafkaPrincipal(sessionPrincipal.getPrincipalType(), sessionPrincipal.getName());
}
static Set<KafkaPrincipal> matchingPrincipals(AuthorizableRequestContext context) {
KafkaPrincipal sessionPrincipal = context.principal();
KafkaPrincipal basePrincipal = sessionPrincipal.getClass().equals(KafkaPrincipal.class)
? sessionPrincipal
: new KafkaPrincipal(sessionPrincipal.getPrincipalType(), sessionPrincipal.getName());
return Utils.mkSet(basePrincipal, WILDCARD_KAFKA_PRINCIPAL);
}
/**
* Determine what the result of applying an ACL to the given action and request
* context should be. Note that this function assumes that the resource name matches;
* the resource name is not checked here.
*
* @param action The input action.
* @param requestContext The input request context.
* @param acl The input ACL.
* @return null if the ACL does not match. The authorization result
* otherwise.
* @param action The input action.
* @param matchingPrincipals The set of input matching principals
* @param host The input host.
* @param acl The input ACL.
* @return null if the ACL does not match. The authorization result
* otherwise.
*/
static AuthorizationResult findResult(Action action,
AuthorizableRequestContext requestContext,
Set<KafkaPrincipal> matchingPrincipals,
String host,
StandardAcl acl) {
// Check if the principal matches. If it doesn't, return no result (null).
if (!acl.principal().equals(WILDCARD_PRINCIPAL)) {
if (!acl.principal().equals(requestContext.principal().toString())) return null;
if (!matchingPrincipals.contains(acl.kafkaPrincipal())) {
return null;
}
// Check if the host matches. If it doesn't, return no result (null).
// The hostname should be cached in the InetAddress object, so calling this more
// than once shouldn't be too expensive.
if (!acl.host().equals(WILDCARD)) {
String host = requestContext.clientAddress().getHostAddress();
if (!acl.host().equals(host)) return null;
if (!acl.host().equals(WILDCARD) && !acl.host().equals(host)) {
return null;
}
// Check if the operation field matches. Here we hit a slight complication.
// ACLs for various operations (READ, WRITE, DELETE, ALTER), "imply" the presence
@ -456,4 +559,79 @@ public class StandardAuthorizerData {
return result;
}
}
private interface MatchingRule {
AuthorizationResult result();
}
private static class SuperUserRule implements MatchingRule {
private static final SuperUserRule INSTANCE = new SuperUserRule();
@Override
public AuthorizationResult result() {
return ALLOWED;
}
@Override
public String toString() {
return "SuperUser";
}
}
private static class DefaultRule implements MatchingRule {
private final AuthorizationResult result;
private DefaultRule(AuthorizationResult result) {
this.result = result;
}
@Override
public AuthorizationResult result() {
return result;
}
@Override
public String toString() {
return result == ALLOWED ? "DefaultAllow" : "DefaultDeny";
}
}
private static class MatchingAclRule implements MatchingRule {
private final StandardAcl acl;
private final AuthorizationResult result;
private MatchingAclRule(StandardAcl acl, AuthorizationResult result) {
this.acl = acl;
this.result = result;
}
@Override
public AuthorizationResult result() {
return result;
}
@Override
public String toString() {
return "MatchingAcl(acl=" + acl + ")";
}
}
private static class MatchingAclBuilder {
private StandardAcl denyAcl;
private StandardAcl allowAcl;
boolean foundDeny() {
return denyAcl != null;
}
MatchingAclRule build() {
if (denyAcl != null) {
return new MatchingAclRule(denyAcl, DENIED);
} else if (allowAcl != null) {
return new MatchingAclRule(allowAcl, ALLOWED);
} else {
return null;
}
}
}
}

View File

@ -32,6 +32,12 @@ import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.util.Arrays;
@ -40,7 +46,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
@ -119,8 +124,6 @@ public class StandardAuthorizerTest {
new ResourcePattern(resourceType, resourceName, LITERAL), 1, false, false);
}
private final static AtomicLong NEXT_ID = new AtomicLong(0);
static StandardAcl newFooAcl(AclOperation op, AclPermissionType permission) {
return new StandardAcl(
TOPIC,
@ -428,4 +431,87 @@ public class StandardAuthorizerTest {
newAction(WRITE, GROUP, "arbitrary"),
newAction(READ, TOPIC, "ala"))));
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDenyAuditLogging(boolean logIfDenied) throws Exception {
try (MockedStatic<LoggerFactory> mockedLoggerFactory = Mockito.mockStatic(LoggerFactory.class)) {
Logger otherLog = Mockito.mock(Logger.class);
Logger auditLog = Mockito.mock(Logger.class);
mockedLoggerFactory
.when(() -> LoggerFactory.getLogger("kafka.authorizer.logger"))
.thenReturn(auditLog);
mockedLoggerFactory
.when(() -> LoggerFactory.getLogger(Mockito.any(Class.class)))
.thenReturn(otherLog);
Mockito.when(auditLog.isDebugEnabled()).thenReturn(true);
Mockito.when(auditLog.isTraceEnabled()).thenReturn(true);
StandardAuthorizer authorizer = createAuthorizerWithManyAcls();
ResourcePattern topicResource = new ResourcePattern(TOPIC, "alpha", LITERAL);
Action action = new Action(READ, topicResource, 1, false, logIfDenied);
MockAuthorizableRequestContext requestContext = new MockAuthorizableRequestContext.Builder()
.setPrincipal(new KafkaPrincipal(USER_TYPE, "bob"))
.setClientAddress(InetAddress.getByName("127.0.0.1"))
.build();
assertEquals(singletonList(DENIED), authorizer.authorize(requestContext, singletonList(action)));
String expectedAuditLog = "Principal = User:bob is Denied operation = READ " +
"from host = 127.0.0.1 on resource = Topic:LITERAL:alpha for request = Fetch " +
"with resourceRefCount = 1 based on rule MatchingAcl(acl=StandardAcl(resourceType=TOPIC, " +
"resourceName=alp, patternType=PREFIXED, principal=User:bob, host=*, operation=READ, " +
"permissionType=DENY))";
if (logIfDenied) {
Mockito.verify(auditLog).info(expectedAuditLog);
} else {
Mockito.verify(auditLog).trace(expectedAuditLog);
}
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testAllowAuditLogging(boolean logIfAllowed) throws Exception {
try (MockedStatic<LoggerFactory> mockedLoggerFactory = Mockito.mockStatic(LoggerFactory.class)) {
Logger otherLog = Mockito.mock(Logger.class);
Logger auditLog = Mockito.mock(Logger.class);
mockedLoggerFactory
.when(() -> LoggerFactory.getLogger("kafka.authorizer.logger"))
.thenReturn(auditLog);
mockedLoggerFactory
.when(() -> LoggerFactory.getLogger(Mockito.any(Class.class)))
.thenReturn(otherLog);
Mockito.when(auditLog.isDebugEnabled()).thenReturn(true);
Mockito.when(auditLog.isTraceEnabled()).thenReturn(true);
StandardAuthorizer authorizer = createAuthorizerWithManyAcls();
ResourcePattern topicResource = new ResourcePattern(TOPIC, "green1", LITERAL);
Action action = new Action(READ, topicResource, 1, logIfAllowed, false);
MockAuthorizableRequestContext requestContext = new MockAuthorizableRequestContext.Builder()
.setPrincipal(new KafkaPrincipal(USER_TYPE, "bob"))
.setClientAddress(InetAddress.getByName("127.0.0.1"))
.build();
assertEquals(singletonList(ALLOWED), authorizer.authorize(requestContext, singletonList(action)));
String expectedAuditLog = "Principal = User:bob is Allowed operation = READ " +
"from host = 127.0.0.1 on resource = Topic:LITERAL:green1 for request = Fetch " +
"with resourceRefCount = 1 based on rule MatchingAcl(acl=StandardAcl(resourceType=TOPIC, " +
"resourceName=green, patternType=PREFIXED, principal=User:bob, host=*, operation=READ, " +
"permissionType=ALLOW))";
if (logIfAllowed) {
Mockito.verify(auditLog).debug(expectedAuditLog);
} else {
Mockito.verify(auditLog).trace(expectedAuditLog);
}
}
}
}