mirror of https://github.com/apache/kafka.git

commit 21caf6b123 (parent 0971924ebc)
KAFKA-16629 Add broker-related tests to ConfigCommandIntegrationTest (#15840)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
checkstyle import-control:

@@ -130,5 +130,6 @@
     <allow pkg="kafka.test"/>
     <allow pkg="kafka.test.annotation"/>
     <allow pkg="kafka.test.junit"/>
+    <allow pkg="org.apache.kafka.clients.admin" />
   </subpackage>
 </import-control>
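The added allow rule is what permits the reworked integration test below to import org.apache.kafka.clients.admin.Admin; without it, the checkstyle import-control check would reject the new import.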
ConfigCommand.scala (package kafka.admin):

@@ -19,7 +19,7 @@ package kafka.admin

 import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
-import java.util.{Collections, Properties}
+import java.util.{Collections, Optional, Properties}
 import joptsimple._
 import kafka.server.{DynamicBrokerConfig, DynamicConfig, KafkaConfig}
 import kafka.utils.Implicits._
@@ -210,15 +210,19 @@ object ConfigCommand extends Logging {
       }
     }

-  def createPasswordEncoder(encoderConfigs: Map[String, String]): PasswordEncoder = {
-    encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG)
-    val encoderSecret = encoderConfigs.getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG,
-      throw new IllegalArgumentException("Password encoder secret not specified"))
+  def createPasswordEncoder(encoderConfigs: java.util.Map[String, String]): PasswordEncoder = {
+    val encoderSecret = Optional.ofNullable(encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG))
+      .orElseThrow(() => new IllegalArgumentException("Password encoder secret not specified"))
     PasswordEncoder.encrypting(new Password(encoderSecret),
       null,
-      encoderConfigs.getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT),
-      encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG).map(_.toInt).getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT),
-      encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG).map(_.toInt).getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT))
+      encoderConfigs.getOrDefault(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT),
+      Optional.ofNullable(encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG))
+        .map[Int](Integer.parseInt)
+        .orElse(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT),
+      Optional.ofNullable(encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG))
+        .map[Int](Integer.parseInt)
+        .orElse(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT)
+    )
   }

   /**
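The move from a Scala Map to java.util.Map is what lets the Java integration test below call this helper directly, without going through scala JavaConverters. A minimal sketch of the round trip, assuming only classes that appear in this diff (the wrapper class and literal values are illustrative):

import java.util.HashMap;
import java.util.Map;

import kafka.admin.ConfigCommand;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.security.PasswordEncoder;
import org.apache.kafka.security.PasswordEncoderConfigs;

public class PasswordEncoderRoundTrip {
    public static void main(String[] args) throws Exception {
        // Only the secret is mandatory; cipher algorithm, key length and
        // iteration count fall back to the PasswordEncoderConfigs defaults.
        Map<String, String> encoderConfigs = new HashMap<>();
        encoderConfigs.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");

        PasswordEncoder encoder = ConfigCommand.createPasswordEncoder(encoderConfigs);
        String encoded = encoder.encode(new Password("secret"));
        System.out.println("secret".equals(encoder.decode(encoded).value())); // prints true
    }
}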
@@ -244,8 +248,11 @@ object ConfigCommand extends Logging {
         " to override the default encoding parameters. Password encoder configs will not be persisted" +
         " in ZooKeeper."
       )
-      val passwordEncoder = createPasswordEncoder(passwordEncoderConfigs.asScala)
+      val passwordConfigsMap = new java.util.HashMap[String, String]
+      passwordEncoderConfigs.forEach { (key, value) =>
+        passwordConfigsMap.put(key.toString, value.toString)
+      }
+      val passwordEncoder = createPasswordEncoder(passwordConfigsMap)
       passwordConfigs.foreach { configName =>
         val encodedValue = passwordEncoder.encode(new Password(configsToBeAdded.getProperty(configName)))
         configsToBeAdded.setProperty(configName, encodedValue)
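The forEach/toString copy above is the usual bridge from a java.util.Properties, whose entries are typed Object, into a Map<String, String>. The same conversion written in Java, as a sketch with illustrative names:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PropsToMap {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("password.encoder.secret", "encoder-secret");

        // Properties extends Hashtable<Object, Object>, so keys and values
        // must be narrowed to String explicitly.
        Map<String, String> map = new HashMap<>();
        props.forEach((k, v) -> map.put(k.toString(), v.toString()));
        System.out.println(map);
    }
}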
Broker.scala (object Broker):

@@ -43,6 +43,10 @@ object Broker {
     new Broker(id, endPoints, rack, emptySupportedFeatures)
   }

+  def apply(id: Int, endPoint: EndPoint, rack: Option[String]): Broker = {
+    new Broker(id, Seq(endPoint), rack, emptySupportedFeatures)
+  }
+
   private def supportedFeatures(features: java.util.Map[String, VersionRange]): java.util
   .Map[String, SupportedVersionRange] = {
     features.asScala.map { case (name, range) =>
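The new overload exists so that a Java caller can register a broker with a single endpoint without first materializing a scala.collection.Seq; registerBrokerInZk in the test file below is the motivating call site. A sketch under that assumption, mirroring the calls in the diff (values illustrative):

import kafka.cluster.Broker;
import kafka.cluster.EndPoint;
import kafka.zk.BrokerInfo;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.common.MetadataVersion;

public class BrokerApplyExample {
    public static void main(String[] args) {
        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
        EndPoint endpoint = new EndPoint("localhost", 9092,
            ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
        // Before this commit a Java caller had to wrap the endpoint in a Scala Seq.
        BrokerInfo brokerInfo = BrokerInfo.apply(
            Broker.apply(0, endpoint, scala.None$.empty()),
            MetadataVersion.latestTesting(), 9192);
        System.out.println(brokerInfo.broker()); // case-class accessor
    }
}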
ConfigCommandIntegrationTest.java:

@@ -26,23 +26,22 @@ import kafka.test.junit.ZkClusterInvocationContext;
 import kafka.zk.AdminZkClient;
 import kafka.zk.BrokerInfo;
 import kafka.zk.KafkaZkClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.security.PasswordEncoder;
-import org.apache.kafka.security.PasswordEncoderConfigs;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.config.ZooKeeperInternals;
+import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.extension.ExtendWith;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
+import org.junit.platform.commons.util.StringUtils;

 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -50,68 +49,364 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

+import static java.util.Arrays.asList;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG;
+import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG;
+import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG;
+import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG;
+import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;

-@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
 @ExtendWith(value = ClusterTestExtensions.class)
 @Tag("integration")
 public class ConfigCommandIntegrationTest {
-    AdminZkClient adminZkClient;
-    List<String> alterOpts;

+    private List<String> alterOpts;
+    private final String defaultBrokerId = "0";
     private final ClusterInstance cluster;

+    private static Runnable run(Stream<String> command) {
+        return () -> {
+            try {
+                ConfigCommand.main(command.toArray(String[]::new));
+            } catch (RuntimeException e) {
+                // do nothing.
+            } finally {
+                Exit.resetExitProcedure();
+            }
+        };
+    }
+
     public ConfigCommandIntegrationTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }

-    @ClusterTest(types = {Type.ZK, Type.KRAFT})
+    @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT})
     public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() {
         assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
             "--entity-name", cluster.isKRaftTest() ? "0" : "1",
             "--entity-type", "brokers",
             "--alter",
             "--add-config", "security.inter.broker.protocol=PLAINTEXT")),
-            errOut ->
-                assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut));
+            errOut -> assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut));
     }

     @ClusterTest(types = {Type.ZK})
     public void testExitWithNonZeroStatusOnZkCommandAlterUserQuota() {
         assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
             "--entity-type", "users",
             "--entity-name", "admin",
             "--alter", "--add-config", "consumer_byte_rate=20000")),
-            errOut ->
-                assertTrue(errOut.contains("User configuration updates using ZooKeeper are only supported for SCRAM credential updates."), errOut));
+            errOut -> assertTrue(errOut.contains("User configuration updates using ZooKeeper are only supported for SCRAM credential updates."), errOut));
     }

-    public static void assertNonZeroStatusExit(Stream<String> args, Consumer<String> checkErrOut) {
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    public void testNullStatusOnKraftCommandAlterUserQuota() {
+        Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "users",
+            "--entity-name", "admin",
+            "--alter", "--add-config", "consumer_byte_rate=20000"));
+        String message = captureStandardMsg(run(command));
+
+        assertTrue(StringUtils.isBlank(message), message);
+    }
+
+    @ClusterTest(types = Type.ZK)
+    public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception {
+        cluster.shutdownBroker(0);
+        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
+        KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
+
+        String brokerId = "1";
+        AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
+        alterOpts = asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter");
+
+        // Add config
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+            singletonMap("message.max.bytes", "110000"));
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
+            singletonMap("message.max.bytes", "120000"));
+
+        // Change config
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+            singletonMap("message.max.bytes", "130000"));
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
+            singletonMap("message.max.bytes", "140000"));
+
+        // Delete config
+        deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+            singleton("message.max.bytes"));
+        deleteAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
+            singleton("message.max.bytes"));
+
+        // Listener configs: should work only with listener name
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+            singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"));
+        assertThrows(ConfigException.class,
+            () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId),
+                singletonMap("ssl.keystore.location", "/tmp/test.jks")));
+
+        // Per-broker config configured at default cluster-level should fail
+        assertThrows(ConfigException.class,
+            () -> alterConfigWithZk(zkClient, adminZkClient, Optional.empty(),
+                singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks")));
+        deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+            singleton("listener.name.internal.ssl.keystore.location"));
+
+        // Password config update without encoder secret should fail
+        assertThrows(IllegalArgumentException.class,
+            () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId),
+                singletonMap("listener.name.external.ssl.keystore.password", "secret")));
+
+        // Password config update with encoder secret should succeed and encoded password must be stored in ZK
+        Map<String, String> configs = new HashMap<>();
+        configs.put("listener.name.external.ssl.keystore.password", "secret");
+        configs.put("log.cleaner.threads", "2");
+        Map<String, String> encoderConfigs = new HashMap<>(configs);
+        encoderConfigs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
+        alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), encoderConfigs);
+        Properties brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId);
+        assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper");
+        assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); // not encoded
+        String encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password");
+        PasswordEncoder passwordEncoder = ConfigCommand.createPasswordEncoder(encoderConfigs);
+        assertEquals("secret", passwordEncoder.decode(encodedPassword).value());
+        assertEquals(configs.size(), brokerConfigs.size());
+
+        // Password config update with overrides for encoder parameters
+        Map<String, String> encoderConfigs2 = generateEncodeConfig();
+        alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), encoderConfigs2);
+        Properties brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId);
+        String encodedPassword2 = brokerConfigs2.getProperty("listener.name.external.ssl.keystore.password");
+        assertEquals("secret2", ConfigCommand.createPasswordEncoder(encoderConfigs)
+            .decode(encodedPassword2).value());
+        assertEquals("secret2", ConfigCommand.createPasswordEncoder(encoderConfigs2)
+            .decode(encodedPassword2).value());
+
+        // Password config update at default cluster-level should fail
+        assertThrows(ConfigException.class,
+            () -> alterConfigWithZk(zkClient, adminZkClient, Optional.empty(), encoderConfigs));
+
+        // Dynamic config updates using ZK should fail if broker is running.
+        registerBrokerInZk(zkClient, Integer.parseInt(brokerId));
+        assertThrows(IllegalArgumentException.class,
+            () -> alterConfigWithZk(zkClient, adminZkClient,
+                Optional.of(brokerId), singletonMap("message.max.bytes", "210000")));
+        assertThrows(IllegalArgumentException.class,
+            () -> alterConfigWithZk(zkClient, adminZkClient,
+                Optional.empty(), singletonMap("message.max.bytes", "220000")));
+
+        // Dynamic config updates using ZK for a different broker that is not running should succeed
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of("2"), singletonMap("message.max.bytes", "230000"));
+    }
+
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    public void testDynamicBrokerConfigUpdateUsingKraft() throws Exception {
+        alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+
+        try (Admin client = cluster.createAdminClient()) {
+            // Add config
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId), singletonMap("message.max.bytes", "110000"));
+            alterAndVerifyConfig(client, Optional.empty(), singletonMap("message.max.bytes", "120000"));
+
+            // Change config
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId), singletonMap("message.max.bytes", "130000"));
+            alterAndVerifyConfig(client, Optional.empty(), singletonMap("message.max.bytes", "140000"));
+
+            // Delete config
+            deleteAndVerifyConfig(client, Optional.of(defaultBrokerId), singleton("message.max.bytes"));
+
+            // Listener configs: should work only with listener name
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"));
+            alterConfigWithKraft(client, Optional.empty(),
+                singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"));
+            deleteAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                singleton("listener.name.internal.ssl.keystore.location"));
+            alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                singletonMap("listener.name.external.ssl.keystore.password", "secret"));
+
+            // Password config update with encoder secret should succeed and encoded password must be stored in ZK
+            Map<String, String> configs = new HashMap<>();
+            configs.put("listener.name.external.ssl.keystore.password", "secret");
+            configs.put("log.cleaner.threads", "2");
+            // Password encoder configs
+            configs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
+
+            // Password config update at default cluster-level should fail
+            assertThrows(ExecutionException.class,
+                () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), configs));
+        }
+    }
+
+    @ClusterTest(types = {Type.ZK})
+    public void testAlterReadOnlyConfigInZookeeperThenShouldFail() {
+        cluster.shutdownBroker(0);
+        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
+        KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
+        AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
+        alterOpts = generateDefaultAlterOpts(zkConnect);
+
+        assertThrows(ConfigException.class,
+            () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId),
+                singletonMap("auto.create.topics.enable", "false")));
+        assertThrows(ConfigException.class,
+            () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId),
+                singletonMap("auto.leader.rebalance.enable", "false")));
+        assertThrows(ConfigException.class,
+            () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId),
+                singletonMap("broker.id", "1")));
+    }
+
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    public void testAlterReadOnlyConfigInKRaftThenShouldFail() {
+        alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+
+        try (Admin client = cluster.createAdminClient()) {
+            assertThrows(ExecutionException.class,
+                () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    singletonMap("auto.create.topics.enable", "false")));
+            assertThrows(ExecutionException.class,
+                () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    singletonMap("auto.leader.rebalance.enable", "false")));
+            assertThrows(ExecutionException.class,
+                () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    singletonMap("broker.id", "1")));
+        }
+    }
+
+    @ClusterTest(types = {Type.ZK})
+    public void testUpdateClusterWideConfigInZookeeperThenShouldSuccessful() {
+        cluster.shutdownBroker(0);
+        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
+        KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
+        AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
+        alterOpts = generateDefaultAlterOpts(zkConnect);
+
+        Map<String, String> configs = new HashMap<>();
+        configs.put("log.flush.interval.messages", "100");
+        configs.put("log.retention.bytes", "20");
+        configs.put("log.retention.ms", "2");
+
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId), configs);
+    }
+
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    public void testUpdateClusterWideConfigInKRaftThenShouldSuccessful() throws Exception {
+        alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+
+        try (Admin client = cluster.createAdminClient()) {
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                singletonMap("log.flush.interval.messages", "100"));
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                singletonMap("log.retention.bytes", "20"));
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                singletonMap("log.retention.ms", "2"));
+        }
+    }
+
+    @ClusterTest(types = {Type.ZK})
+    public void testUpdatePerBrokerConfigWithListenerNameInZookeeperThenShouldSuccessful() {
+        cluster.shutdownBroker(0);
+        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
+        KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
+        AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
+        alterOpts = generateDefaultAlterOpts(zkConnect);
+
+        String listenerName = "listener.name.internal.";
+        String sslTruststoreType = listenerName + "ssl.truststore.type";
+        String sslTruststoreLocation = listenerName + "ssl.truststore.location";
+        String sslTruststorePassword = listenerName + "ssl.truststore.password";
+
+        Map<String, String> configs = new HashMap<>();
+        configs.put(sslTruststoreType, "PKCS12");
+        configs.put(sslTruststoreLocation, "/temp/test.jks");
+        configs.put("password.encoder.secret", "encoder-secret");
+        configs.put(sslTruststorePassword, "password");
+
+        alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId), configs);
+
+        Properties properties = zkClient.getEntityConfigs("brokers", defaultBrokerId);
+        assertTrue(properties.containsKey(sslTruststorePassword));
+        assertEquals(configs.get(sslTruststoreType), properties.getProperty(sslTruststoreType));
+        assertEquals(configs.get(sslTruststoreLocation), properties.getProperty(sslTruststoreLocation));
+    }
+
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    public void testUpdatePerBrokerConfigWithListenerNameInKRaftThenShouldSuccessful() throws Exception {
+        alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+        String listenerName = "listener.name.internal.";
+
+        try (Admin client = cluster.createAdminClient()) {
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                singletonMap(listenerName + "ssl.truststore.type", "PKCS12"));
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                singletonMap(listenerName + "ssl.truststore.location", "/temp/test.jks"));
+
+            alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                singletonMap(listenerName + "ssl.truststore.password", "password"));
+            verifyConfigDefaultValue(client, Optional.of(defaultBrokerId),
+                singleton(listenerName + "ssl.truststore.password"));
+        }
+    }
+
+    @ClusterTest(types = {Type.ZK})
+    public void testUpdatePerBrokerConfigInZookeeperThenShouldFail() {
+        cluster.shutdownBroker(0);
+        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
+        KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
+        AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
+        alterOpts = generateDefaultAlterOpts(zkConnect);
+
+        assertThrows(ConfigException.class, () ->
+            alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId),
+                singletonMap("ssl.truststore.type", "PKCS12")));
+        assertThrows(ConfigException.class, () ->
+            alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId),
+                singletonMap("ssl.truststore.location", "/temp/test.jks")));
+        assertThrows(ConfigException.class, () ->
+            alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId),
+                singletonMap("ssl.truststore.password", "password")));
+    }
+
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    public void testUpdatePerBrokerConfigInKRaftThenShouldFail() {
+        alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+
+        try (Admin client = cluster.createAdminClient()) {
+            assertThrows(ExecutionException.class,
+                () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    singletonMap("ssl.truststore.type", "PKCS12")));
+            assertThrows(ExecutionException.class,
+                () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    singletonMap("ssl.truststore.location", "/temp/test.jks")));
+            assertThrows(ExecutionException.class,
+                () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    singletonMap("ssl.truststore.password", "password")));
+        }
+    }
+
+    private void assertNonZeroStatusExit(Stream<String> args, Consumer<String> checkErrOut) {
         AtomicReference<Integer> exitStatus = new AtomicReference<>();
         Exit.setExitProcedure((status, __) -> {
             exitStatus.set(status);
             throw new RuntimeException();
         });

-        String errOut = captureStandardErr(() -> {
-            try {
-                ConfigCommand.main(args.toArray(String[]::new));
-            } catch (RuntimeException e) {
-                // do nothing.
-            } finally {
-                Exit.resetExitProcedure();
-            }
-        });
+        String errOut = captureStandardMsg(run(args));

         checkErrOut.accept(errOut);
         assertNotNull(exitStatus.get());
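The option lists these tests assemble are the same argument vector that the kafka-configs.sh wrapper ultimately hands to ConfigCommand.main. As a sketch, the first "Add config" step of the KRaft test corresponds roughly to the following (the bootstrap address is illustrative, and the real test additionally waits for the value to propagate):

import kafka.admin.ConfigCommand;

public class AddBrokerConfigExample {
    public static void main(String[] ignored) {
        // alterOpts + entityOp(Optional.of("0")) + "--add-config", flattened:
        String[] args = {
            "--bootstrap-server", "localhost:9092",
            "--entity-type", "brokers",
            "--entity-name", "0",
            "--alter", "--add-config", "message.max.bytes=110000"
        };
        ConfigCommand.main(args);
    }
}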
@@ -120,164 +415,146 @@ public class ConfigCommandIntegrationTest {

     private Stream<String> quorumArgs() {
         return cluster.isKRaftTest()
             ? Stream.of("--bootstrap-server", cluster.bootstrapServers())
             : Stream.of("--zookeeper", ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect());
     }

-    public List<String> entityOp(Optional<String> brokerId) {
-        return brokerId.map(id -> Arrays.asList("--entity-name", id)).orElse(Collections.singletonList("--entity-default"));
+    private void verifyConfig(KafkaZkClient zkClient, Optional<String> brokerId, Map<String, String> config) {
+        Properties entityConfigs = zkClient.getEntityConfigs("brokers",
+            brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING));
+        assertEquals(config, entityConfigs);
     }

-    public void alterConfigWithZk(KafkaZkClient zkClient, Map<String, String> configs, Optional<String> brokerId) throws Exception {
-        alterConfigWithZk(zkClient, configs, brokerId, Collections.emptyMap());
+    private void alterAndVerifyConfig(KafkaZkClient zkClient, AdminZkClient adminZkClient,
+                                      Optional<String> brokerId, Map<String, String> configs) {
+        alterConfigWithZk(zkClient, adminZkClient, brokerId, configs);
+        verifyConfig(zkClient, brokerId, configs);
     }

-    public void alterConfigWithZk(KafkaZkClient zkClient, Map<String, String> configs, Optional<String> brokerId, Map<String, String> encoderConfigs) {
-        String configStr = Stream.of(configs.entrySet(), encoderConfigs.entrySet())
-            .flatMap(Set::stream)
-            .map(e -> e.getKey() + "=" + e.getValue())
-            .collect(Collectors.joining(","));
-        ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), Arrays.asList("--add-config", configStr)));
+    private void alterConfigWithZk(KafkaZkClient zkClient, AdminZkClient adminZkClient,
+                                   Optional<String> brokerId, Map<String, String> config) {
+        String configStr = transferConfigMapToString(config);
+        ConfigCommand.ConfigCommandOptions addOpts =
+            new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), asList("--add-config", configStr)));
         ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient);
     }

-    void verifyConfig(KafkaZkClient zkClient, Map<String, String> configs, Optional<String> brokerId) {
-        Properties entityConfigs = zkClient.getEntityConfigs("brokers", brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING));
-        assertEquals(configs, entityConfigs);
+    private List<String> entityOp(Optional<String> brokerId) {
+        return brokerId.map(id -> asList("--entity-name", id))
+            .orElse(singletonList("--entity-default"));
     }

-    void alterAndVerifyConfig(KafkaZkClient zkClient, Map<String, String> configs, Optional<String> brokerId) throws Exception {
-        alterConfigWithZk(zkClient, configs, brokerId);
-        verifyConfig(zkClient, configs, brokerId);
-    }
-
-    void deleteAndVerifyConfig(KafkaZkClient zkClient, Set<String> configNames, Optional<String> brokerId) {
-        ConfigCommand.ConfigCommandOptions deleteOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), Arrays.asList("--delete-config", String.join(",", configNames))));
+    private void deleteAndVerifyConfig(KafkaZkClient zkClient, AdminZkClient adminZkClient,
+                                       Optional<String> brokerId, Set<String> configNames) {
+        ConfigCommand.ConfigCommandOptions deleteOpts =
+            new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId),
+                asList("--delete-config", String.join(",", configNames))));
         ConfigCommand.alterConfigWithZk(zkClient, deleteOpts, adminZkClient);
-        verifyConfig(zkClient, Collections.emptyMap(), brokerId);
+        verifyConfig(zkClient, brokerId, Collections.emptyMap());
     }

-    @ClusterTest(types = {Type.ZK})
-    public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception {
-        cluster.shutdownBroker(0);
-        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
-        KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
-
-        String brokerId = "1";
-        adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
-        alterOpts = Arrays.asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter");
-
-        // Add config
-        alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "110000"), Optional.of(brokerId));
-        alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "120000"), Optional.empty());
-
-        // Change config
-        alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "130000"), Optional.of(brokerId));
-        alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "140000"), Optional.empty());
-
-        // Delete config
-        deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.of(brokerId));
-        deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.empty());
-
-        // Listener configs: should work only with listener name
-        alterAndVerifyConfig(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId));
-        assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, Collections.singletonMap("ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId)));
-
-        // Per-broker config configured at default cluster-level should fail
-        assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.empty()));
-        deleteAndVerifyConfig(zkClient, Collections.singleton("listener.name.external.ssl.keystore.location"), Optional.of(brokerId));
-
-        // Password config update without encoder secret should fail
-        assertThrows(IllegalArgumentException.class,
-            () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.password", "secret"), Optional.of(brokerId)));
-
-        // Password config update with encoder secret should succeed and encoded password must be stored in ZK
-        Map<String, String> configs = new HashMap<>();
-        configs.put("listener.name.external.ssl.keystore.password", "secret");
-        configs.put("log.cleaner.threads", "2");
-        Map<String, String> encoderConfigs = Collections.singletonMap(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
-        alterConfigWithZk(zkClient, configs, Optional.of(brokerId), encoderConfigs);
-        Properties brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId);
-        assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper");
-        assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); // not encoded
-        String encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password");
-        PasswordEncoder passwordEncoder = ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs));
-        assertEquals("secret", passwordEncoder.decode(encodedPassword).value());
-        assertEquals(configs.size(), brokerConfigs.size());
-
-        // Password config update with overrides for encoder parameters
-        Map<String, String> configs2 = Collections.singletonMap("listener.name.internal.ssl.keystore.password", "secret2");
-        Map<String, String> encoderConfigs2 = new HashMap<>();
-        encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
-        encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, "DES/CBC/PKCS5Padding");
-        encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, "1024");
-        encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, "PBKDF2WithHmacSHA1");
-        encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, "64");
-        alterConfigWithZk(zkClient, configs2, Optional.of(brokerId), encoderConfigs2);
-        Properties brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId);
-        String encodedPassword2 = brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password");
-        assertEquals("secret2", ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)).decode(encodedPassword2).value());
-        assertEquals("secret2", ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs2)).decode(encodedPassword2).value());
-
-        // Password config update at default cluster-level should fail
-        assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient, configs, Optional.empty(), encoderConfigs));
-
-        // Dynamic config updates using ZK should fail if broker is running.
-        registerBrokerInZk(zkClient, Integer.parseInt(brokerId));
-        assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", "210000"), Optional.of(brokerId)));
-        assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", "220000"), Optional.empty()));
-
-        // Dynamic config updates using ZK should for a different broker that is not running should succeed
-        alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "230000"), Optional.of("2"));
+    private Map<String, String> generateEncodeConfig() {
+        Map<String, String> map = new HashMap<>();
+        map.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
+        map.put(PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, "DES/CBC/PKCS5Padding");
+        map.put(PASSWORD_ENCODER_ITERATIONS_CONFIG, "1024");
+        map.put(PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, "PBKDF2WithHmacSHA1");
+        map.put(PASSWORD_ENCODER_KEY_LENGTH_CONFIG, "64");
+        map.put("listener.name.external.ssl.keystore.password", "secret2");
+        return map;
     }

     private void registerBrokerInZk(KafkaZkClient zkClient, int id) {
         zkClient.createTopLevelPaths();
         SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
-        EndPoint endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
-        BrokerInfo brokerInfo = BrokerInfo.apply(Broker.apply(id, seq(endpoint), scala.None$.empty()), MetadataVersion.latestTesting(), 9192);
+        EndPoint endpoint = new EndPoint("localhost", 9092,
+            ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
+        BrokerInfo brokerInfo = BrokerInfo.apply(Broker.apply(id, endpoint,
+            scala.None$.empty()), MetadataVersion.latestTesting(), 9192);
         zkClient.registerBroker(brokerInfo);
     }

-    @SafeVarargs
-    static <T> Seq<T> seq(T...seq) {
-        return seq(Arrays.asList(seq));
+    private List<String> generateDefaultAlterOpts(String bootstrapServers) {
+        return asList("--bootstrap-server", bootstrapServers,
+            "--entity-type", "brokers",
+            "--entity-name", "0", "--alter");
     }

-    @SuppressWarnings({"deprecation"})
-    static <T> Seq<T> seq(Collection<T> seq) {
-        return JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq();
+    private void alterAndVerifyConfig(Admin client, Optional<String> brokerId, Map<String, String> config) throws Exception {
+        alterConfigWithKraft(client, brokerId, config);
+        verifyConfig(client, brokerId, config);
+    }
+
+    private void alterConfigWithKraft(Admin client, Optional<String> brokerId, Map<String, String> config) {
+        String configStr = transferConfigMapToString(config);
+        ConfigCommand.ConfigCommandOptions addOpts =
+            new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), asList("--add-config", configStr)));
+        ConfigCommand.alterConfig(client, addOpts);
+    }
+
+    private void verifyConfig(Admin client, Optional<String> brokerId, Map<String, String> config) throws Exception {
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(defaultBrokerId));
+        TestUtils.waitForCondition(() -> {
+            Map<String, String> current = client.describeConfigs(singletonList(configResource))
+                .all()
+                .get()
+                .values()
+                .stream()
+                .flatMap(e -> e.entries().stream())
+                .collect(HashMap::new, (map, entry) -> map.put(entry.name(), entry.value()), HashMap::putAll);
+            return config.entrySet().stream().allMatch(e -> e.getValue().equals(current.get(e.getKey())));
+        }, 10000, config + " are not updated");
+    }
+
+    private void deleteAndVerifyConfig(Admin client, Optional<String> brokerId, Set<String> config) throws Exception {
+        ConfigCommand.ConfigCommandOptions deleteOpts =
+            new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId),
+                asList("--delete-config", String.join(",", config))));
+        ConfigCommand.alterConfig(client, deleteOpts);
+        verifyConfigDefaultValue(client, brokerId, config);
+    }
+
+    private void verifyConfigDefaultValue(Admin client, Optional<String> brokerId, Set<String> config) throws Exception {
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(defaultBrokerId));
+        TestUtils.waitForCondition(() -> {
+            Map<String, String> current = client.describeConfigs(singletonList(configResource))
+                .all()
+                .get()
+                .values()
+                .stream()
+                .flatMap(e -> e.entries().stream())
+                .collect(HashMap::new, (map, entry) -> map.put(entry.name(), entry.value()), HashMap::putAll);
+            return config.stream().allMatch(current::containsKey);
+        }, 5000, config + " are not updated");
     }

     @SafeVarargs
-    public static String[] toArray(List<String>... lists) {
+    private static String[] toArray(List<String>... lists) {
         return Stream.of(lists).flatMap(List::stream).toArray(String[]::new);
     }

-    public static String captureStandardErr(Runnable runnable) {
-        return captureStandardStream(true, runnable);
+    private String captureStandardMsg(Runnable runnable) {
+        return captureStandardStream(runnable);
     }

-    private static String captureStandardStream(boolean isErr, Runnable runnable) {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        PrintStream currentStream = isErr ? System.err : System.out;
-        PrintStream tempStream = new PrintStream(outputStream);
-        if (isErr)
-            System.setErr(tempStream);
-        else
-            System.setOut(tempStream);
-        try {
-            runnable.run();
-            return outputStream.toString().trim();
-        } finally {
-            if (isErr)
-                System.setErr(currentStream);
-            else
-                System.setOut(currentStream);
-
-            tempStream.close();
+    private String transferConfigMapToString(Map<String, String> configs) {
+        return configs.entrySet()
+            .stream()
+            .map(e -> e.getKey() + "=" + e.getValue())
+            .collect(Collectors.joining(","));
+    }
+
+    private String captureStandardStream(Runnable runnable) {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        PrintStream currentStream = System.err;
+        try (PrintStream tempStream = new PrintStream(outputStream)) {
+            System.setErr(tempStream);
+            try {
+                runnable.run();
+                return outputStream.toString().trim();
+            } finally {
+                System.setErr(currentStream);
+            }
         }
     }
 }
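Two simplifications are worth noting in the rewritten helpers above: captureStandardStream loses its boolean isErr flag because every remaining caller only ever captured stderr, and the temporary PrintStream is now closed by try-with-resources instead of a hand-written finally block.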
ConfigCommandTest.java:

@@ -421,8 +421,8 @@ public class ConfigCommandTest {
     public void testExpectedEntityTypeNames(List<String> expectedTypes, List<String> expectedNames, List<String> connectOpts, String...args) {
         ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray(Arrays.asList(connectOpts.get(0), connectOpts.get(1), "--describe"), Arrays.asList(args)));
         createOpts.checkArgs();
-        assertEquals(createOpts.entityTypes().toSeq(), ConfigCommandIntegrationTest.seq(expectedTypes));
-        assertEquals(createOpts.entityNames().toSeq(), ConfigCommandIntegrationTest.seq(expectedNames));
+        assertEquals(createOpts.entityTypes().toSeq(), seq(expectedTypes));
+        assertEquals(createOpts.entityNames().toSeq(), seq(expectedNames));
     }

     public void doTestOptionEntityTypeNames(boolean zkConfig) {

@@ -1710,7 +1710,7 @@ public class ConfigCommandTest {
     public void checkEntities(List<String> opts, Map<String, List<String>> expectedFetches, List<String> expectedEntityNames) {
         ConfigCommand.ConfigEntity entity = ConfigCommand.parseEntity(new ConfigCommand.ConfigCommandOptions(toArray(opts, Collections.singletonList("--describe"))));
         expectedFetches.forEach((name, values) ->
-            when(zkClient.getAllEntitiesWithConfig(name)).thenReturn(ConfigCommandIntegrationTest.seq(values)));
+            when(zkClient.getAllEntitiesWithConfig(name)).thenReturn(seq(values)));
         Seq<ConfigCommand.ConfigEntity> entities0 = entity.getAllEntities(zkClient);
         List<ConfigCommand.ConfigEntity> entities = new ArrayList<>();
         entities0.foreach(e -> {

@@ -1996,4 +1996,9 @@ public class ConfigCommandTest {
             return mock(AlterClientQuotasResult.class);
         }
     }
+
+    @SuppressWarnings({"deprecation"})
+    private <T> Seq<T> seq(Collection<T> seq) {
+        return JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq();
+    }
 }
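With the integration test no longer touching Scala collections, the seq(...) bridge (and the deprecation suppression it needs for JavaConverters) becomes a private helper of ConfigCommandTest, which is also why the class-level @SuppressWarnings("deprecation") could be dropped from ConfigCommandIntegrationTest above.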