mirror of https://github.com/apache/kafka.git
KAFKA-16682 Rewrite JaasTestUtils by Java (#16579)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
5b9cbcf886
commit
ad08ec600f
|
@ -364,4 +364,6 @@
|
||||||
<suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"
|
<suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"
|
||||||
files="(ReplicaFetcherThreadBenchmark).java"/>
|
files="(ReplicaFetcherThreadBenchmark).java"/>
|
||||||
|
|
||||||
|
<!-- Add the new suppression rule for JaasTestUtils.java -->
|
||||||
|
<suppress checks="ImportControl" files="kafka/security/JaasTestUtils.java" />
|
||||||
</suppressions>
|
</suppressions>
|
||||||
|
|
|
@ -0,0 +1,137 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package kafka.security;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
public class JaasModule {
|
||||||
|
public static JaasModule zkDigestModule(boolean debug, Map<String, String> entries) {
|
||||||
|
String name = "org.apache.zookeeper.server.auth.DigestLoginModule";
|
||||||
|
return new JaasModule(
|
||||||
|
name,
|
||||||
|
debug,
|
||||||
|
entries
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static JaasModule krb5LoginModule(boolean useKeyTab, boolean storeKey, String keyTab, String principal, boolean debug, Optional<String> serviceName, boolean isIbmSecurity) {
|
||||||
|
String name = isIbmSecurity ? "com.ibm.security.auth.module.Krb5LoginModule" : "com.sun.security.auth.module.Krb5LoginModule";
|
||||||
|
|
||||||
|
Map<String, String> entries = new HashMap<>();
|
||||||
|
if (isIbmSecurity) {
|
||||||
|
entries.put("principal", principal);
|
||||||
|
entries.put("credsType", "both");
|
||||||
|
if (useKeyTab) {
|
||||||
|
entries.put("useKeytab", "file:" + keyTab);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
entries.put("useKeyTab", Boolean.toString(useKeyTab));
|
||||||
|
entries.put("storeKey", Boolean.toString(storeKey));
|
||||||
|
entries.put("keyTab", keyTab);
|
||||||
|
entries.put("principal", principal);
|
||||||
|
serviceName.ifPresent(s -> entries.put("serviceName", s));
|
||||||
|
}
|
||||||
|
|
||||||
|
return new JaasModule(
|
||||||
|
name,
|
||||||
|
debug,
|
||||||
|
entries
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static JaasModule oAuthBearerLoginModule(String username, boolean debug) {
|
||||||
|
String name = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule";
|
||||||
|
|
||||||
|
Map<String, String> entries = new HashMap<>();
|
||||||
|
entries.put("unsecuredLoginStringClaim_sub", username);
|
||||||
|
|
||||||
|
return new JaasModule(
|
||||||
|
name,
|
||||||
|
debug,
|
||||||
|
entries
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static JaasModule plainLoginModule(String username, String password) {
|
||||||
|
return plainLoginModule(username, password, false, Collections.emptyMap());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static JaasModule plainLoginModule(String username, String password, boolean debug, Map<String, String> validUsers) {
|
||||||
|
String name = "org.apache.kafka.common.security.plain.PlainLoginModule";
|
||||||
|
|
||||||
|
Map<String, String> entries = new HashMap<>();
|
||||||
|
entries.put("username", username);
|
||||||
|
entries.put("password", password);
|
||||||
|
validUsers.forEach((user, pass) -> entries.put("user_" + user, pass));
|
||||||
|
|
||||||
|
return new JaasModule(
|
||||||
|
name,
|
||||||
|
debug,
|
||||||
|
entries
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static JaasModule scramLoginModule(String username, String password) {
|
||||||
|
return scramLoginModule(username, password, false, Collections.emptyMap());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static JaasModule scramLoginModule(String username, String password, boolean debug, Map<String, String> tokenProps) {
|
||||||
|
String name = "org.apache.kafka.common.security.scram.ScramLoginModule";
|
||||||
|
|
||||||
|
Map<String, String> entries = new HashMap<>();
|
||||||
|
entries.put("username", username);
|
||||||
|
entries.put("password", password);
|
||||||
|
entries.putAll(tokenProps);
|
||||||
|
|
||||||
|
return new JaasModule(
|
||||||
|
name,
|
||||||
|
debug,
|
||||||
|
entries
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final String name;
|
||||||
|
|
||||||
|
private final boolean debug;
|
||||||
|
|
||||||
|
private final Map<String, String> entries;
|
||||||
|
|
||||||
|
private JaasModule(String name, boolean debug, Map<String, String> entries) {
|
||||||
|
this.name = name;
|
||||||
|
this.debug = debug;
|
||||||
|
this.entries = entries;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String name() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean debug() {
|
||||||
|
return debug;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.format("%s required\n debug=%b\n %s;\n", name, debug, entries.entrySet().stream()
|
||||||
|
.map(e -> e.getKey() + "=\"" + e.getValue() + "\"")
|
||||||
|
.collect(Collectors.joining("\n ")));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,272 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package kafka.security;
|
||||||
|
|
||||||
|
import kafka.utils.TestUtils;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.ScramMechanism;
|
||||||
|
import org.apache.kafka.common.config.SaslConfigs;
|
||||||
|
import org.apache.kafka.common.utils.Java;
|
||||||
|
|
||||||
|
import java.io.BufferedWriter;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileWriter;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.apache.kafka.common.config.SaslConfigs.GSSAPI_MECHANISM;
|
||||||
|
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
|
||||||
|
import static org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM;
|
||||||
|
|
||||||
|
public class JaasTestUtils {
|
||||||
|
public static class JaasSection {
|
||||||
|
private final String contextName;
|
||||||
|
private final List<JaasModule> modules;
|
||||||
|
|
||||||
|
public JaasSection(String contextName, List<JaasModule> modules) {
|
||||||
|
this.contextName = contextName;
|
||||||
|
this.modules = modules;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<JaasModule> getModules() {
|
||||||
|
return modules;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getContextName() {
|
||||||
|
return contextName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.format("%s {\n %s\n};\n",
|
||||||
|
contextName,
|
||||||
|
modules.stream().map(Object::toString).collect(Collectors.joining("\n ")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final boolean IS_IBM_SECURITY = Java.isIbmJdk() && !Java.isIbmJdkSemeru();
|
||||||
|
|
||||||
|
private static final String ZK_SERVER_CONTEXT_NAME = "Server";
|
||||||
|
private static final String ZK_CLIENT_CONTEXT_NAME = "Client";
|
||||||
|
private static final String ZK_USER_SUPER_PASSWD = "adminpasswd";
|
||||||
|
private static final String ZK_USER = "fpj";
|
||||||
|
private static final String ZK_USER_PASSWORD = "fpjsecret";
|
||||||
|
|
||||||
|
public static final String KAFKA_SERVER_CONTEXT_NAME = "KafkaServer";
|
||||||
|
public static final String KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME = "kafka";
|
||||||
|
private static final String KAFKA_SERVER_PRINCIPAL = KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME + "/localhost@EXAMPLE.COM";
|
||||||
|
public static final String KAFKA_CLIENT_CONTEXT_NAME = "KafkaClient";
|
||||||
|
public static final String KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME = "client";
|
||||||
|
private static final String KAFKA_CLIENT_PRINCIPAL = KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME + "@EXAMPLE.COM";
|
||||||
|
public static final String KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2 = "client2";
|
||||||
|
private static final String KAFKA_CLIENT_PRINCIPAL_2 = KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2 + "@EXAMPLE.COM";
|
||||||
|
|
||||||
|
public static final String KAFKA_PLAIN_USER = "plain-user";
|
||||||
|
private static final String KAFKA_PLAIN_PASSWORD = "plain-user-secret";
|
||||||
|
public static final String KAFKA_PLAIN_USER_2 = "plain-user2";
|
||||||
|
public static final String KAFKA_PLAIN_PASSWORD_2 = "plain-user2-secret";
|
||||||
|
public static final String KAFKA_PLAIN_ADMIN = "plain-admin";
|
||||||
|
private static final String KAFKA_PLAIN_ADMIN_PASSWORD = "plain-admin-secret";
|
||||||
|
|
||||||
|
public static final String KAFKA_SCRAM_USER = "scram-user";
|
||||||
|
public static final String KAFKA_SCRAM_PASSWORD = "scram-user-secret";
|
||||||
|
public static final String KAFKA_SCRAM_USER_2 = "scram-user2";
|
||||||
|
public static final String KAFKA_SCRAM_PASSWORD_2 = "scram-user2-secret";
|
||||||
|
public static final String KAFKA_SCRAM_ADMIN = "scram-admin";
|
||||||
|
public static final String KAFKA_SCRAM_ADMIN_PASSWORD = "scram-admin-secret";
|
||||||
|
|
||||||
|
public static final String KAFKA_OAUTH_BEARER_USER = "oauthbearer-user";
|
||||||
|
public static final String KAFKA_OAUTH_BEARER_USER_2 = "oauthbearer-user2";
|
||||||
|
public static final String KAFKA_OAUTH_BEARER_ADMIN = "oauthbearer-admin";
|
||||||
|
|
||||||
|
public static final String SERVICE_NAME = "kafka";
|
||||||
|
|
||||||
|
public static Properties saslConfigs(Optional<Properties> saslProperties) {
|
||||||
|
Properties result = saslProperties.orElse(new Properties());
|
||||||
|
if (IS_IBM_SECURITY && !result.containsKey(SaslConfigs.SASL_KERBEROS_SERVICE_NAME)) {
|
||||||
|
result.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, SERVICE_NAME);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static File writeJaasContextsToFile(List<JaasSection> jaasSections) throws IOException {
|
||||||
|
File jaasFile = TestUtils.tempFile();
|
||||||
|
writeToFile(jaasFile, jaasSections);
|
||||||
|
return jaasFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String scramClientLoginModule(String mechanism, String scramUser, String scramPassword) {
|
||||||
|
if (ScramMechanism.fromMechanismName(mechanism) == ScramMechanism.UNKNOWN) {
|
||||||
|
throw new IllegalArgumentException("Unsupported SCRAM mechanism " + mechanism);
|
||||||
|
}
|
||||||
|
return JaasModule.scramLoginModule(scramUser, scramPassword, false, new HashMap<>()).toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String clientLoginModule(String mechanism, Optional<File> keytabLocation, String serviceName) {
|
||||||
|
return kafkaClientModule(
|
||||||
|
mechanism,
|
||||||
|
keytabLocation,
|
||||||
|
KAFKA_CLIENT_PRINCIPAL,
|
||||||
|
KAFKA_PLAIN_USER,
|
||||||
|
KAFKA_PLAIN_PASSWORD,
|
||||||
|
KAFKA_SCRAM_USER,
|
||||||
|
KAFKA_SCRAM_PASSWORD,
|
||||||
|
KAFKA_OAUTH_BEARER_USER,
|
||||||
|
serviceName
|
||||||
|
).toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String clientLoginModule(String mechanism, Optional<File> keytabLocation) {
|
||||||
|
return clientLoginModule(mechanism, keytabLocation, SERVICE_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String adminLoginModule(String mechanism, Optional<File> keytabLocation, String serviceName) {
|
||||||
|
return kafkaClientModule(
|
||||||
|
mechanism,
|
||||||
|
keytabLocation,
|
||||||
|
KAFKA_SERVER_PRINCIPAL,
|
||||||
|
KAFKA_PLAIN_ADMIN,
|
||||||
|
KAFKA_PLAIN_ADMIN_PASSWORD,
|
||||||
|
KAFKA_SCRAM_ADMIN,
|
||||||
|
KAFKA_SCRAM_ADMIN_PASSWORD,
|
||||||
|
KAFKA_OAUTH_BEARER_ADMIN,
|
||||||
|
serviceName
|
||||||
|
).toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String adminLoginModule(String mechanism, Optional<File> keytabLocation) {
|
||||||
|
return adminLoginModule(mechanism, keytabLocation, SERVICE_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String tokenClientLoginModule(String tokenId, String password) {
|
||||||
|
Map<String, String> tokenProps = new HashMap<>();
|
||||||
|
tokenProps.put("tokenauth", "true");
|
||||||
|
return JaasModule.scramLoginModule(tokenId, password, false, tokenProps).toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static List<JaasSection> zkSections() {
|
||||||
|
Map<String, String> zkServerEntries = new HashMap<>();
|
||||||
|
zkServerEntries.put("user_super", ZK_USER_SUPER_PASSWD);
|
||||||
|
zkServerEntries.put("user_" + ZK_USER, ZK_USER_PASSWORD);
|
||||||
|
JaasSection zkServerSection = new JaasSection(ZK_SERVER_CONTEXT_NAME, Collections.singletonList(JaasModule.zkDigestModule(false, zkServerEntries)));
|
||||||
|
|
||||||
|
Map<String, String> zkClientEntries = new HashMap<>();
|
||||||
|
zkClientEntries.put("username", ZK_USER);
|
||||||
|
zkClientEntries.put("password", ZK_USER_PASSWORD);
|
||||||
|
JaasSection zkClientSection = new JaasSection(ZK_CLIENT_CONTEXT_NAME, Collections.singletonList(JaasModule.zkDigestModule(false, zkClientEntries)));
|
||||||
|
|
||||||
|
return Arrays.asList(zkServerSection, zkClientSection);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static JaasSection kafkaServerSection(String contextName, List<String> mechanisms, Optional<File> keytabLocation) {
|
||||||
|
List<JaasModule> modules = new ArrayList<>();
|
||||||
|
for (String mechanism : mechanisms) {
|
||||||
|
switch (mechanism) {
|
||||||
|
case GSSAPI_MECHANISM:
|
||||||
|
modules.add(JaasModule.krb5LoginModule(
|
||||||
|
true,
|
||||||
|
true,
|
||||||
|
keytabLocation.orElseThrow(() -> new IllegalArgumentException("Keytab location not specified for GSSAPI")).getAbsolutePath(),
|
||||||
|
KAFKA_SERVER_PRINCIPAL,
|
||||||
|
true,
|
||||||
|
Optional.of(SERVICE_NAME),
|
||||||
|
IS_IBM_SECURITY
|
||||||
|
));
|
||||||
|
break;
|
||||||
|
case PLAIN_MECHANISM:
|
||||||
|
Map<String, String> validUsers = new HashMap<>();
|
||||||
|
validUsers.put(KAFKA_PLAIN_ADMIN, KAFKA_PLAIN_ADMIN_PASSWORD);
|
||||||
|
validUsers.put(KAFKA_PLAIN_USER, KAFKA_PLAIN_PASSWORD);
|
||||||
|
validUsers.put(KAFKA_PLAIN_USER_2, KAFKA_PLAIN_PASSWORD_2);
|
||||||
|
modules.add(JaasModule.plainLoginModule(KAFKA_PLAIN_ADMIN, KAFKA_PLAIN_ADMIN_PASSWORD, false, validUsers));
|
||||||
|
break;
|
||||||
|
case OAUTHBEARER_MECHANISM:
|
||||||
|
modules.add(JaasModule.oAuthBearerLoginModule(KAFKA_OAUTH_BEARER_ADMIN, false));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
if (ScramMechanism.fromMechanismName(mechanism) != ScramMechanism.UNKNOWN) {
|
||||||
|
modules.add(JaasModule.scramLoginModule(KAFKA_SCRAM_ADMIN, KAFKA_SCRAM_ADMIN_PASSWORD, false, new HashMap<>()));
|
||||||
|
} else {
|
||||||
|
throw new IllegalArgumentException("Unsupported server mechanism " + mechanism);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new JaasSection(contextName, modules);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static JaasModule kafkaClientModule(String mechanism,
|
||||||
|
Optional<File> keytabLocation,
|
||||||
|
String clientPrincipal,
|
||||||
|
String plainUser,
|
||||||
|
String plainPassword,
|
||||||
|
String scramUser,
|
||||||
|
String scramPassword,
|
||||||
|
String oauthBearerUser,
|
||||||
|
String serviceName) {
|
||||||
|
switch (mechanism) {
|
||||||
|
case GSSAPI_MECHANISM:
|
||||||
|
return JaasModule.krb5LoginModule(
|
||||||
|
true,
|
||||||
|
true,
|
||||||
|
keytabLocation.orElseThrow(() -> new IllegalArgumentException("Keytab location not specified for GSSAPI")).getAbsolutePath(),
|
||||||
|
clientPrincipal,
|
||||||
|
true,
|
||||||
|
Optional.of(serviceName),
|
||||||
|
IS_IBM_SECURITY
|
||||||
|
);
|
||||||
|
case PLAIN_MECHANISM:
|
||||||
|
return JaasModule.plainLoginModule(plainUser, plainPassword, false, new HashMap<>());
|
||||||
|
case OAUTHBEARER_MECHANISM:
|
||||||
|
return JaasModule.oAuthBearerLoginModule(oauthBearerUser, false);
|
||||||
|
default:
|
||||||
|
if (ScramMechanism.fromMechanismName(mechanism) != ScramMechanism.UNKNOWN) {
|
||||||
|
return JaasModule.scramLoginModule(scramUser, scramPassword, false, new HashMap<>());
|
||||||
|
} else {
|
||||||
|
throw new IllegalArgumentException("Unsupported client mechanism " + mechanism);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static JaasSection kafkaClientSection(Optional<String> mechanism, Optional<File> keytabLocation) {
|
||||||
|
return new JaasSection(KAFKA_CLIENT_CONTEXT_NAME,
|
||||||
|
mechanism.map(m -> kafkaClientModule(m,
|
||||||
|
keytabLocation,
|
||||||
|
KAFKA_CLIENT_PRINCIPAL_2,
|
||||||
|
KAFKA_PLAIN_USER_2,
|
||||||
|
KAFKA_PLAIN_PASSWORD_2,
|
||||||
|
KAFKA_SCRAM_USER_2,
|
||||||
|
KAFKA_SCRAM_PASSWORD_2,
|
||||||
|
KAFKA_OAUTH_BEARER_USER_2,
|
||||||
|
SERVICE_NAME)
|
||||||
|
).map(Collections::singletonList).orElse(Collections.emptyList()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void writeToFile(File file, List<JaasSection> jaasSections) throws IOException {
|
||||||
|
try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
|
||||||
|
writer.write(String.join("", jaasSections.stream().map(Object::toString).toArray(String[]::new)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -14,29 +14,29 @@
|
||||||
|
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
import java.{lang, util}
|
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
|
||||||
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
|
|
||||||
import java.util.Properties
|
|
||||||
import kafka.api.GroupedUserPrincipalBuilder._
|
import kafka.api.GroupedUserPrincipalBuilder._
|
||||||
import kafka.api.GroupedUserQuotaCallback._
|
import kafka.api.GroupedUserQuotaCallback._
|
||||||
|
import kafka.security.{JaasModule, JaasTestUtils}
|
||||||
import kafka.server._
|
import kafka.server._
|
||||||
import kafka.utils.JaasTestUtils.ScramLoginModule
|
import kafka.utils.{Logging, TestUtils}
|
||||||
import kafka.utils.{JaasTestUtils, Logging, TestUtils}
|
|
||||||
import kafka.zk.ConfigEntityChangeNotificationZNode
|
import kafka.zk.ConfigEntityChangeNotificationZNode
|
||||||
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
|
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
|
||||||
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
|
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
|
||||||
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
|
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
|
||||||
import org.apache.kafka.common.{Cluster, Reconfigurable}
|
|
||||||
import org.apache.kafka.common.config.SaslConfigs
|
import org.apache.kafka.common.config.SaslConfigs
|
||||||
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
|
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
|
||||||
import org.apache.kafka.common.network.ListenerName
|
import org.apache.kafka.common.network.ListenerName
|
||||||
import org.apache.kafka.common.security.auth._
|
import org.apache.kafka.common.security.auth._
|
||||||
import org.apache.kafka.server.config.{ServerConfigs, QuotaConfigs}
|
import org.apache.kafka.common.{Cluster, Reconfigurable}
|
||||||
|
import org.apache.kafka.server.config.{QuotaConfigs, ServerConfigs}
|
||||||
import org.apache.kafka.server.quota._
|
import org.apache.kafka.server.quota._
|
||||||
import org.junit.jupiter.api.Assertions._
|
import org.junit.jupiter.api.Assertions._
|
||||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
|
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
|
||||||
|
|
||||||
|
import java.util.Properties
|
||||||
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
|
||||||
|
import java.{lang, util}
|
||||||
import scala.collection.mutable.ArrayBuffer
|
import scala.collection.mutable.ArrayBuffer
|
||||||
import scala.jdk.CollectionConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
|
|
||||||
|
@ -62,7 +62,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
override def setUp(testInfo: TestInfo): Unit = {
|
override def setUp(testInfo: TestInfo): Unit = {
|
||||||
startSasl(jaasSections(kafkaServerSaslMechanisms, Some("SCRAM-SHA-256"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
|
startSasl(jaasSections(kafkaServerSaslMechanisms, Some("SCRAM-SHA-256"), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
|
||||||
this.serverConfig.setProperty(QuotaConfigs.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, classOf[GroupedUserQuotaCallback].getName)
|
this.serverConfig.setProperty(QuotaConfigs.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, classOf[GroupedUserQuotaCallback].getName)
|
||||||
this.serverConfig.setProperty(s"${listenerName.configPrefix}${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG}",
|
this.serverConfig.setProperty(s"${listenerName.configPrefix}${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG}",
|
||||||
classOf[GroupedUserPrincipalBuilder].getName)
|
classOf[GroupedUserPrincipalBuilder].getName)
|
||||||
|
@ -70,7 +70,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
|
||||||
super.setUp(testInfo)
|
super.setUp(testInfo)
|
||||||
|
|
||||||
producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG,
|
producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG,
|
||||||
ScramLoginModule(JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword).toString)
|
JaasModule.scramLoginModule(JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD).toString)
|
||||||
producerWithoutQuota = createProducer()
|
producerWithoutQuota = createProducer()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,7 +85,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
|
||||||
override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = {
|
override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = {
|
||||||
super.configureSecurityBeforeServersStart(testInfo)
|
super.configureSecurityBeforeServersStart(testInfo)
|
||||||
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
|
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
|
||||||
createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword)
|
createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -188,7 +188,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
|
||||||
config.put(key.toString, value)
|
config.put(key.toString, value)
|
||||||
}
|
}
|
||||||
config.put(SaslConfigs.SASL_JAAS_CONFIG,
|
config.put(SaslConfigs.SASL_JAAS_CONFIG,
|
||||||
ScramLoginModule(JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword).toString)
|
JaasModule.scramLoginModule(JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD).toString)
|
||||||
val adminClient = Admin.create(config)
|
val adminClient = Admin.create(config)
|
||||||
adminClients += adminClient
|
adminClients += adminClient
|
||||||
adminClient
|
adminClient
|
||||||
|
@ -220,12 +220,13 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
|
||||||
val consumerClientId = s"$user:consumer-client-id"
|
val consumerClientId = s"$user:consumer-client-id"
|
||||||
|
|
||||||
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
|
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
|
||||||
producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, ScramLoginModule(user, password).toString)
|
|
||||||
|
producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, JaasModule.scramLoginModule(user, password).toString)
|
||||||
|
|
||||||
consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
|
consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
|
||||||
consumerConfig.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
|
consumerConfig.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
|
||||||
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, s"$user-group")
|
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, s"$user-group")
|
||||||
consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, ScramLoginModule(user, password).toString)
|
consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, JaasModule.scramLoginModule(user, password).toString)
|
||||||
|
|
||||||
GroupedUser(user, userGroup, topic, servers(leader), producerClientId, consumerClientId,
|
GroupedUser(user, userGroup, topic, servers(leader), producerClientId, consumerClientId,
|
||||||
createProducer(), createConsumer(), adminClient)
|
createProducer(), createConsumer(), adminClient)
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
*/
|
*/
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
|
import kafka.security.JaasTestUtils
|
||||||
|
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
import kafka.utils._
|
import kafka.utils._
|
||||||
import kafka.zk.ConfigEntityChangeNotificationZNode
|
import kafka.zk.ConfigEntityChangeNotificationZNode
|
||||||
|
@ -43,11 +45,11 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
|
||||||
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
|
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
|
||||||
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
|
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
|
||||||
|
|
||||||
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramUser)
|
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_SCRAM_USER)
|
||||||
private val clientPassword = JaasTestUtils.KafkaScramPassword
|
private val clientPassword = JaasTestUtils.KAFKA_SCRAM_PASSWORD
|
||||||
|
|
||||||
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramAdmin)
|
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_SCRAM_ADMIN)
|
||||||
protected val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword
|
protected val kafkaPassword = JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD
|
||||||
|
|
||||||
protected val privilegedAdminClientConfig = new Properties()
|
protected val privilegedAdminClientConfig = new Properties()
|
||||||
|
|
||||||
|
@ -71,7 +73,7 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
|
||||||
override def addFormatterSettings(formatter: Formatter): Unit = {
|
override def addFormatterSettings(formatter: Formatter): Unit = {
|
||||||
formatter.setClusterId("XcZZOzUqS4yHOjhMQB6JLQ")
|
formatter.setClusterId("XcZZOzUqS4yHOjhMQB6JLQ")
|
||||||
formatter.setScramArguments(
|
formatter.setScramArguments(
|
||||||
List(s"SCRAM-SHA-256=[name=${JaasTestUtils.KafkaScramAdmin},password=${JaasTestUtils.KafkaScramAdminPassword}]").asJava)
|
List(s"SCRAM-SHA-256=[name=${JaasTestUtils.KAFKA_SCRAM_ADMIN},password=${JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD}]").asJava)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def createPrivilegedAdminClient(): Admin = createScramAdminClient(kafkaClientSaslMechanism, kafkaPrincipal.getName, kafkaPassword)
|
override def createPrivilegedAdminClient(): Admin = createScramAdminClient(kafkaClientSaslMechanism, kafkaPrincipal.getName, kafkaPassword)
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
|
import kafka.security.JaasTestUtils
|
||||||
import kafka.utils._
|
import kafka.utils._
|
||||||
import org.apache.kafka.clients.admin.{Admin, CreateDelegationTokenOptions, DescribeDelegationTokenOptions}
|
import org.apache.kafka.clients.admin.{Admin, CreateDelegationTokenOptions, DescribeDelegationTokenOptions}
|
||||||
import org.apache.kafka.common.acl._
|
import org.apache.kafka.common.acl._
|
||||||
|
@ -51,8 +52,8 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest extends DelegationTokenE
|
||||||
|
|
||||||
override def createDelegationTokenOptions(): CreateDelegationTokenOptions = new CreateDelegationTokenOptions().owner(clientPrincipal)
|
override def createDelegationTokenOptions(): CreateDelegationTokenOptions = new CreateDelegationTokenOptions().owner(clientPrincipal)
|
||||||
|
|
||||||
private val tokenRequesterPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramUser2)
|
private val tokenRequesterPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_SCRAM_USER_2)
|
||||||
private val tokenRequesterPassword = JaasTestUtils.KafkaScramPassword2
|
private val tokenRequesterPassword = JaasTestUtils.KAFKA_SCRAM_PASSWORD_2
|
||||||
|
|
||||||
private val otherClientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "other-client-principal")
|
private val otherClientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "other-client-principal")
|
||||||
private val otherClientPassword = "other-client-password"
|
private val otherClientPassword = "other-client-password"
|
||||||
|
|
|
@ -12,10 +12,11 @@
|
||||||
*/
|
*/
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
|
import kafka.security.JaasTestUtils
|
||||||
import java.util
|
import java.util
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
import kafka.security.authorizer.AclAuthorizer
|
import kafka.security.authorizer.AclAuthorizer
|
||||||
import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
|
import kafka.utils.{CoreUtils, TestUtils}
|
||||||
import org.apache.kafka.clients.admin._
|
import org.apache.kafka.clients.admin._
|
||||||
import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, CLUSTER_ACTION, DELETE, DESCRIBE}
|
import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, CLUSTER_ACTION, DELETE, DESCRIBE}
|
||||||
import org.apache.kafka.common.acl.AclPermissionType.ALLOW
|
import org.apache.kafka.common.acl.AclPermissionType.ALLOW
|
||||||
|
@ -40,27 +41,27 @@ object DescribeAuthorizedOperationsTest {
|
||||||
|
|
||||||
val Group1Acl = new AclBinding(
|
val Group1Acl = new AclBinding(
|
||||||
new ResourcePattern(ResourceType.GROUP, Group1, PatternType.LITERAL),
|
new ResourcePattern(ResourceType.GROUP, Group1, PatternType.LITERAL),
|
||||||
accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL))
|
accessControlEntry(JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2, ALL))
|
||||||
|
|
||||||
val Group2Acl = new AclBinding(
|
val Group2Acl = new AclBinding(
|
||||||
new ResourcePattern(ResourceType.GROUP, Group2, PatternType.LITERAL),
|
new ResourcePattern(ResourceType.GROUP, Group2, PatternType.LITERAL),
|
||||||
accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DESCRIBE))
|
accessControlEntry(JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2, DESCRIBE))
|
||||||
|
|
||||||
val Group3Acl = new AclBinding(
|
val Group3Acl = new AclBinding(
|
||||||
new ResourcePattern(ResourceType.GROUP, Group3, PatternType.LITERAL),
|
new ResourcePattern(ResourceType.GROUP, Group3, PatternType.LITERAL),
|
||||||
accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DELETE))
|
accessControlEntry(JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2, DELETE))
|
||||||
|
|
||||||
val ClusterAllAcl = new AclBinding(
|
val ClusterAllAcl = new AclBinding(
|
||||||
new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL),
|
new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL),
|
||||||
accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL))
|
accessControlEntry(JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2, ALL))
|
||||||
|
|
||||||
val Topic1Acl = new AclBinding(
|
val Topic1Acl = new AclBinding(
|
||||||
new ResourcePattern(ResourceType.TOPIC, Topic1, PatternType.LITERAL),
|
new ResourcePattern(ResourceType.TOPIC, Topic1, PatternType.LITERAL),
|
||||||
accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL))
|
accessControlEntry(JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2, ALL))
|
||||||
|
|
||||||
val Topic2All = new AclBinding(
|
val Topic2All = new AclBinding(
|
||||||
new ResourcePattern(ResourceType.TOPIC, Topic2, PatternType.LITERAL),
|
new ResourcePattern(ResourceType.TOPIC, Topic2, PatternType.LITERAL),
|
||||||
accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DELETE))
|
accessControlEntry(JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2, DELETE))
|
||||||
|
|
||||||
private def accessControlEntry(
|
private def accessControlEntry(
|
||||||
userName: String,
|
userName: String,
|
||||||
|
@ -93,11 +94,11 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
|
||||||
authorizer.configure(this.configs.head.originals())
|
authorizer.configure(this.configs.head.originals())
|
||||||
val result = authorizer.createAcls(null, List(
|
val result = authorizer.createAcls(null, List(
|
||||||
new AclBinding(clusterResource, accessControlEntry(
|
new AclBinding(clusterResource, accessControlEntry(
|
||||||
JaasTestUtils.KafkaServerPrincipalUnqualifiedName, CLUSTER_ACTION)),
|
JaasTestUtils.KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME, CLUSTER_ACTION)),
|
||||||
new AclBinding(clusterResource, accessControlEntry(
|
new AclBinding(clusterResource, accessControlEntry(
|
||||||
JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALTER)),
|
JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2, ALTER)),
|
||||||
new AclBinding(topicResource, accessControlEntry(
|
new AclBinding(topicResource, accessControlEntry(
|
||||||
JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DESCRIBE))
|
JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2, DESCRIBE))
|
||||||
).asJava)
|
).asJava)
|
||||||
result.asScala.map(_.toCompletableFuture.get).foreach(result => assertFalse(result.exception.isPresent))
|
result.asScala.map(_.toCompletableFuture.get).foreach(result => assertFalse(result.exception.isPresent))
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -107,7 +108,7 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
override def setUp(testInfo: TestInfo): Unit = {
|
override def setUp(testInfo: TestInfo): Unit = {
|
||||||
startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName))
|
startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
|
||||||
super.setUp(testInfo)
|
super.setUp(testInfo)
|
||||||
TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
|
TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
|
||||||
client = Admin.create(createConfig())
|
client = Admin.create(createConfig())
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
import kafka.api.GroupEndToEndAuthorizationTest._
|
import kafka.api.GroupEndToEndAuthorizationTest._
|
||||||
import kafka.utils.JaasTestUtils
|
import kafka.security.JaasTestUtils
|
||||||
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
|
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
|
||||||
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, SaslAuthenticationContext}
|
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, SaslAuthenticationContext}
|
||||||
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
|
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
|
||||||
|
@ -29,7 +29,7 @@ object GroupEndToEndAuthorizationTest {
|
||||||
override def build(context: AuthenticationContext): KafkaPrincipal = {
|
override def build(context: AuthenticationContext): KafkaPrincipal = {
|
||||||
context match {
|
context match {
|
||||||
case ctx: SaslAuthenticationContext =>
|
case ctx: SaslAuthenticationContext =>
|
||||||
if (ctx.server.getAuthorizationID == JaasTestUtils.KafkaScramUser)
|
if (ctx.server.getAuthorizationID == JaasTestUtils.KAFKA_SCRAM_USER)
|
||||||
new KafkaPrincipal(GroupPrincipalType, ClientGroup)
|
new KafkaPrincipal(GroupPrincipalType, ClientGroup)
|
||||||
else
|
else
|
||||||
new KafkaPrincipal(GroupPrincipalType, ctx.server.getAuthorizationID)
|
new KafkaPrincipal(GroupPrincipalType, ctx.server.getAuthorizationID)
|
||||||
|
@ -42,6 +42,6 @@ object GroupEndToEndAuthorizationTest {
|
||||||
|
|
||||||
class GroupEndToEndAuthorizationTest extends SaslScramSslEndToEndAuthorizationTest {
|
class GroupEndToEndAuthorizationTest extends SaslScramSslEndToEndAuthorizationTest {
|
||||||
override val clientPrincipal = new KafkaPrincipal(GroupPrincipalType, ClientGroup)
|
override val clientPrincipal = new KafkaPrincipal(GroupPrincipalType, ClientGroup)
|
||||||
override val kafkaPrincipal = new KafkaPrincipal(GroupPrincipalType, JaasTestUtils.KafkaScramAdmin)
|
override val kafkaPrincipal = new KafkaPrincipal(GroupPrincipalType, JaasTestUtils.KAFKA_SCRAM_ADMIN)
|
||||||
this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[GroupPrincipalBuilder].getName)
|
this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[GroupPrincipalBuilder].getName)
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,27 +12,27 @@
|
||||||
*/
|
*/
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
import java.util.{Locale, Properties}
|
|
||||||
import kafka.server.KafkaServer
|
|
||||||
import kafka.utils.{JaasTestUtils, TestUtils}
|
|
||||||
import com.yammer.metrics.core.{Gauge, Histogram, Meter}
|
import com.yammer.metrics.core.{Gauge, Histogram, Meter}
|
||||||
|
import kafka.security.JaasTestUtils
|
||||||
|
import kafka.server.KafkaServer
|
||||||
|
import kafka.utils.TestUtils
|
||||||
import org.apache.kafka.clients.consumer.Consumer
|
import org.apache.kafka.clients.consumer.Consumer
|
||||||
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
|
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
|
||||||
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
|
|
||||||
import org.apache.kafka.common.config.{SaslConfigs, TopicConfig}
|
import org.apache.kafka.common.config.{SaslConfigs, TopicConfig}
|
||||||
import org.apache.kafka.common.errors.{InvalidTopicException, UnknownTopicOrPartitionException}
|
import org.apache.kafka.common.errors.{InvalidTopicException, UnknownTopicOrPartitionException}
|
||||||
import org.apache.kafka.common.network.ListenerName
|
import org.apache.kafka.common.network.ListenerName
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||||
import org.apache.kafka.common.security.authenticator.TestJaasConfig
|
import org.apache.kafka.common.security.authenticator.TestJaasConfig
|
||||||
import org.apache.kafka.server.config.ZkConfigs
|
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
|
||||||
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
|
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs, ZkConfigs}
|
||||||
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics}
|
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics}
|
||||||
import org.apache.kafka.server.metrics.KafkaYammerMetrics
|
import org.apache.kafka.server.metrics.KafkaYammerMetrics
|
||||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
|
|
||||||
import org.junit.jupiter.api.Assertions._
|
import org.junit.jupiter.api.Assertions._
|
||||||
|
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
|
||||||
import org.junit.jupiter.params.ParameterizedTest
|
import org.junit.jupiter.params.ParameterizedTest
|
||||||
import org.junit.jupiter.params.provider.ValueSource
|
import org.junit.jupiter.params.provider.ValueSource
|
||||||
|
|
||||||
|
import java.util.{Locale, Properties}
|
||||||
import scala.annotation.nowarn
|
import scala.annotation.nowarn
|
||||||
import scala.jdk.CollectionConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
|
|
||||||
|
@ -44,7 +44,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
|
||||||
private val kafkaClientSaslMechanism = "PLAIN"
|
private val kafkaClientSaslMechanism = "PLAIN"
|
||||||
private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
|
private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
|
||||||
private val kafkaServerJaasEntryName =
|
private val kafkaServerJaasEntryName =
|
||||||
s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
|
s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME}"
|
||||||
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "false")
|
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "false")
|
||||||
this.serverConfig.setProperty(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false")
|
this.serverConfig.setProperty(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false")
|
||||||
this.serverConfig.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "2.8")
|
this.serverConfig.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "2.8")
|
||||||
|
|
|
@ -12,6 +12,7 @@
|
||||||
*/
|
*/
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
|
import kafka.security.JaasTestUtils
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
import java.util.{Collections, Properties}
|
import java.util.{Collections, Properties}
|
||||||
import java.util.concurrent.{ExecutionException, TimeUnit}
|
import java.util.concurrent.{ExecutionException, TimeUnit}
|
||||||
|
@ -23,7 +24,7 @@ import org.apache.kafka.common.{KafkaException, TopicPartition}
|
||||||
import org.apache.kafka.common.errors.SaslAuthenticationException
|
import org.apache.kafka.common.errors.SaslAuthenticationException
|
||||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
|
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
|
||||||
import org.junit.jupiter.api.Assertions._
|
import org.junit.jupiter.api.Assertions._
|
||||||
import kafka.utils.{JaasTestUtils, TestUtils}
|
import kafka.utils.TestUtils
|
||||||
import kafka.zk.ConfigEntityChangeNotificationZNode
|
import kafka.zk.ConfigEntityChangeNotificationZNode
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||||
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
|
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
|
||||||
|
@ -54,18 +55,18 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
|
||||||
super.configureSecurityBeforeServersStart(testInfo)
|
super.configureSecurityBeforeServersStart(testInfo)
|
||||||
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
|
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
|
||||||
// Create broker credentials before starting brokers
|
// Create broker credentials before starting brokers
|
||||||
createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword)
|
createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def createPrivilegedAdminClient() = {
|
override def createPrivilegedAdminClient() = {
|
||||||
createAdminClient(bootstrapServers(), securityProtocol, trustStoreFile, clientSaslProperties,
|
createAdminClient(bootstrapServers(), securityProtocol, trustStoreFile, clientSaslProperties,
|
||||||
kafkaClientSaslMechanism, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword)
|
kafkaClientSaslMechanism, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
override def setUp(testInfo: TestInfo): Unit = {
|
override def setUp(testInfo: TestInfo): Unit = {
|
||||||
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
|
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
|
||||||
JaasTestUtils.KafkaServerContextName))
|
JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
|
||||||
super.setUp(testInfo)
|
super.setUp(testInfo)
|
||||||
createTopic(topic, numPartitions, brokerCount)
|
createTopic(topic, numPartitions, brokerCount)
|
||||||
}
|
}
|
||||||
|
@ -148,7 +149,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
|
||||||
try {
|
try {
|
||||||
val response = adminClient.describeTopics(Collections.singleton(topic)).allTopicNames.get
|
val response = adminClient.describeTopics(Collections.singleton(topic)).allTopicNames.get
|
||||||
assertEquals(1, response.size)
|
assertEquals(1, response.size)
|
||||||
response.forEach { (topic, description) =>
|
response.forEach { (_, description) =>
|
||||||
assertEquals(numPartitions, description.partitions.size)
|
assertEquals(numPartitions, description.partitions.size)
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
|
@ -167,7 +168,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private def createClientCredential(): Unit = {
|
private def createClientCredential(): Unit = {
|
||||||
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)
|
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KAFKA_SCRAM_USER_2, JaasTestUtils.KAFKA_SCRAM_PASSWORD_2)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def sendOneRecord(producer: KafkaProducer[Array[Byte], Array[Byte]], maxWaitMs: Long = 15000): Unit = {
|
private def sendOneRecord(producer: KafkaProducer[Array[Byte], Array[Byte]], maxWaitMs: Long = 15000): Unit = {
|
||||||
|
|
|
@ -16,20 +16,18 @@
|
||||||
*/
|
*/
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
|
import kafka.security.JaasTestUtils
|
||||||
import kafka.security.authorizer.AclAuthorizer
|
import kafka.security.authorizer.AclAuthorizer
|
||||||
import kafka.utils.JaasTestUtils
|
|
||||||
import org.apache.kafka.common.config.SslConfigs
|
import org.apache.kafka.common.config.SslConfigs
|
||||||
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
|
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
|
||||||
import org.apache.kafka.common.security.auth._
|
import org.apache.kafka.common.security.auth._
|
||||||
import org.junit.jupiter.api.Assertions.assertNull
|
import org.junit.jupiter.api.Assertions.assertNull
|
||||||
|
|
||||||
import scala.collection.immutable.List
|
|
||||||
|
|
||||||
class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
|
class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
|
||||||
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
|
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
|
||||||
JaasTestUtils.KafkaClientPrincipalUnqualifiedName)
|
JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME)
|
||||||
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
|
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
|
||||||
JaasTestUtils.KafkaServerPrincipalUnqualifiedName)
|
JaasTestUtils.KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME)
|
||||||
|
|
||||||
override protected def kafkaClientSaslMechanism = "GSSAPI"
|
override protected def kafkaClientSaslMechanism = "GSSAPI"
|
||||||
override protected def kafkaServerSaslMechanisms = List("GSSAPI")
|
override protected def kafkaServerSaslMechanisms = List("GSSAPI")
|
||||||
|
|
|
@ -12,10 +12,11 @@
|
||||||
*/
|
*/
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
|
import kafka.security.JaasTestUtils
|
||||||
import kafka.utils.{JaasTestUtils, TestUtils}
|
import kafka.utils.TestUtils
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||||
import org.apache.kafka.server.config.ZkConfigs
|
import org.apache.kafka.server.config.ZkConfigs
|
||||||
|
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
|
||||||
import org.junit.jupiter.params.ParameterizedTest
|
import org.junit.jupiter.params.ParameterizedTest
|
||||||
import org.junit.jupiter.params.provider.ValueSource
|
import org.junit.jupiter.params.provider.ValueSource
|
||||||
|
|
||||||
|
@ -34,7 +35,7 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
override def setUp(testInfo: TestInfo): Unit = {
|
override def setUp(testInfo: TestInfo): Unit = {
|
||||||
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
|
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
|
||||||
JaasTestUtils.KafkaServerContextName))
|
JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
|
||||||
super.setUp(testInfo)
|
super.setUp(testInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,12 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
import kafka.utils.JaasTestUtils
|
import kafka.security.JaasTestUtils
|
||||||
import org.apache.kafka.common.security.auth._
|
import org.apache.kafka.common.security.auth._
|
||||||
|
|
||||||
class SaslOAuthBearerSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
|
class SaslOAuthBearerSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
|
||||||
override protected def kafkaClientSaslMechanism = "OAUTHBEARER"
|
override protected def kafkaClientSaslMechanism = "OAUTHBEARER"
|
||||||
override protected def kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
|
override protected def kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
|
||||||
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaOAuthBearerUser)
|
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_OAUTH_BEARER_USER)
|
||||||
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaOAuthBearerAdmin)
|
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_OAUTH_BEARER_ADMIN)
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,8 +12,9 @@
|
||||||
*/
|
*/
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
|
import kafka.security.JaasTestUtils
|
||||||
|
import kafka.utils.TestUtils
|
||||||
import kafka.utils.TestUtils.{isAclUnsecure, secureZkPaths}
|
import kafka.utils.TestUtils.{isAclUnsecure, secureZkPaths}
|
||||||
import kafka.utils.{JaasTestUtils, TestUtils}
|
|
||||||
import org.apache.kafka.common.network.ListenerName
|
import org.apache.kafka.common.network.ListenerName
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||||
import org.apache.kafka.server.config.ZkConfigs
|
import org.apache.kafka.server.config.ZkConfigs
|
||||||
|
@ -28,7 +29,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
|
||||||
private val kafkaClientSaslMechanism = "PLAIN"
|
private val kafkaClientSaslMechanism = "PLAIN"
|
||||||
private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
|
private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
|
||||||
private val kafkaServerJaasEntryName =
|
private val kafkaServerJaasEntryName =
|
||||||
s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
|
s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME}"
|
||||||
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "false")
|
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "false")
|
||||||
// disable secure acls of zkClient in QuorumTestHarness
|
// disable secure acls of zkClient in QuorumTestHarness
|
||||||
override protected def zkAclsEnabled = Some(false)
|
override protected def zkAclsEnabled = Some(false)
|
||||||
|
|
|
@ -16,7 +16,8 @@
|
||||||
*/
|
*/
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
import kafka.utils.JaasTestUtils._
|
import kafka.security.JaasModule
|
||||||
|
import kafka.security.JaasTestUtils._
|
||||||
import kafka.utils.TestUtils
|
import kafka.utils.TestUtils
|
||||||
import kafka.utils.TestUtils.isAclSecure
|
import kafka.utils.TestUtils.isAclSecure
|
||||||
import kafka.zk.ZkData
|
import kafka.zk.ZkData
|
||||||
|
@ -30,11 +31,12 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
|
|
||||||
import java.security.AccessController
|
import java.security.AccessController
|
||||||
import java.util.Properties
|
import java.util.{Collections, Properties}
|
||||||
import javax.security.auth.Subject
|
import javax.security.auth.Subject
|
||||||
import javax.security.auth.callback._
|
import javax.security.auth.callback._
|
||||||
import javax.security.auth.login.AppConfigurationEntry
|
import javax.security.auth.login.AppConfigurationEntry
|
||||||
import scala.collection.Seq
|
import scala.collection.Seq
|
||||||
|
import scala.jdk.CollectionConverters._
|
||||||
|
|
||||||
object SaslPlainSslEndToEndAuthorizationTest {
|
object SaslPlainSslEndToEndAuthorizationTest {
|
||||||
|
|
||||||
|
@ -50,9 +52,9 @@ object SaslPlainSslEndToEndAuthorizationTest {
|
||||||
assertTrue(sslPrincipal.endsWith(s"CN=${TestUtils.SslCertificateCn}"), s"Unexpected SSL principal $sslPrincipal")
|
assertTrue(sslPrincipal.endsWith(s"CN=${TestUtils.SslCertificateCn}"), s"Unexpected SSL principal $sslPrincipal")
|
||||||
|
|
||||||
saslContext.server.getAuthorizationID match {
|
saslContext.server.getAuthorizationID match {
|
||||||
case KafkaPlainAdmin =>
|
case KAFKA_PLAIN_ADMIN =>
|
||||||
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, controllerPrincipalName)
|
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, controllerPrincipalName)
|
||||||
case KafkaPlainUser =>
|
case KAFKA_PLAIN_USER =>
|
||||||
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
|
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
|
||||||
case _ =>
|
case _ =>
|
||||||
KafkaPrincipal.ANONYMOUS
|
KafkaPrincipal.ANONYMOUS
|
||||||
|
@ -61,9 +63,9 @@ object SaslPlainSslEndToEndAuthorizationTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
object Credentials {
|
object Credentials {
|
||||||
val allUsers = Map(KafkaPlainUser -> "user1-password",
|
val allUsers = Map(KAFKA_PLAIN_USER -> "user1-password",
|
||||||
KafkaPlainUser2 -> KafkaPlainPassword2,
|
KAFKA_PLAIN_USER_2 -> KAFKA_PLAIN_PASSWORD_2,
|
||||||
KafkaPlainAdmin -> "broker-password")
|
KAFKA_PLAIN_ADMIN -> "broker-password")
|
||||||
}
|
}
|
||||||
|
|
||||||
class TestServerCallbackHandler extends AuthenticateCallbackHandler {
|
class TestServerCallbackHandler extends AuthenticateCallbackHandler {
|
||||||
|
@ -91,7 +93,7 @@ object SaslPlainSslEndToEndAuthorizationTest {
|
||||||
callback match {
|
callback match {
|
||||||
case nameCallback: NameCallback => nameCallback.setName(username)
|
case nameCallback: NameCallback => nameCallback.setName(username)
|
||||||
case passwordCallback: PasswordCallback =>
|
case passwordCallback: PasswordCallback =>
|
||||||
if (username == KafkaPlainUser || username == KafkaPlainAdmin)
|
if (username == KAFKA_PLAIN_USER || username == KAFKA_PLAIN_ADMIN)
|
||||||
passwordCallback.setPassword(Credentials.allUsers(username).toCharArray)
|
passwordCallback.setPassword(Credentials.allUsers(username).toCharArray)
|
||||||
case _ => throw new UnsupportedCallbackException(callback)
|
case _ => throw new UnsupportedCallbackException(callback)
|
||||||
}
|
}
|
||||||
|
@ -130,10 +132,10 @@ class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
|
||||||
kafkaClientSaslMechanism: Option[String],
|
kafkaClientSaslMechanism: Option[String],
|
||||||
mode: SaslSetupMode,
|
mode: SaslSetupMode,
|
||||||
kafkaServerEntryName: String): Seq[JaasSection] = {
|
kafkaServerEntryName: String): Seq[JaasSection] = {
|
||||||
val brokerLogin = PlainLoginModule(KafkaPlainAdmin, "") // Password provided by callback handler
|
val brokerLogin = JaasModule.plainLoginModule(KAFKA_PLAIN_ADMIN, "") // Password provided by callback handler
|
||||||
val clientLogin = PlainLoginModule(KafkaPlainUser2, KafkaPlainPassword2)
|
val clientLogin = JaasModule.plainLoginModule(KAFKA_PLAIN_USER_2, KAFKA_PLAIN_PASSWORD_2)
|
||||||
Seq(JaasSection(kafkaServerEntryName, Seq(brokerLogin)),
|
Seq(new JaasSection(kafkaServerEntryName, Collections.singletonList(brokerLogin)),
|
||||||
JaasSection(KafkaClientContextName, Seq(clientLogin))) ++ zkSections
|
new JaasSection(KAFKA_CLIENT_CONTEXT_NAME, Collections.singletonList(clientLogin))) ++ zkSections.asScala
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate SSL certificates for clients since we are enabling TLS mutual authentication
|
// Generate SSL certificates for clients since we are enabling TLS mutual authentication
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
*/
|
*/
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
import kafka.utils.JaasTestUtils
|
import kafka.security.JaasTestUtils
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
|
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
|
||||||
|
|
||||||
|
@ -22,7 +22,7 @@ class SaslPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
override def setUp(testInfo: TestInfo): Unit = {
|
override def setUp(testInfo: TestInfo): Unit = {
|
||||||
startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
|
startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
|
||||||
super.setUp(testInfo)
|
super.setUp(testInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
*/
|
*/
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
|
import kafka.security.JaasTestUtils
|
||||||
|
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
import kafka.utils._
|
import kafka.utils._
|
||||||
import kafka.zk.ConfigEntityChangeNotificationZNode
|
import kafka.zk.ConfigEntityChangeNotificationZNode
|
||||||
|
@ -33,9 +35,9 @@ import org.junit.jupiter.params.provider.ValueSource
|
||||||
class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
|
class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
|
||||||
override protected def kafkaClientSaslMechanism = "SCRAM-SHA-256"
|
override protected def kafkaClientSaslMechanism = "SCRAM-SHA-256"
|
||||||
override protected def kafkaServerSaslMechanisms = ScramMechanism.mechanismNames.asScala.toList
|
override protected def kafkaServerSaslMechanisms = ScramMechanism.mechanismNames.asScala.toList
|
||||||
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramUser)
|
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_SCRAM_USER)
|
||||||
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramAdmin)
|
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_SCRAM_ADMIN)
|
||||||
private val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword
|
private val kafkaPassword = JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD
|
||||||
|
|
||||||
override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = {
|
override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = {
|
||||||
super.configureSecurityBeforeServersStart(testInfo)
|
super.configureSecurityBeforeServersStart(testInfo)
|
||||||
|
@ -54,7 +56,7 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
|
||||||
override def addFormatterSettings(formatter: Formatter): Unit = {
|
override def addFormatterSettings(formatter: Formatter): Unit = {
|
||||||
formatter.setClusterId("XcZZOzUqS4yHOjhMQB6JLQ")
|
formatter.setClusterId("XcZZOzUqS4yHOjhMQB6JLQ")
|
||||||
formatter.setScramArguments(List(
|
formatter.setScramArguments(List(
|
||||||
s"SCRAM-SHA-256=[name=${JaasTestUtils.KafkaScramAdmin},password=${JaasTestUtils.KafkaScramAdminPassword}]").asJava)
|
s"SCRAM-SHA-256=[name=${JaasTestUtils.KAFKA_SCRAM_ADMIN},password=${JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD}]").asJava)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def configureListeners(props: collection.Seq[Properties]): Unit = {
|
override def configureListeners(props: collection.Seq[Properties]): Unit = {
|
||||||
|
@ -68,8 +70,8 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
|
||||||
override def setUp(testInfo: TestInfo): Unit = {
|
override def setUp(testInfo: TestInfo): Unit = {
|
||||||
super.setUp(testInfo)
|
super.setUp(testInfo)
|
||||||
// Create client credentials after starting brokers so that dynamic credential creation is also tested
|
// Create client credentials after starting brokers so that dynamic credential creation is also tested
|
||||||
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
|
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KAFKA_SCRAM_USER, JaasTestUtils.KAFKA_SCRAM_PASSWORD)
|
||||||
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)
|
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KAFKA_SCRAM_USER_2, JaasTestUtils.KAFKA_SCRAM_PASSWORD_2)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
|
|
|
@ -17,15 +17,11 @@
|
||||||
|
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
import java.io.File
|
import kafka.security.JaasTestUtils
|
||||||
import java.util
|
import kafka.security.JaasTestUtils.JaasSection
|
||||||
import java.util.Properties
|
|
||||||
import javax.security.auth.login.Configuration
|
|
||||||
import scala.collection.Seq
|
|
||||||
import kafka.security.minikdc.MiniKdc
|
import kafka.security.minikdc.MiniKdc
|
||||||
import kafka.server.KafkaConfig
|
import kafka.server.KafkaConfig
|
||||||
import kafka.utils.JaasTestUtils.{JaasSection, Krb5LoginModule, ZkDigestModule}
|
import kafka.utils.TestUtils
|
||||||
import kafka.utils.{JaasTestUtils, TestUtils}
|
|
||||||
import kafka.zk.{AdminZkClient, KafkaZkClient}
|
import kafka.zk.{AdminZkClient, KafkaZkClient}
|
||||||
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, ScramCredentialInfo, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
|
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, ScramCredentialInfo, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
|
||||||
import org.apache.kafka.common.config.SaslConfigs
|
import org.apache.kafka.common.config.SaslConfigs
|
||||||
|
@ -38,6 +34,13 @@ import org.apache.kafka.common.utils.Time
|
||||||
import org.apache.kafka.server.config.ConfigType
|
import org.apache.kafka.server.config.ConfigType
|
||||||
import org.apache.zookeeper.client.ZKClientConfig
|
import org.apache.zookeeper.client.ZKClientConfig
|
||||||
|
|
||||||
|
import java.io.File
|
||||||
|
import java.util
|
||||||
|
import java.util.Properties
|
||||||
|
import javax.security.auth.login.Configuration
|
||||||
|
import scala.collection.Seq
|
||||||
|
import scala.jdk.CollectionConverters._
|
||||||
|
import scala.jdk.OptionConverters._
|
||||||
import scala.util.Using
|
import scala.util.Using
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -62,18 +65,17 @@ trait SaslSetup {
|
||||||
def startSasl(jaasSections: Seq[JaasSection]): Unit = {
|
def startSasl(jaasSections: Seq[JaasSection]): Unit = {
|
||||||
// Important if tests leak consumers, producers or brokers
|
// Important if tests leak consumers, producers or brokers
|
||||||
LoginManager.closeAll()
|
LoginManager.closeAll()
|
||||||
val hasKerberos = jaasSections.exists(_.modules.exists {
|
|
||||||
case _: Krb5LoginModule => true
|
val hasKerberos = jaasSections.exists(_.getModules.asScala.exists(_.name().endsWith("Krb5LoginModule")))
|
||||||
case _ => false
|
|
||||||
})
|
|
||||||
if (hasKerberos) {
|
if (hasKerberos) {
|
||||||
initializeKerberos()
|
initializeKerberos()
|
||||||
}
|
}
|
||||||
|
|
||||||
writeJaasConfigurationToFile(jaasSections)
|
writeJaasConfigurationToFile(jaasSections)
|
||||||
val hasZk = jaasSections.exists(_.modules.exists {
|
|
||||||
case _: ZkDigestModule => true
|
val hasZk = jaasSections.exists(_.getModules.asScala.exists(_.name() == "org.apache.zookeeper.server.auth.DigestLoginModule"))
|
||||||
case _ => false
|
|
||||||
})
|
|
||||||
if (hasZk)
|
if (hasZk)
|
||||||
System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
|
System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
|
||||||
}
|
}
|
||||||
|
@ -82,9 +84,9 @@ trait SaslSetup {
|
||||||
val (serverKeytabFile, clientKeytabFile) = maybeCreateEmptyKeytabFiles()
|
val (serverKeytabFile, clientKeytabFile) = maybeCreateEmptyKeytabFiles()
|
||||||
kdc = new MiniKdc(kdcConf, workDir)
|
kdc = new MiniKdc(kdcConf, workDir)
|
||||||
kdc.start()
|
kdc.start()
|
||||||
kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost")
|
kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME + "/localhost")
|
||||||
kdc.createPrincipal(clientKeytabFile,
|
kdc.createPrincipal(clientKeytabFile,
|
||||||
JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2)
|
JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME, JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2)
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Return a tuple with the path to the server keytab file and client keytab file */
|
/** Return a tuple with the path to the server keytab file and client keytab file */
|
||||||
|
@ -99,23 +101,23 @@ trait SaslSetup {
|
||||||
def jaasSections(kafkaServerSaslMechanisms: Seq[String],
|
def jaasSections(kafkaServerSaslMechanisms: Seq[String],
|
||||||
kafkaClientSaslMechanism: Option[String],
|
kafkaClientSaslMechanism: Option[String],
|
||||||
mode: SaslSetupMode = Both,
|
mode: SaslSetupMode = Both,
|
||||||
kafkaServerEntryName: String = JaasTestUtils.KafkaServerContextName): Seq[JaasSection] = {
|
kafkaServerEntryName: String = JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME): Seq[JaasSection] = {
|
||||||
val hasKerberos = mode != ZkSasl &&
|
val hasKerberos = mode != ZkSasl &&
|
||||||
(kafkaServerSaslMechanisms.contains("GSSAPI") || kafkaClientSaslMechanism.contains("GSSAPI"))
|
(kafkaServerSaslMechanisms.contains("GSSAPI") || kafkaClientSaslMechanism.contains("GSSAPI"))
|
||||||
if (hasKerberos)
|
if (hasKerberos)
|
||||||
maybeCreateEmptyKeytabFiles()
|
maybeCreateEmptyKeytabFiles()
|
||||||
mode match {
|
mode match {
|
||||||
case ZkSasl => JaasTestUtils.zkSections
|
case ZkSasl => JaasTestUtils.zkSections.asScala
|
||||||
case KafkaSasl =>
|
case KafkaSasl =>
|
||||||
Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, kafkaServerSaslMechanisms, serverKeytabFile),
|
Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, kafkaServerSaslMechanisms.asJava, serverKeytabFile.toJava),
|
||||||
JaasTestUtils.kafkaClientSection(kafkaClientSaslMechanism, clientKeytabFile))
|
JaasTestUtils.kafkaClientSection(kafkaClientSaslMechanism.toJava, clientKeytabFile.toJava))
|
||||||
case Both => Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, kafkaServerSaslMechanisms, serverKeytabFile),
|
case Both => Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, kafkaServerSaslMechanisms.asJava, serverKeytabFile.toJava),
|
||||||
JaasTestUtils.kafkaClientSection(kafkaClientSaslMechanism, clientKeytabFile)) ++ JaasTestUtils.zkSections
|
JaasTestUtils.kafkaClientSection(kafkaClientSaslMechanism.toJava, clientKeytabFile.toJava)) ++ JaasTestUtils.zkSections.asScala
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def writeJaasConfigurationToFile(jaasSections: Seq[JaasSection]): Unit = {
|
private def writeJaasConfigurationToFile(jaasSections: Seq[JaasSection]): Unit = {
|
||||||
val file = JaasTestUtils.writeJaasContextsToFile(jaasSections)
|
val file = JaasTestUtils.writeJaasContextsToFile(jaasSections.asJava)
|
||||||
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, file.getAbsolutePath)
|
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, file.getAbsolutePath)
|
||||||
// This will cause a reload of the Configuration singleton when `getConfiguration` is called
|
// This will cause a reload of the Configuration singleton when `getConfiguration` is called
|
||||||
Configuration.setConfiguration(null)
|
Configuration.setConfiguration(null)
|
||||||
|
@ -148,16 +150,16 @@ trait SaslSetup {
|
||||||
|
|
||||||
def jaasClientLoginModule(clientSaslMechanism: String, serviceName: Option[String] = None): String = {
|
def jaasClientLoginModule(clientSaslMechanism: String, serviceName: Option[String] = None): String = {
|
||||||
if (serviceName.isDefined)
|
if (serviceName.isDefined)
|
||||||
JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile, serviceName.get)
|
JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile.toJava, serviceName.get)
|
||||||
else
|
else
|
||||||
JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile)
|
JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile.toJava)
|
||||||
}
|
}
|
||||||
|
|
||||||
def jaasAdminLoginModule(clientSaslMechanism: String, serviceName: Option[String] = None): String = {
|
def jaasAdminLoginModule(clientSaslMechanism: String, serviceName: Option[String] = None): String = {
|
||||||
if (serviceName.isDefined)
|
if (serviceName.isDefined)
|
||||||
JaasTestUtils.adminLoginModule(clientSaslMechanism, serverKeytabFile, serviceName.get)
|
JaasTestUtils.adminLoginModule(clientSaslMechanism, serverKeytabFile.toJava, serviceName.get)
|
||||||
else
|
else
|
||||||
JaasTestUtils.adminLoginModule(clientSaslMechanism, serverKeytabFile)
|
JaasTestUtils.adminLoginModule(clientSaslMechanism, serverKeytabFile.toJava)
|
||||||
}
|
}
|
||||||
|
|
||||||
def jaasScramClientLoginModule(clientSaslScramMechanism: String, scramUser: String, scramPassword: String): String = {
|
def jaasScramClientLoginModule(clientSaslScramMechanism: String, scramUser: String, scramPassword: String): String = {
|
||||||
|
|
|
@ -12,9 +12,10 @@
|
||||||
*/
|
*/
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
|
import kafka.security.JaasTestUtils
|
||||||
import kafka.security.authorizer.AclAuthorizer
|
import kafka.security.authorizer.AclAuthorizer
|
||||||
import kafka.utils.TestUtils._
|
import kafka.utils.TestUtils._
|
||||||
import kafka.utils.{JaasTestUtils, TestInfoUtils, TestUtils}
|
import kafka.utils.{TestInfoUtils, TestUtils}
|
||||||
import org.apache.kafka.clients.admin._
|
import org.apache.kafka.clients.admin._
|
||||||
import org.apache.kafka.common.Uuid
|
import org.apache.kafka.common.Uuid
|
||||||
import org.apache.kafka.common.acl._
|
import org.apache.kafka.common.acl._
|
||||||
|
@ -47,7 +48,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
|
||||||
val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
|
val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
|
||||||
val zkAuthorizerClassName = classOf[AclAuthorizer].getName
|
val zkAuthorizerClassName = classOf[AclAuthorizer].getName
|
||||||
val kraftAuthorizerClassName = classOf[StandardAuthorizer].getName
|
val kraftAuthorizerClassName = classOf[StandardAuthorizer].getName
|
||||||
val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaServerPrincipalUnqualifiedName)
|
val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME)
|
||||||
var superUserAdmin: Admin = _
|
var superUserAdmin: Admin = _
|
||||||
override protected def securityProtocol = SecurityProtocol.SASL_SSL
|
override protected def securityProtocol = SecurityProtocol.SASL_SSL
|
||||||
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))
|
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))
|
||||||
|
@ -72,7 +73,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
|
||||||
}
|
}
|
||||||
|
|
||||||
def setUpSasl(): Unit = {
|
def setUpSasl(): Unit = {
|
||||||
startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName))
|
startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
|
||||||
|
|
||||||
val loginContext = jaasAdminLoginModule("GSSAPI")
|
val loginContext = jaasAdminLoginModule("GSSAPI")
|
||||||
superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, loginContext)
|
superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, loginContext)
|
||||||
|
|
|
@ -12,7 +12,8 @@
|
||||||
*/
|
*/
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
import kafka.utils.{JaasTestUtils, TestUtils}
|
import kafka.security.JaasTestUtils
|
||||||
|
import kafka.utils.TestUtils
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||||
import org.apache.kafka.server.config.ZkConfigs
|
import org.apache.kafka.server.config.ZkConfigs
|
||||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
|
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
|
||||||
|
@ -25,7 +26,7 @@ class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup {
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
override def setUp(testInfo: TestInfo): Unit = {
|
override def setUp(testInfo: TestInfo): Unit = {
|
||||||
startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName))
|
startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
|
||||||
super.setUp(testInfo)
|
super.setUp(testInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,8 +14,9 @@
|
||||||
|
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
|
import kafka.security.JaasTestUtils
|
||||||
import kafka.server.KafkaBroker
|
import kafka.server.KafkaBroker
|
||||||
import kafka.utils.{JaasTestUtils, TestUtils}
|
import kafka.utils.TestUtils
|
||||||
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
|
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
|
||||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
|
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
|
||||||
|
|
||||||
|
@ -30,7 +31,7 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
override def setUp(testInfo: TestInfo): Unit = {
|
override def setUp(testInfo: TestInfo): Unit = {
|
||||||
startSasl(jaasSections(kafkaServerSaslMechanisms, Some("GSSAPI"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
|
startSasl(jaasSections(kafkaServerSaslMechanisms, Some("GSSAPI"), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
|
||||||
super.setUp(testInfo)
|
super.setUp(testInfo)
|
||||||
quotaTestClients.alterClientQuotas(
|
quotaTestClients.alterClientQuotas(
|
||||||
quotaTestClients.clientQuotaAlteration(
|
quotaTestClients.clientQuotaAlteration(
|
||||||
|
@ -53,7 +54,7 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
|
||||||
val adminClient = createAdminClient()
|
val adminClient = createAdminClient()
|
||||||
|
|
||||||
new QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producer, consumer, adminClient) {
|
new QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producer, consumer, adminClient) {
|
||||||
override val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2)
|
override val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2)
|
||||||
|
|
||||||
override def quotaMetricTags(clientId: String): Map[String, String] = {
|
override def quotaMetricTags(clientId: String): Map[String, String] = {
|
||||||
Map("user" -> userPrincipal.getName, "client-id" -> "")
|
Map("user" -> userPrincipal.getName, "client-id" -> "")
|
||||||
|
|
|
@ -33,6 +33,7 @@ import kafka.api.{KafkaSasl, SaslSetup}
|
||||||
import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
|
import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
|
||||||
import kafka.log.UnifiedLog
|
import kafka.log.UnifiedLog
|
||||||
import kafka.network.{DataPlaneAcceptor, Processor, RequestChannel}
|
import kafka.network.{DataPlaneAcceptor, Processor, RequestChannel}
|
||||||
|
import kafka.security.JaasTestUtils
|
||||||
import kafka.utils._
|
import kafka.utils._
|
||||||
import kafka.utils.Implicits._
|
import kafka.utils.Implicits._
|
||||||
import kafka.utils.TestUtils.TestControllerRequestCompletionHandler
|
import kafka.utils.TestUtils.TestControllerRequestCompletionHandler
|
||||||
|
@ -53,7 +54,7 @@ import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestEx
|
||||||
import org.apache.kafka.common.internals.Topic
|
import org.apache.kafka.common.internals.Topic
|
||||||
import org.apache.kafka.common.message.MetadataRequestData
|
import org.apache.kafka.common.message.MetadataRequestData
|
||||||
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext, MetricsReporter, Quota}
|
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext, MetricsReporter, Quota}
|
||||||
import org.apache.kafka.common.network.{ListenerName, ConnectionMode}
|
import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
|
||||||
import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS, TRUSTSTORE_PROPS}
|
import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS, TRUSTSTORE_PROPS}
|
||||||
import org.apache.kafka.common.record.TimestampType
|
import org.apache.kafka.common.record.TimestampType
|
||||||
import org.apache.kafka.common.requests.MetadataRequest
|
import org.apache.kafka.common.requests.MetadataRequest
|
||||||
|
@ -1146,12 +1147,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testAddRemoveSaslListeners(): Unit = {
|
def testAddRemoveSaslListeners(): Unit = {
|
||||||
createScramCredentials(adminClients.head, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
|
createScramCredentials(adminClients.head, JaasTestUtils.KAFKA_SCRAM_USER, JaasTestUtils.KAFKA_SCRAM_PASSWORD)
|
||||||
createScramCredentials(adminClients.head, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword)
|
createScramCredentials(adminClients.head, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
|
||||||
initializeKerberos()
|
initializeKerberos()
|
||||||
// make sure each server's credential cache has all the created credentials
|
// make sure each server's credential cache has all the created credentials
|
||||||
// (check after initializing Kerberos to minimize delays)
|
// (check after initializing Kerberos to minimize delays)
|
||||||
List(JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramAdmin).foreach { scramUser =>
|
List(JaasTestUtils.KAFKA_SCRAM_USER, JaasTestUtils.KAFKA_SCRAM_ADMIN).foreach { scramUser =>
|
||||||
servers.foreach { server =>
|
servers.foreach { server =>
|
||||||
ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach(mechanism =>
|
ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach(mechanism =>
|
||||||
TestUtils.waitUntilTrue(() => server.credentialProvider.credentialCache.cache(
|
TestUtils.waitUntilTrue(() => server.credentialProvider.credentialCache.cache(
|
||||||
|
@ -1395,7 +1396,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
|
||||||
|
|
||||||
private def awaitInitialPositions(consumer: Consumer[_, _]): Unit = {
|
private def awaitInitialPositions(consumer: Consumer[_, _]): Unit = {
|
||||||
TestUtils.pollUntilTrue(consumer, () => !consumer.assignment.isEmpty, "Timed out while waiting for assignment")
|
TestUtils.pollUntilTrue(consumer, () => !consumer.assignment.isEmpty, "Timed out while waiting for assignment")
|
||||||
consumer.assignment.forEach(consumer.position(_))
|
consumer.assignment.forEach(tp => consumer.position(tp))
|
||||||
}
|
}
|
||||||
|
|
||||||
private def clientProps(securityProtocol: SecurityProtocol, saslMechanism: Option[String] = None): Properties = {
|
private def clientProps(securityProtocol: SecurityProtocol, saslMechanism: Option[String] = None): Properties = {
|
||||||
|
@ -1748,7 +1749,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
|
||||||
props.put(prefix + SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka")
|
props.put(prefix + SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka")
|
||||||
mechanisms.foreach { mechanism =>
|
mechanisms.foreach { mechanism =>
|
||||||
val jaasSection = jaasSections(Seq(mechanism), None, KafkaSasl, "").head
|
val jaasSection = jaasSections(Seq(mechanism), None, KafkaSasl, "").head
|
||||||
val jaasConfig = jaasSection.modules.head.toString
|
val jaasConfig = jaasSection.getModules.get(0).toString
|
||||||
props.put(listenerName.saslMechanismConfigPrefix(mechanism) + SaslConfigs.SASL_JAAS_CONFIG, jaasConfig)
|
props.put(listenerName.saslMechanismConfigPrefix(mechanism) + SaslConfigs.SASL_JAAS_CONFIG, jaasConfig)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,12 +17,14 @@
|
||||||
*/
|
*/
|
||||||
package kafka.server
|
package kafka.server
|
||||||
|
|
||||||
|
import kafka.security.JaasTestUtils
|
||||||
|
import kafka.security.JaasTestUtils.JaasSection
|
||||||
|
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
|
|
||||||
import scala.collection.Seq
|
import scala.collection.Seq
|
||||||
|
import scala.jdk.OptionConverters._
|
||||||
|
import scala.jdk.CollectionConverters._
|
||||||
|
|
||||||
import kafka.utils.JaasTestUtils
|
|
||||||
import kafka.utils.JaasTestUtils.JaasSection
|
|
||||||
|
|
||||||
class MultipleListenersWithAdditionalJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest {
|
class MultipleListenersWithAdditionalJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest {
|
||||||
|
|
||||||
|
@ -30,15 +32,15 @@ class MultipleListenersWithAdditionalJaasContextTest extends MultipleListenersWi
|
||||||
|
|
||||||
override def staticJaasSections: Seq[JaasSection] = {
|
override def staticJaasSections: Seq[JaasSection] = {
|
||||||
val (serverKeytabFile, _) = maybeCreateEmptyKeytabFiles()
|
val (serverKeytabFile, _) = maybeCreateEmptyKeytabFiles()
|
||||||
JaasTestUtils.zkSections :+
|
JaasTestUtils.zkSections.asScala :+
|
||||||
JaasTestUtils.kafkaServerSection("secure_external.KafkaServer", kafkaServerSaslMechanisms(SecureExternal), Some(serverKeytabFile))
|
JaasTestUtils.kafkaServerSection("secure_external.KafkaServer", kafkaServerSaslMechanisms(SecureExternal).asJava, Some(serverKeytabFile).toJava)
|
||||||
}
|
}
|
||||||
|
|
||||||
override protected def dynamicJaasSections: Properties = {
|
override protected def dynamicJaasSections: Properties = {
|
||||||
val props = new Properties
|
val props = new Properties
|
||||||
kafkaServerSaslMechanisms(SecureInternal).foreach { mechanism =>
|
kafkaServerSaslMechanisms(SecureInternal).foreach { mechanism =>
|
||||||
addDynamicJaasSection(props, SecureInternal, mechanism,
|
addDynamicJaasSection(props, SecureInternal, mechanism,
|
||||||
JaasTestUtils.kafkaServerSection("secure_internal.KafkaServer", Seq(mechanism), None))
|
JaasTestUtils.kafkaServerSection("secure_internal.KafkaServer", Seq(mechanism).asJava, None.toJava))
|
||||||
}
|
}
|
||||||
props
|
props
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,16 +18,14 @@
|
||||||
package kafka.server
|
package kafka.server
|
||||||
|
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
|
|
||||||
import scala.collection.Seq
|
import scala.collection.Seq
|
||||||
|
|
||||||
import kafka.api.Both
|
import kafka.api.Both
|
||||||
import kafka.utils.JaasTestUtils.JaasSection
|
import kafka.security.JaasTestUtils.JaasSection
|
||||||
|
|
||||||
class MultipleListenersWithDefaultJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest {
|
class MultipleListenersWithDefaultJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest {
|
||||||
|
|
||||||
override def staticJaasSections: Seq[JaasSection] =
|
override def staticJaasSections: Seq[JaasSection] =
|
||||||
jaasSections(kafkaServerSaslMechanisms.values.flatMap(identity).toSeq, Some(kafkaClientSaslMechanism), Both)
|
jaasSections(kafkaServerSaslMechanisms.values.flatten.toSeq, Some(kafkaClientSaslMechanism), Both)
|
||||||
|
|
||||||
override protected def dynamicJaasSections: Properties = new Properties
|
override protected def dynamicJaasSections: Properties = new Properties
|
||||||
|
|
||||||
|
|
|
@ -21,15 +21,16 @@ package kafka.server
|
||||||
import java.util.{Collections, Objects, Properties}
|
import java.util.{Collections, Objects, Properties}
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import kafka.api.SaslSetup
|
import kafka.api.SaslSetup
|
||||||
import kafka.utils.JaasTestUtils.JaasSection
|
import kafka.security.JaasTestUtils
|
||||||
import kafka.utils.{JaasTestUtils, TestUtils}
|
import kafka.security.JaasTestUtils.JaasSection
|
||||||
|
import kafka.utils.TestUtils
|
||||||
import kafka.utils.Implicits._
|
import kafka.utils.Implicits._
|
||||||
import org.apache.kafka.clients.consumer.Consumer
|
import org.apache.kafka.clients.consumer.Consumer
|
||||||
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
|
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
|
||||||
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
|
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
|
||||||
import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
|
import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
|
||||||
import org.apache.kafka.common.internals.Topic
|
import org.apache.kafka.common.internals.Topic
|
||||||
import org.apache.kafka.common.network.{ListenerName, ConnectionMode}
|
import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
|
||||||
import org.apache.kafka.server.config.{ReplicationConfigs, ZkConfigs}
|
import org.apache.kafka.server.config.{ReplicationConfigs, ZkConfigs}
|
||||||
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
|
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
|
||||||
import org.apache.kafka.network.SocketServerConfigs
|
import org.apache.kafka.network.SocketServerConfigs
|
||||||
|
@ -113,7 +114,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
|
||||||
TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_DEFAULT,
|
TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_DEFAULT,
|
||||||
replicationFactor = 2, servers, servers.head.groupCoordinator.groupMetadataTopicConfigs)
|
replicationFactor = 2, servers, servers.head.groupCoordinator.groupMetadataTopicConfigs)
|
||||||
|
|
||||||
createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
|
createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_USER, JaasTestUtils.KAFKA_SCRAM_PASSWORD)
|
||||||
|
|
||||||
servers.head.config.listeners.foreach { endPoint =>
|
servers.head.config.listeners.foreach { endPoint =>
|
||||||
val listenerName = endPoint.listenerName
|
val listenerName = endPoint.listenerName
|
||||||
|
@ -176,7 +177,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
|
||||||
protected def addDynamicJaasSection(props: Properties, listener: String, mechanism: String, jaasSection: JaasSection): Unit = {
|
protected def addDynamicJaasSection(props: Properties, listener: String, mechanism: String, jaasSection: JaasSection): Unit = {
|
||||||
val listenerName = new ListenerName(listener)
|
val listenerName = new ListenerName(listener)
|
||||||
val prefix = listenerName.saslMechanismConfigPrefix(mechanism)
|
val prefix = listenerName.saslMechanismConfigPrefix(mechanism)
|
||||||
val jaasConfig = jaasSection.modules.head.toString
|
val jaasConfig = jaasSection.getModules.get(0).toString
|
||||||
props.put(s"${prefix}${SaslConfigs.SASL_JAAS_CONFIG}", jaasConfig)
|
props.put(s"${prefix}${SaslConfigs.SASL_JAAS_CONFIG}", jaasConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
package kafka.security.auth
|
package kafka.security.auth
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets
|
import java.nio.charset.StandardCharsets
|
||||||
|
|
||||||
import kafka.admin.ZkSecurityMigrator
|
import kafka.admin.ZkSecurityMigrator
|
||||||
import kafka.server.QuorumTestHarness
|
import kafka.server.QuorumTestHarness
|
||||||
import kafka.utils.{Logging, TestUtils}
|
import kafka.utils.{Logging, TestUtils}
|
||||||
|
@ -33,6 +32,7 @@ import scala.util.{Failure, Success, Try}
|
||||||
import javax.security.auth.login.Configuration
|
import javax.security.auth.login.Configuration
|
||||||
import kafka.cluster.{Broker, EndPoint}
|
import kafka.cluster.{Broker, EndPoint}
|
||||||
import kafka.controller.ReplicaAssignment
|
import kafka.controller.ReplicaAssignment
|
||||||
|
import kafka.security.JaasTestUtils
|
||||||
import org.apache.kafka.common.network.ListenerName
|
import org.apache.kafka.common.network.ListenerName
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||||
import org.apache.kafka.common.utils.Time
|
import org.apache.kafka.common.utils.Time
|
||||||
|
@ -43,7 +43,7 @@ import scala.jdk.CollectionConverters._
|
||||||
import scala.collection.Seq
|
import scala.collection.Seq
|
||||||
|
|
||||||
class ZkAuthorizationTest extends QuorumTestHarness with Logging {
|
class ZkAuthorizationTest extends QuorumTestHarness with Logging {
|
||||||
val jaasFile = kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections)
|
val jaasFile = JaasTestUtils.writeJaasContextsToFile(JaasTestUtils.zkSections)
|
||||||
val authProvider = "zookeeper.authProvider.1"
|
val authProvider = "zookeeper.authProvider.1"
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
|
|
|
@ -16,22 +16,16 @@
|
||||||
*/
|
*/
|
||||||
package kafka.security.authorizer
|
package kafka.security.authorizer
|
||||||
|
|
||||||
import java.net.InetAddress
|
|
||||||
import java.util
|
|
||||||
import java.util.UUID
|
|
||||||
import java.util.concurrent.{Executors, TimeUnit}
|
|
||||||
|
|
||||||
import javax.security.auth.Subject
|
|
||||||
import javax.security.auth.callback.CallbackHandler
|
|
||||||
import kafka.api.SaslSetup
|
import kafka.api.SaslSetup
|
||||||
|
import kafka.security.{JaasModule, JaasTestUtils}
|
||||||
import kafka.server.{KafkaConfig, QuorumTestHarness}
|
import kafka.server.{KafkaConfig, QuorumTestHarness}
|
||||||
import kafka.utils.JaasTestUtils.{JaasModule, JaasSection}
|
import kafka.security.JaasTestUtils.JaasSection
|
||||||
import kafka.utils.{JaasTestUtils, TestUtils}
|
import kafka.utils.TestUtils
|
||||||
import kafka.zk.KafkaZkClient
|
import kafka.zk.KafkaZkClient
|
||||||
import kafka.zookeeper.ZooKeeperClient
|
import kafka.zookeeper.ZooKeeperClient
|
||||||
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter}
|
|
||||||
import org.apache.kafka.common.acl.AclOperation.{READ, WRITE}
|
import org.apache.kafka.common.acl.AclOperation.{READ, WRITE}
|
||||||
import org.apache.kafka.common.acl.AclPermissionType.ALLOW
|
import org.apache.kafka.common.acl.AclPermissionType.ALLOW
|
||||||
|
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter}
|
||||||
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
|
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
|
||||||
import org.apache.kafka.common.protocol.ApiKeys
|
import org.apache.kafka.common.protocol.ApiKeys
|
||||||
import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
|
import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
|
||||||
|
@ -39,14 +33,19 @@ import org.apache.kafka.common.resource.PatternType.LITERAL
|
||||||
import org.apache.kafka.common.resource.ResourcePattern
|
import org.apache.kafka.common.resource.ResourcePattern
|
||||||
import org.apache.kafka.common.resource.ResourceType.TOPIC
|
import org.apache.kafka.common.resource.ResourceType.TOPIC
|
||||||
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
|
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
|
||||||
import org.apache.kafka.test.{TestUtils => JTestUtils}
|
|
||||||
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
|
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
|
||||||
|
import org.apache.kafka.test.{TestUtils => JTestUtils}
|
||||||
import org.apache.zookeeper.server.auth.DigestLoginModule
|
import org.apache.zookeeper.server.auth.DigestLoginModule
|
||||||
import org.junit.jupiter.api.Assertions.assertEquals
|
import org.junit.jupiter.api.Assertions.assertEquals
|
||||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
|
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
|
||||||
|
|
||||||
|
import java.net.InetAddress
|
||||||
|
import java.util
|
||||||
|
import java.util.UUID
|
||||||
|
import java.util.concurrent.{Executors, TimeUnit}
|
||||||
|
import javax.security.auth.Subject
|
||||||
|
import javax.security.auth.callback.CallbackHandler
|
||||||
import scala.jdk.CollectionConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
import scala.collection.Seq
|
|
||||||
|
|
||||||
class AclAuthorizerWithZkSaslTest extends QuorumTestHarness with SaslSetup {
|
class AclAuthorizerWithZkSaslTest extends QuorumTestHarness with SaslSetup {
|
||||||
|
|
||||||
|
@ -67,9 +66,9 @@ class AclAuthorizerWithZkSaslTest extends QuorumTestHarness with SaslSetup {
|
||||||
// Configure ZK SASL with TestableDigestLoginModule for clients to inject failures
|
// Configure ZK SASL with TestableDigestLoginModule for clients to inject failures
|
||||||
TestableDigestLoginModule.reset()
|
TestableDigestLoginModule.reset()
|
||||||
val jaasSections = JaasTestUtils.zkSections
|
val jaasSections = JaasTestUtils.zkSections
|
||||||
val serverJaas = jaasSections.filter(_.contextName == "Server")
|
val serverJaas = jaasSections.asScala.filter(section => section.getContextName == "Server")
|
||||||
val clientJaas = jaasSections.filter(_.contextName == "Client")
|
val clientJaas = jaasSections.asScala.filter(section => section.getContextName == "Client")
|
||||||
.map(section => new TestableJaasSection(section.contextName, section.modules))
|
.map(section => new TestableJaasSection(section.getContextName, section.getModules))
|
||||||
startSasl(serverJaas ++ clientJaas)
|
startSasl(serverJaas ++ clientJaas)
|
||||||
|
|
||||||
// Increase maxUpdateRetries to avoid transient failures
|
// Increase maxUpdateRetries to avoid transient failures
|
||||||
|
@ -180,7 +179,7 @@ class TestableDigestLoginModule extends DigestLoginModule {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class TestableJaasSection(contextName: String, modules: Seq[JaasModule]) extends JaasSection(contextName, modules) {
|
class TestableJaasSection(contextName: String, modules: util.List[JaasModule]) extends JaasSection(contextName, modules) {
|
||||||
override def toString: String = {
|
override def toString: String = {
|
||||||
super.toString.replaceFirst(classOf[DigestLoginModule].getName, classOf[TestableDigestLoginModule].getName)
|
super.toString.replaceFirst(classOf[DigestLoginModule].getName, classOf[TestableDigestLoginModule].getName)
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,19 +16,18 @@
|
||||||
*/
|
*/
|
||||||
package kafka.server
|
package kafka.server
|
||||||
|
|
||||||
import kafka.api.IntegrationTestHarness
|
import kafka.api.{IntegrationTestHarness, KafkaSasl, SaslSetup}
|
||||||
import kafka.api.{KafkaSasl, SaslSetup}
|
import kafka.security.JaasTestUtils
|
||||||
import kafka.utils.{JaasTestUtils, TestUtils}
|
import kafka.utils.TestUtils
|
||||||
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateDelegationTokenOptions, DescribeDelegationTokenOptions}
|
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateDelegationTokenOptions, DescribeDelegationTokenOptions}
|
||||||
import org.apache.kafka.common.errors.InvalidPrincipalTypeException
|
import org.apache.kafka.common.errors.{DelegationTokenNotFoundException, InvalidPrincipalTypeException}
|
||||||
import org.apache.kafka.common.errors.DelegationTokenNotFoundException
|
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||||
import org.apache.kafka.common.utils.SecurityUtils
|
import org.apache.kafka.common.utils.SecurityUtils
|
||||||
import org.apache.kafka.server.config.DelegationTokenManagerConfigs
|
import org.apache.kafka.server.config.DelegationTokenManagerConfigs
|
||||||
import org.junit.jupiter.api.Assertions._
|
import org.junit.jupiter.api.Assertions._
|
||||||
|
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
|
||||||
import org.junit.jupiter.params.ParameterizedTest
|
import org.junit.jupiter.params.ParameterizedTest
|
||||||
import org.junit.jupiter.params.provider.ValueSource
|
import org.junit.jupiter.params.provider.ValueSource
|
||||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
|
|
||||||
|
|
||||||
import java.util
|
import java.util
|
||||||
import scala.concurrent.ExecutionException
|
import scala.concurrent.ExecutionException
|
||||||
|
@ -52,7 +51,7 @@ class DelegationTokenRequestsTest extends IntegrationTestHarness with SaslSetup
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
override def setUp(testInfo: TestInfo): Unit = {
|
override def setUp(testInfo: TestInfo): Unit = {
|
||||||
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
|
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
|
||||||
super.setUp(testInfo)
|
super.setUp(testInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,8 @@
|
||||||
package kafka.server
|
package kafka.server
|
||||||
|
|
||||||
import kafka.api.{KafkaSasl, SaslSetup}
|
import kafka.api.{KafkaSasl, SaslSetup}
|
||||||
import kafka.utils.{JaasTestUtils, TestUtils}
|
import kafka.security.JaasTestUtils
|
||||||
|
import kafka.utils.TestUtils
|
||||||
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
|
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
|
||||||
import org.apache.kafka.common.errors.DelegationTokenDisabledException
|
import org.apache.kafka.common.errors.DelegationTokenDisabledException
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||||
|
@ -41,7 +42,7 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
override def setUp(testInfo: TestInfo): Unit = {
|
override def setUp(testInfo: TestInfo): Unit = {
|
||||||
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
|
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
|
||||||
super.setUp(testInfo)
|
super.setUp(testInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,14 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package kafka.server
|
package kafka.server
|
||||||
|
|
||||||
import java.net.Socket
|
|
||||||
import java.util.Collections
|
|
||||||
import kafka.api.{KafkaSasl, SaslSetup}
|
import kafka.api.{KafkaSasl, SaslSetup}
|
||||||
|
import kafka.security.JaasTestUtils
|
||||||
import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms}
|
import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms}
|
||||||
import kafka.test.annotation.{ClusterTemplate, Type}
|
import kafka.test.annotation.{ClusterTemplate, Type}
|
||||||
import kafka.test.junit.ClusterTestExtensions
|
import kafka.test.junit.ClusterTestExtensions
|
||||||
import kafka.test.{ClusterConfig, ClusterInstance}
|
import kafka.test.{ClusterConfig, ClusterInstance}
|
||||||
import kafka.utils.JaasTestUtils
|
|
||||||
import org.apache.kafka.common.config.SaslConfigs
|
import org.apache.kafka.common.config.SaslConfigs
|
||||||
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
|
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
|
||||||
import org.apache.kafka.common.message.SaslHandshakeRequestData
|
import org.apache.kafka.common.message.SaslHandshakeRequestData
|
||||||
|
@ -35,6 +33,8 @@ import org.junit.jupiter.api.Assertions._
|
||||||
import org.junit.jupiter.api.extension.ExtendWith
|
import org.junit.jupiter.api.extension.ExtendWith
|
||||||
import org.junit.jupiter.api.{AfterEach, BeforeEach}
|
import org.junit.jupiter.api.{AfterEach, BeforeEach}
|
||||||
|
|
||||||
|
import java.net.Socket
|
||||||
|
import java.util.Collections
|
||||||
import scala.jdk.CollectionConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
|
|
||||||
object SaslApiVersionsRequestTest {
|
object SaslApiVersionsRequestTest {
|
||||||
|
@ -75,7 +75,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
def setupSasl(): Unit = {
|
def setupSasl(): Unit = {
|
||||||
sasl = new SaslSetup() {}
|
sasl = new SaslSetup() {}
|
||||||
sasl.startSasl(sasl.jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
|
sasl.startSasl(sasl.jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ClusterTemplate("saslApiVersionsRequestClusterConfig")
|
@ClusterTemplate("saslApiVersionsRequestClusterConfig")
|
||||||
|
|
|
@ -1,303 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package kafka.utils
|
|
||||||
|
|
||||||
import java.io.{BufferedWriter, File, FileWriter}
|
|
||||||
import java.util.Properties
|
|
||||||
import scala.collection.Seq
|
|
||||||
import org.apache.kafka.clients.admin.ScramMechanism
|
|
||||||
import org.apache.kafka.common.config.SaslConfigs
|
|
||||||
import org.apache.kafka.common.utils.Java
|
|
||||||
|
|
||||||
object JaasTestUtils {
|
|
||||||
|
|
||||||
case class Krb5LoginModule(useKeyTab: Boolean,
|
|
||||||
storeKey: Boolean,
|
|
||||||
keyTab: String,
|
|
||||||
principal: String,
|
|
||||||
debug: Boolean,
|
|
||||||
serviceName: Option[String],
|
|
||||||
isIbmSecurity: Boolean) extends JaasModule {
|
|
||||||
|
|
||||||
def name =
|
|
||||||
if (isIbmSecurity)
|
|
||||||
"com.ibm.security.auth.module.Krb5LoginModule"
|
|
||||||
else
|
|
||||||
"com.sun.security.auth.module.Krb5LoginModule"
|
|
||||||
|
|
||||||
def entries: Map[String, String] =
|
|
||||||
if (isIbmSecurity)
|
|
||||||
Map(
|
|
||||||
"principal" -> principal,
|
|
||||||
"credsType" -> "both"
|
|
||||||
) ++ (if (useKeyTab) Map("useKeytab" -> s"file:$keyTab") else Map.empty)
|
|
||||||
else
|
|
||||||
Map(
|
|
||||||
"useKeyTab" -> useKeyTab.toString,
|
|
||||||
"storeKey" -> storeKey.toString,
|
|
||||||
"keyTab" -> keyTab,
|
|
||||||
"principal" -> principal
|
|
||||||
) ++ serviceName.map(s => Map("serviceName" -> s)).getOrElse(Map.empty)
|
|
||||||
}
|
|
||||||
|
|
||||||
case class PlainLoginModule(username: String,
|
|
||||||
password: String,
|
|
||||||
debug: Boolean = false,
|
|
||||||
validUsers: Map[String, String] = Map.empty) extends JaasModule {
|
|
||||||
|
|
||||||
def name = "org.apache.kafka.common.security.plain.PlainLoginModule"
|
|
||||||
|
|
||||||
def entries: Map[String, String] = Map(
|
|
||||||
"username" -> username,
|
|
||||||
"password" -> password
|
|
||||||
) ++ validUsers.map { case (user, pass) => s"user_$user" -> pass }
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
case class ZkDigestModule(debug: Boolean = false,
|
|
||||||
entries: Map[String, String] = Map.empty) extends JaasModule {
|
|
||||||
def name = "org.apache.zookeeper.server.auth.DigestLoginModule"
|
|
||||||
}
|
|
||||||
|
|
||||||
case class ScramLoginModule(username: String,
|
|
||||||
password: String,
|
|
||||||
debug: Boolean = false,
|
|
||||||
tokenProps: Map[String, String] = Map.empty) extends JaasModule {
|
|
||||||
|
|
||||||
def name = "org.apache.kafka.common.security.scram.ScramLoginModule"
|
|
||||||
|
|
||||||
def entries: Map[String, String] = Map(
|
|
||||||
"username" -> username,
|
|
||||||
"password" -> password
|
|
||||||
) ++ tokenProps.map { case (name, value) => name -> value }
|
|
||||||
}
|
|
||||||
|
|
||||||
case class OAuthBearerLoginModule(username: String,
|
|
||||||
debug: Boolean = false) extends JaasModule {
|
|
||||||
|
|
||||||
def name = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule"
|
|
||||||
|
|
||||||
def entries: Map[String, String] = Map(
|
|
||||||
"unsecuredLoginStringClaim_sub" -> username
|
|
||||||
)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
sealed trait JaasModule {
|
|
||||||
def name: String
|
|
||||||
def debug: Boolean
|
|
||||||
def entries: Map[String, String]
|
|
||||||
|
|
||||||
override def toString: String = {
|
|
||||||
s"""$name required
|
|
||||||
| debug=$debug
|
|
||||||
| ${entries.map { case (k, v) => s"""$k="$v"""" }.mkString("", "\n| ", ";")}
|
|
||||||
|""".stripMargin
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
case class JaasSection(contextName: String, modules: Seq[JaasModule]) {
|
|
||||||
override def toString: String = {
|
|
||||||
s"""|$contextName {
|
|
||||||
| ${modules.mkString("\n ")}
|
|
||||||
|};
|
|
||||||
|""".stripMargin
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private val isIbmSecurity = Java.isIbmJdk && !Java.isIbmJdkSemeru
|
|
||||||
|
|
||||||
private val ZkServerContextName = "Server"
|
|
||||||
private val ZkClientContextName = "Client"
|
|
||||||
private val ZkUserSuperPasswd = "adminpasswd"
|
|
||||||
private val ZkUser = "fpj"
|
|
||||||
private val ZkUserPassword = "fpjsecret"
|
|
||||||
|
|
||||||
val KafkaServerContextName = "KafkaServer"
|
|
||||||
val KafkaServerPrincipalUnqualifiedName = "kafka"
|
|
||||||
private val KafkaServerPrincipal = KafkaServerPrincipalUnqualifiedName + "/localhost@EXAMPLE.COM"
|
|
||||||
val KafkaClientContextName = "KafkaClient"
|
|
||||||
val KafkaClientPrincipalUnqualifiedName = "client"
|
|
||||||
private val KafkaClientPrincipal = KafkaClientPrincipalUnqualifiedName + "@EXAMPLE.COM"
|
|
||||||
val KafkaClientPrincipalUnqualifiedName2 = "client2"
|
|
||||||
private val KafkaClientPrincipal2 = KafkaClientPrincipalUnqualifiedName2 + "@EXAMPLE.COM"
|
|
||||||
|
|
||||||
val KafkaPlainUser = "plain-user"
|
|
||||||
private val KafkaPlainPassword = "plain-user-secret"
|
|
||||||
val KafkaPlainUser2 = "plain-user2"
|
|
||||||
val KafkaPlainPassword2 = "plain-user2-secret"
|
|
||||||
val KafkaPlainAdmin = "plain-admin"
|
|
||||||
private val KafkaPlainAdminPassword = "plain-admin-secret"
|
|
||||||
|
|
||||||
val KafkaScramUser = "scram-user"
|
|
||||||
val KafkaScramPassword = "scram-user-secret"
|
|
||||||
val KafkaScramUser2 = "scram-user2"
|
|
||||||
val KafkaScramPassword2 = "scram-user2-secret"
|
|
||||||
val KafkaScramAdmin = "scram-admin"
|
|
||||||
val KafkaScramAdminPassword = "scram-admin-secret"
|
|
||||||
|
|
||||||
val KafkaOAuthBearerUser = "oauthbearer-user"
|
|
||||||
val KafkaOAuthBearerUser2 = "oauthbearer-user2"
|
|
||||||
val KafkaOAuthBearerAdmin = "oauthbearer-admin"
|
|
||||||
|
|
||||||
val serviceName = "kafka"
|
|
||||||
|
|
||||||
def saslConfigs(saslProperties: Option[Properties]): Properties = {
|
|
||||||
val result = saslProperties.getOrElse(new Properties)
|
|
||||||
// IBM Kerberos module doesn't support the serviceName JAAS property, hence it needs to be
|
|
||||||
// passed as a Kafka property
|
|
||||||
if (isIbmSecurity && !result.contains(SaslConfigs.SASL_KERBEROS_SERVICE_NAME))
|
|
||||||
result.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, serviceName)
|
|
||||||
result
|
|
||||||
}
|
|
||||||
|
|
||||||
def writeJaasContextsToFile(jaasSections: Seq[JaasSection]): File = {
|
|
||||||
val jaasFile = TestUtils.tempFile()
|
|
||||||
writeToFile(jaasFile, jaasSections)
|
|
||||||
jaasFile
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns a SASL/SCRAM configuration using credentials for the given user and password
|
|
||||||
def scramClientLoginModule(mechanism: String, scramUser: String, scramPassword: String): String = {
|
|
||||||
if (ScramMechanism.fromMechanismName(mechanism) == ScramMechanism.UNKNOWN) {
|
|
||||||
throw new IllegalArgumentException("Unsupported SCRAM mechanism " + mechanism)
|
|
||||||
}
|
|
||||||
ScramLoginModule(
|
|
||||||
scramUser,
|
|
||||||
scramPassword
|
|
||||||
).toString
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns the dynamic configuration, using credentials for user #1
|
|
||||||
def clientLoginModule(mechanism: String, keytabLocation: Option[File], serviceName: String = serviceName): String =
|
|
||||||
kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword, KafkaOAuthBearerUser, serviceName).toString
|
|
||||||
|
|
||||||
// Returns the dynamic configuration, using credentials for admin
|
|
||||||
def adminLoginModule(mechanism: String, keytabLocation: Option[File], serviceName: String = serviceName): String =
|
|
||||||
kafkaClientModule(mechanism, keytabLocation, KafkaServerPrincipal, KafkaPlainAdmin, KafkaPlainAdminPassword,
|
|
||||||
KafkaScramAdmin, KafkaScramAdminPassword, KafkaOAuthBearerAdmin, serviceName).toString
|
|
||||||
|
|
||||||
def tokenClientLoginModule(tokenId: String, password: String): String = {
|
|
||||||
ScramLoginModule(
|
|
||||||
tokenId,
|
|
||||||
password,
|
|
||||||
debug = false,
|
|
||||||
Map(
|
|
||||||
"tokenauth" -> "true"
|
|
||||||
)).toString
|
|
||||||
}
|
|
||||||
|
|
||||||
def zkSections: Seq[JaasSection] = Seq(
|
|
||||||
JaasSection(ZkServerContextName, Seq(ZkDigestModule(debug = false,
|
|
||||||
Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)))),
|
|
||||||
JaasSection(ZkClientContextName, Seq(ZkDigestModule(debug = false,
|
|
||||||
Map("username" -> ZkUser, "password" -> ZkUserPassword))))
|
|
||||||
)
|
|
||||||
|
|
||||||
def kafkaServerSection(contextName: String, mechanisms: Seq[String], keytabLocation: Option[File]): JaasSection = {
|
|
||||||
val modules = mechanisms.map {
|
|
||||||
case "GSSAPI" =>
|
|
||||||
Krb5LoginModule(
|
|
||||||
useKeyTab = true,
|
|
||||||
storeKey = true,
|
|
||||||
keyTab = keytabLocation.getOrElse(throw new IllegalArgumentException("Keytab location not specified for GSSAPI")).getAbsolutePath,
|
|
||||||
principal = KafkaServerPrincipal,
|
|
||||||
debug = true,
|
|
||||||
serviceName = Some(serviceName),
|
|
||||||
isIbmSecurity)
|
|
||||||
case "PLAIN" =>
|
|
||||||
PlainLoginModule(
|
|
||||||
KafkaPlainAdmin,
|
|
||||||
KafkaPlainAdminPassword,
|
|
||||||
debug = false,
|
|
||||||
Map(
|
|
||||||
KafkaPlainAdmin -> KafkaPlainAdminPassword,
|
|
||||||
KafkaPlainUser -> KafkaPlainPassword,
|
|
||||||
KafkaPlainUser2 -> KafkaPlainPassword2
|
|
||||||
))
|
|
||||||
case "OAUTHBEARER" =>
|
|
||||||
OAuthBearerLoginModule(KafkaOAuthBearerAdmin)
|
|
||||||
case mechanism => {
|
|
||||||
if (ScramMechanism.fromMechanismName(mechanism) != ScramMechanism.UNKNOWN) {
|
|
||||||
ScramLoginModule(
|
|
||||||
KafkaScramAdmin,
|
|
||||||
KafkaScramAdminPassword)
|
|
||||||
} else {
|
|
||||||
throw new IllegalArgumentException("Unsupported server mechanism " + mechanism)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
JaasSection(contextName, modules)
|
|
||||||
}
|
|
||||||
|
|
||||||
// consider refactoring if more mechanisms are added
|
|
||||||
private def kafkaClientModule(mechanism: String,
|
|
||||||
keytabLocation: Option[File], clientPrincipal: String,
|
|
||||||
plainUser: String, plainPassword: String,
|
|
||||||
scramUser: String, scramPassword: String,
|
|
||||||
oauthBearerUser: String, serviceName: String = serviceName): JaasModule = {
|
|
||||||
mechanism match {
|
|
||||||
case "GSSAPI" =>
|
|
||||||
Krb5LoginModule(
|
|
||||||
useKeyTab = true,
|
|
||||||
storeKey = true,
|
|
||||||
keyTab = keytabLocation.getOrElse(throw new IllegalArgumentException("Keytab location not specified for GSSAPI")).getAbsolutePath,
|
|
||||||
principal = clientPrincipal,
|
|
||||||
debug = true,
|
|
||||||
serviceName = Some(serviceName),
|
|
||||||
isIbmSecurity
|
|
||||||
)
|
|
||||||
case "PLAIN" =>
|
|
||||||
PlainLoginModule(
|
|
||||||
plainUser,
|
|
||||||
plainPassword
|
|
||||||
)
|
|
||||||
case "OAUTHBEARER" =>
|
|
||||||
OAuthBearerLoginModule(
|
|
||||||
oauthBearerUser
|
|
||||||
)
|
|
||||||
case mechanism => {
|
|
||||||
if (ScramMechanism.fromMechanismName(mechanism) != ScramMechanism.UNKNOWN) {
|
|
||||||
ScramLoginModule(
|
|
||||||
scramUser,
|
|
||||||
scramPassword
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
throw new IllegalArgumentException("Unsupported client mechanism " + mechanism)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Used for the static JAAS configuration and it uses the credentials for client#2
|
|
||||||
*/
|
|
||||||
def kafkaClientSection(mechanism: Option[String], keytabLocation: Option[File]): JaasSection = {
|
|
||||||
JaasSection(KafkaClientContextName, mechanism.map(m =>
|
|
||||||
kafkaClientModule(m, keytabLocation, KafkaClientPrincipal2, KafkaPlainUser2, KafkaPlainPassword2, KafkaScramUser2, KafkaScramPassword2, KafkaOAuthBearerUser2)).toSeq)
|
|
||||||
}
|
|
||||||
|
|
||||||
private def jaasSectionsToString(jaasSections: Seq[JaasSection]): String =
|
|
||||||
jaasSections.mkString
|
|
||||||
|
|
||||||
private def writeToFile(file: File, jaasSections: Seq[JaasSection]): Unit = {
|
|
||||||
val writer = new BufferedWriter(new FileWriter(file))
|
|
||||||
try writer.write(jaasSectionsToString(jaasSections))
|
|
||||||
finally writer.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -21,6 +21,7 @@ import kafka.api._
|
||||||
import kafka.controller.ControllerEventManager
|
import kafka.controller.ControllerEventManager
|
||||||
import kafka.log._
|
import kafka.log._
|
||||||
import kafka.network.RequestChannel
|
import kafka.network.RequestChannel
|
||||||
|
import kafka.security.JaasTestUtils
|
||||||
import kafka.server._
|
import kafka.server._
|
||||||
import kafka.server.checkpoints.OffsetCheckpointFile
|
import kafka.server.checkpoints.OffsetCheckpointFile
|
||||||
import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
|
import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
|
||||||
|
@ -42,7 +43,7 @@ import org.apache.kafka.common.internals.Topic
|
||||||
import org.apache.kafka.common.memory.MemoryPool
|
import org.apache.kafka.common.memory.MemoryPool
|
||||||
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
|
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
|
||||||
import org.apache.kafka.common.metrics.Metrics
|
import org.apache.kafka.common.metrics.Metrics
|
||||||
import org.apache.kafka.common.network.{ClientInformation, ListenerName, ConnectionMode}
|
import org.apache.kafka.common.network.{ClientInformation, ConnectionMode, ListenerName}
|
||||||
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
|
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
|
||||||
import org.apache.kafka.common.record._
|
import org.apache.kafka.common.record._
|
||||||
import org.apache.kafka.common.requests._
|
import org.apache.kafka.common.requests._
|
||||||
|
@ -88,6 +89,7 @@ import scala.collection.{Map, Seq, mutable}
|
||||||
import scala.concurrent.duration.FiniteDuration
|
import scala.concurrent.duration.FiniteDuration
|
||||||
import scala.concurrent.{Await, ExecutionContext, Future}
|
import scala.concurrent.{Await, ExecutionContext, Future}
|
||||||
import scala.jdk.CollectionConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
|
import scala.jdk.OptionConverters.RichOption
|
||||||
import scala.util.{Failure, Success, Try}
|
import scala.util.{Failure, Success, Try}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -360,7 +362,7 @@ object TestUtils extends Logging {
|
||||||
props ++= sslConfigs(ConnectionMode.SERVER, false, trustStoreFile, s"server$nodeId")
|
props ++= sslConfigs(ConnectionMode.SERVER, false, trustStoreFile, s"server$nodeId")
|
||||||
|
|
||||||
if (protocolAndPorts.exists { case (protocol, _) => usesSaslAuthentication(protocol) })
|
if (protocolAndPorts.exists { case (protocol, _) => usesSaslAuthentication(protocol) })
|
||||||
props ++= JaasTestUtils.saslConfigs(saslProperties)
|
props ++= JaasTestUtils.saslConfigs(saslProperties.toJava)
|
||||||
|
|
||||||
interBrokerSecurityProtocol.foreach { protocol =>
|
interBrokerSecurityProtocol.foreach { protocol =>
|
||||||
props.put(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG, protocol.name)
|
props.put(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG, protocol.name)
|
||||||
|
@ -670,7 +672,7 @@ object TestUtils extends Logging {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (usesSaslAuthentication(securityProtocol))
|
if (usesSaslAuthentication(securityProtocol))
|
||||||
props ++= JaasTestUtils.saslConfigs(saslProperties)
|
props ++= JaasTestUtils.saslConfigs(saslProperties.toJava)
|
||||||
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
|
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
|
||||||
props
|
props
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,7 @@ package org.apache.kafka.tools.consumer.group;
|
||||||
|
|
||||||
import kafka.api.AbstractSaslTest;
|
import kafka.api.AbstractSaslTest;
|
||||||
import kafka.api.Both$;
|
import kafka.api.Both$;
|
||||||
import kafka.utils.JaasTestUtils;
|
import kafka.security.JaasTestUtils;
|
||||||
import kafka.zk.ConfigEntityChangeNotificationZNode;
|
import kafka.zk.ConfigEntityChangeNotificationZNode;
|
||||||
|
|
||||||
import org.apache.kafka.clients.admin.Admin;
|
import org.apache.kafka.clients.admin.Admin;
|
||||||
|
@ -91,20 +91,20 @@ public class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
|
||||||
super.configureSecurityBeforeServersStart(testInfo);
|
super.configureSecurityBeforeServersStart(testInfo);
|
||||||
zkClient().makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path());
|
zkClient().makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path());
|
||||||
// Create broker credentials before starting brokers
|
// Create broker credentials before starting brokers
|
||||||
createScramCredentials(zkConnect(), JaasTestUtils.KafkaScramAdmin(), JaasTestUtils.KafkaScramAdminPassword());
|
createScramCredentials(zkConnect(), JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Admin createPrivilegedAdminClient() {
|
public Admin createPrivilegedAdminClient() {
|
||||||
return createAdminClient(bootstrapServers(listenerName()), securityProtocol(), trustStoreFile(), clientSaslProperties(),
|
return createAdminClient(bootstrapServers(listenerName()), securityProtocol(), trustStoreFile(), clientSaslProperties(),
|
||||||
KAFKA_CLIENT_SASL_MECHANISM, JaasTestUtils.KafkaScramAdmin(), JaasTestUtils.KafkaScramAdminPassword());
|
KAFKA_CLIENT_SASL_MECHANISM, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD);
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
@Override
|
@Override
|
||||||
public void setUp(TestInfo testInfo) {
|
public void setUp(TestInfo testInfo) {
|
||||||
startSasl(jaasSections(KAFKA_SERVER_SASL_MECHANISMS, Some$.MODULE$.apply(KAFKA_CLIENT_SASL_MECHANISM), Both$.MODULE$,
|
startSasl(jaasSections(KAFKA_SERVER_SASL_MECHANISMS, Some$.MODULE$.apply(KAFKA_CLIENT_SASL_MECHANISM), Both$.MODULE$,
|
||||||
JaasTestUtils.KafkaServerContextName()));
|
JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME));
|
||||||
super.setUp(testInfo);
|
super.setUp(testInfo);
|
||||||
createTopic(
|
createTopic(
|
||||||
TOPIC,
|
TOPIC,
|
||||||
|
@ -136,7 +136,7 @@ public class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConsumerGroupServiceWithAuthenticationSuccess() throws Exception {
|
public void testConsumerGroupServiceWithAuthenticationSuccess() throws Exception {
|
||||||
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser2(), JaasTestUtils.KafkaScramPassword2());
|
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KAFKA_SCRAM_USER_2, JaasTestUtils.KAFKA_SCRAM_PASSWORD_2);
|
||||||
ConsumerGroupCommand.ConsumerGroupService consumerGroupService = prepareConsumerGroupService();
|
ConsumerGroupCommand.ConsumerGroupService consumerGroupService = prepareConsumerGroupService();
|
||||||
try (Consumer<byte[], byte[]> consumer = createConsumer()) {
|
try (Consumer<byte[], byte[]> consumer = createConsumer()) {
|
||||||
consumer.subscribe(Collections.singletonList(TOPIC));
|
consumer.subscribe(Collections.singletonList(TOPIC));
|
||||||
|
|
Loading…
Reference in New Issue