KAFKA-14435: Fix `allow.everyone.if.no.acl.found` config behavior for StandardAuthorizer

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Colin Patrick McCabe <cmccabe@apache.org>
This commit is contained in:
Purshotam Chauhan 2022-12-06 00:13:04 +05:30 committed by Manikumar Reddy
parent d40561e90a
commit c6590ee28b
2 changed files with 90 additions and 28 deletions

View File

@ -105,7 +105,7 @@ public class StandardAuthorizerData {
/**
* The result to return if no ACLs match.
*/
private final DefaultRule defaultRule;
private final DefaultRule noAclRule;
/**
* Contains all of the current ACLs sorted by (resource type, resource name).
@ -146,7 +146,7 @@ public class StandardAuthorizerData {
this.aclMutator = aclMutator;
this.loadingComplete = loadingComplete;
this.superUsers = superUsers;
this.defaultRule = new DefaultRule(defaultResult);
this.noAclRule = new DefaultRule(defaultResult);
this.aclsByResource = aclsByResource;
this.aclsById = aclsById;
}
@ -157,7 +157,7 @@ public class StandardAuthorizerData {
newAclMutator,
loadingComplete,
superUsers,
defaultRule.result,
noAclRule.result,
aclsByResource,
aclsById);
}
@ -167,7 +167,7 @@ public class StandardAuthorizerData {
aclMutator,
newLoadingComplete,
superUsers,
defaultRule.result,
noAclRule.result,
aclsByResource,
aclsById);
}
@ -192,7 +192,7 @@ public class StandardAuthorizerData {
aclMutator,
loadingComplete,
superUsers,
defaultRule.result,
noAclRule.result,
aclsByResource,
aclsById);
log.info("Initialized with {} acl(s).", aclsById.size());
@ -239,7 +239,7 @@ public class StandardAuthorizerData {
}
AuthorizationResult defaultResult() {
return defaultRule.result;
return noAclRule.result;
}
int aclCount() {
@ -268,20 +268,12 @@ public class StandardAuthorizerData {
} else if (!loadingComplete) {
throw new AuthorizerNotReadyException();
} else {
MatchingAclRule aclRule = findAclRule(
rule = findAclRule(
matchingPrincipals(requestContext),
requestContext.clientAddress().getHostAddress(),
action
);
if (aclRule != null) {
rule = aclRule;
} else {
// If nothing matched, we return the default result.
rule = defaultRule;
}
}
logAuditMessage(principal, requestContext, action, rule);
return rule.result();
}
@ -344,7 +336,7 @@ public class StandardAuthorizerData {
}
}
private MatchingAclRule findAclRule(
private MatchingRule findAclRule(
Set<KafkaPrincipal> matchingPrincipals,
String host,
Action action
@ -370,7 +362,7 @@ public class StandardAuthorizerData {
// 8. rs=TOPIC rn=eeee pt=LITERAL
//
// Once we reached element 5, we would jump to element 7.
MatchingAclBuilder matchingAclBuilder = new MatchingAclBuilder();
MatchingRuleBuilder matchingRuleBuilder = new MatchingRuleBuilder(noAclRule);
StandardAcl exemplar = new StandardAcl(
action.resourcePattern().resourceType(),
action.resourcePattern().name(),
@ -379,9 +371,9 @@ public class StandardAuthorizerData {
"",
AclOperation.UNKNOWN,
AclPermissionType.UNKNOWN);
checkSection(action, exemplar, matchingPrincipals, host, matchingAclBuilder);
if (matchingAclBuilder.foundDeny()) {
return matchingAclBuilder.build();
checkSection(action, exemplar, matchingPrincipals, host, matchingRuleBuilder);
if (matchingRuleBuilder.foundDeny()) {
return matchingRuleBuilder.build();
}
// In addition to ACLs for this specific resource name, there can also be wildcard
@ -395,8 +387,8 @@ public class StandardAuthorizerData {
"",
AclOperation.UNKNOWN,
AclPermissionType.UNKNOWN);
checkSection(action, exemplar, matchingPrincipals, host, matchingAclBuilder);
return matchingAclBuilder.build();
checkSection(action, exemplar, matchingPrincipals, host, matchingRuleBuilder);
return matchingRuleBuilder.build();
}
static int matchesUpTo(
@ -418,7 +410,7 @@ public class StandardAuthorizerData {
StandardAcl exemplar,
Set<KafkaPrincipal> matchingPrincipals,
String host,
MatchingAclBuilder matchingAclBuilder
MatchingRuleBuilder matchingRuleBuilder
) {
String resourceName = action.resourcePattern().name();
NavigableSet<StandardAcl> tailSet = aclsByResource.tailSet(exemplar, true);
@ -455,11 +447,12 @@ public class StandardAuthorizerData {
iterator = tailSet.iterator();
continue;
}
matchingRuleBuilder.hasResourceAcls = true;
AuthorizationResult result = findResult(action, matchingPrincipals, host, acl);
if (ALLOWED == result) {
matchingAclBuilder.allowAcl = acl;
matchingRuleBuilder.allowAcl = acl;
} else if (DENIED == result) {
matchingAclBuilder.denyAcl = acl;
matchingRuleBuilder.denyAcl = acl;
return;
}
}
@ -630,21 +623,30 @@ public class StandardAuthorizerData {
}
}
private static class MatchingAclBuilder {
private static class MatchingRuleBuilder {
private static final DefaultRule DENY_RULE = new DefaultRule(DENIED);
private final DefaultRule noAclRule;
private StandardAcl denyAcl;
private StandardAcl allowAcl;
private boolean hasResourceAcls;
public MatchingRuleBuilder(DefaultRule noAclRule) {
this.noAclRule = noAclRule;
}
boolean foundDeny() {
return denyAcl != null;
}
MatchingAclRule build() {
MatchingRule build() {
if (denyAcl != null) {
return new MatchingAclRule(denyAcl, DENIED);
} else if (allowAcl != null) {
return new MatchingAclRule(allowAcl, ALLOWED);
} else if (!hasResourceAcls) {
return noAclRule;
} else {
return null;
return DENY_RULE;
}
}
}

View File

@ -167,6 +167,66 @@ public class StandardAuthorizerTest {
ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "false")));
}
@Test
public void testAllowEveryoneIfNoAclFoundConfigEnabled() throws Exception {
StandardAuthorizer authorizer = new StandardAuthorizer();
HashMap<String, Object> configs = new HashMap<>();
configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
authorizer.configure(configs);
authorizer.start(new AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
authorizer.completeInitialLoad();
List<StandardAclWithId> acls = singletonList(
withId(new StandardAcl(TOPIC, "topic1", LITERAL, "User:Alice", WILDCARD, READ, ALLOW))
);
acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl()));
assertEquals(singletonList(DENIED),
authorizer.authorize(
new MockAuthorizableRequestContext.Builder()
.setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob"))
.build(),
singletonList(newAction(READ, TOPIC, "topic1"))
));
assertEquals(singletonList(ALLOWED),
authorizer.authorize(
new MockAuthorizableRequestContext.Builder()
.setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob"))
.build(),
singletonList(newAction(READ, TOPIC, "topic2"))
));
}
@Test
public void testAllowEveryoneIfNoAclFoundConfigDisabled() throws Exception {
StandardAuthorizer authorizer = new StandardAuthorizer();
HashMap<String, Object> configs = new HashMap<>();
configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "false");
authorizer.configure(configs);
authorizer.start(new AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
authorizer.completeInitialLoad();
List<StandardAclWithId> acls = singletonList(
withId(new StandardAcl(TOPIC, "topic1", LITERAL, "User:Alice", WILDCARD, READ, ALLOW))
);
acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl()));
assertEquals(singletonList(DENIED),
authorizer.authorize(
new MockAuthorizableRequestContext.Builder()
.setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob"))
.build(),
singletonList(newAction(READ, TOPIC, "topic1"))
));
assertEquals(singletonList(DENIED),
authorizer.authorize(
new MockAuthorizableRequestContext.Builder()
.setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob"))
.build(),
singletonList(newAction(READ, TOPIC, "topic2"))
));
}
@Test
public void testConfigure() {
StandardAuthorizer authorizer = new StandardAuthorizer();