mirror of https://github.com/apache/kafka.git
KAFKA-16181: Use incrementalAlterConfigs when updating broker configs by kafka-configs.sh (#15304)
This PR implement KIP-1011, kafka-configs.sh now uses incrementalAlterConfigs API to alter broker configurations instead of the deprecated alterConfigs API, and it will fall directly if the broker doesn't support incrementalAlterConfigs. Reviewers: David Jacot <djacot@confluent.io>, OmniaGM <o.g.h.ibrahim@gmail.com>.
This commit is contained in:
parent
7ca02fd908
commit
615f1a0bf9
|
@ -18,22 +18,24 @@
|
||||||
package kafka.admin
|
package kafka.admin
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets
|
import java.nio.charset.StandardCharsets
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.{ExecutionException, TimeUnit}
|
||||||
import java.util.{Collections, Properties}
|
import java.util.{Collections, Properties}
|
||||||
import joptsimple._
|
import joptsimple._
|
||||||
import kafka.server.DynamicConfig
|
import kafka.server.DynamicConfig
|
||||||
import kafka.utils.Implicits._
|
import kafka.utils.Implicits._
|
||||||
import kafka.utils.Logging
|
import kafka.utils.Logging
|
||||||
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism}
|
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
|
||||||
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
|
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
|
||||||
import org.apache.kafka.common.errors.InvalidConfigurationException
|
import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException}
|
||||||
import org.apache.kafka.common.internals.Topic
|
import org.apache.kafka.common.internals.Topic
|
||||||
|
import org.apache.kafka.common.protocol.ApiKeys
|
||||||
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
|
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
|
||||||
import org.apache.kafka.common.security.scram.internals.ScramMechanism
|
import org.apache.kafka.common.security.scram.internals.ScramMechanism
|
||||||
import org.apache.kafka.common.utils.{Exit, Utils}
|
import org.apache.kafka.common.utils.{Exit, Utils}
|
||||||
import org.apache.kafka.server.config.{ConfigType, QuotaConfig}
|
import org.apache.kafka.server.config.{ConfigType, QuotaConfig}
|
||||||
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
|
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
|
||||||
import org.apache.kafka.storage.internals.log.LogConfig
|
import org.apache.kafka.storage.internals.log.LogConfig
|
||||||
|
|
||||||
import scala.annotation.nowarn
|
import scala.annotation.nowarn
|
||||||
import scala.jdk.CollectionConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
import scala.collection._
|
import scala.collection._
|
||||||
|
@ -80,6 +82,11 @@ object ConfigCommand extends Logging {
|
||||||
System.err.println(e.getMessage)
|
System.err.println(e.getMessage)
|
||||||
Exit.exit(1)
|
Exit.exit(1)
|
||||||
|
|
||||||
|
case e: UnsupportedVersionException =>
|
||||||
|
logger.debug(s"Unsupported API encountered in server when executing config command with args '${args.mkString(" ")}'")
|
||||||
|
System.err.println(e.getMessage)
|
||||||
|
Exit.exit(1)
|
||||||
|
|
||||||
case t: Throwable =>
|
case t: Throwable =>
|
||||||
logger.debug(s"Error while executing config command with args '${args.mkString(" ")}'", t)
|
logger.debug(s"Error while executing config command with args '${args.mkString(" ")}'", t)
|
||||||
System.err.println(s"Error while executing config command with args '${args.mkString(" ")}'")
|
System.err.println(s"Error while executing config command with args '${args.mkString(" ")}'")
|
||||||
|
@ -161,7 +168,6 @@ object ConfigCommand extends Logging {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@nowarn("cat=deprecation")
|
|
||||||
def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
|
def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
|
||||||
val entityTypes = opts.entityTypes
|
val entityTypes = opts.entityTypes
|
||||||
val entityNames = opts.entityNames
|
val entityNames = opts.entityNames
|
||||||
|
@ -172,27 +178,25 @@ object ConfigCommand extends Logging {
|
||||||
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
|
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
|
||||||
|
|
||||||
entityTypeHead match {
|
entityTypeHead match {
|
||||||
case ConfigType.TOPIC =>
|
case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER | ConfigType.GROUP =>
|
||||||
alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, ConfigResource.Type.TOPIC)
|
val configResourceType = entityTypeHead match {
|
||||||
|
case ConfigType.TOPIC => ConfigResource.Type.TOPIC
|
||||||
case ConfigType.BROKER =>
|
case ConfigType.CLIENT_METRICS => ConfigResource.Type.CLIENT_METRICS
|
||||||
val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
|
case ConfigType.BROKER => ConfigResource.Type.BROKER
|
||||||
.map { entry => (entry.name, entry) }.toMap
|
case ConfigType.GROUP => ConfigResource.Type.GROUP
|
||||||
|
}
|
||||||
// fail the command if any of the configs to be deleted does not exist
|
try {
|
||||||
val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
|
alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, configResourceType)
|
||||||
if (invalidConfigs.nonEmpty)
|
} catch {
|
||||||
throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
|
case e: ExecutionException =>
|
||||||
|
e.getCause match {
|
||||||
val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
|
case _: UnsupportedVersionException =>
|
||||||
val sensitiveEntries = newEntries.filter(_._2.value == null)
|
throw new UnsupportedVersionException(s"The ${ApiKeys.INCREMENTAL_ALTER_CONFIGS} API is not supported by the cluster. The API is supported starting from version 2.3.0."
|
||||||
if (sensitiveEntries.nonEmpty)
|
+ " You may want to use an older version of this tool to interact with your cluster, or upgrade your brokers to version 2.3.0 or newer to avoid this error.")
|
||||||
throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}")
|
case _ => throw e
|
||||||
val newConfig = new JConfig(newEntries.asJava.values)
|
}
|
||||||
|
case e: Throwable => throw e
|
||||||
val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityNameHead)
|
}
|
||||||
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
|
|
||||||
adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
|
|
||||||
|
|
||||||
case BrokerLoggerConfigType =>
|
case BrokerLoggerConfigType =>
|
||||||
val validLoggers = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = true, describeAll = false).map(_.name)
|
val validLoggers = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = true, describeAll = false).map(_.name)
|
||||||
|
@ -203,10 +207,10 @@ object ConfigCommand extends Logging {
|
||||||
|
|
||||||
val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityNameHead)
|
val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityNameHead)
|
||||||
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
|
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
|
||||||
val alterLogLevelEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
|
val addEntries = configsToBeAdded.values.map(k => new AlterConfigOp(k, AlterConfigOp.OpType.SET))
|
||||||
++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
|
val deleteEntries = configsToBeDeleted.map(k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE))
|
||||||
).asJavaCollection
|
val alterEntries = (deleteEntries ++ addEntries).asJavaCollection
|
||||||
adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
|
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
|
||||||
|
|
||||||
case ConfigType.USER | ConfigType.CLIENT =>
|
case ConfigType.USER | ConfigType.CLIENT =>
|
||||||
val hasQuotaConfigsToAdd = configsToBeAdded.keys.exists(QuotaConfig.isClientOrUserQuotaConfig)
|
val hasQuotaConfigsToAdd = configsToBeAdded.keys.exists(QuotaConfig.isClientOrUserQuotaConfig)
|
||||||
|
@ -250,13 +254,8 @@ object ConfigCommand extends Logging {
|
||||||
throw new IllegalArgumentException(s"Only connection quota configs can be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config names: ${unknownConfigs.mkString(",")}")
|
throw new IllegalArgumentException(s"Only connection quota configs can be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config names: ${unknownConfigs.mkString(",")}")
|
||||||
alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted)
|
alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted)
|
||||||
|
|
||||||
case ConfigType.CLIENT_METRICS =>
|
case _ =>
|
||||||
alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, ConfigResource.Type.CLIENT_METRICS)
|
throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
|
||||||
|
|
||||||
case ConfigType.GROUP =>
|
|
||||||
alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, ConfigResource.Type.GROUP)
|
|
||||||
|
|
||||||
case _ => throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (entityNameHead.nonEmpty)
|
if (entityNameHead.nonEmpty)
|
||||||
|
@ -380,9 +379,9 @@ object ConfigCommand extends Logging {
|
||||||
|
|
||||||
val configResource = new ConfigResource(resourceType, entityNameHead)
|
val configResource = new ConfigResource(resourceType, entityNameHead)
|
||||||
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
|
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
|
||||||
val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
|
val addEntries = configsToBeAdded.values.map(k => new AlterConfigOp(k, AlterConfigOp.OpType.SET))
|
||||||
++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
|
val deleteEntries = configsToBeDeleted.map(k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE))
|
||||||
).asJavaCollection
|
val alterEntries = (deleteEntries ++ addEntries).asJavaCollection
|
||||||
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
|
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,16 +17,25 @@
|
||||||
package kafka.admin;
|
package kafka.admin;
|
||||||
|
|
||||||
import org.apache.kafka.clients.admin.Admin;
|
import org.apache.kafka.clients.admin.Admin;
|
||||||
|
import org.apache.kafka.clients.admin.AdminClientTestUtils;
|
||||||
|
import org.apache.kafka.clients.admin.AlterConfigsOptions;
|
||||||
|
import org.apache.kafka.clients.admin.AlterConfigsResult;
|
||||||
import org.apache.kafka.clients.admin.ConfigEntry;
|
import org.apache.kafka.clients.admin.ConfigEntry;
|
||||||
|
import org.apache.kafka.clients.admin.NewTopic;
|
||||||
import org.apache.kafka.common.config.ConfigResource;
|
import org.apache.kafka.common.config.ConfigResource;
|
||||||
|
import org.apache.kafka.common.errors.InvalidConfigurationException;
|
||||||
|
import org.apache.kafka.common.errors.UnsupportedVersionException;
|
||||||
|
import org.apache.kafka.common.test.api.ClusterConfigProperty;
|
||||||
import org.apache.kafka.common.test.api.ClusterInstance;
|
import org.apache.kafka.common.test.api.ClusterInstance;
|
||||||
import org.apache.kafka.common.test.api.ClusterTest;
|
import org.apache.kafka.common.test.api.ClusterTest;
|
||||||
import org.apache.kafka.common.test.api.ClusterTestExtensions;
|
import org.apache.kafka.common.test.api.ClusterTestExtensions;
|
||||||
import org.apache.kafka.common.test.api.Type;
|
import org.apache.kafka.common.test.api.Type;
|
||||||
import org.apache.kafka.common.utils.Exit;
|
import org.apache.kafka.common.utils.Exit;
|
||||||
|
import org.apache.kafka.server.common.MetadataVersion;
|
||||||
import org.apache.kafka.test.TestUtils;
|
import org.apache.kafka.test.TestUtils;
|
||||||
|
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
|
@ -57,9 +66,11 @@ import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBA
|
||||||
import static org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG;
|
import static org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG;
|
||||||
import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
|
import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
|
||||||
@ExtendWith(value = ClusterTestExtensions.class)
|
@ExtendWith(value = ClusterTestExtensions.class)
|
||||||
public class ConfigCommandIntegrationTest {
|
public class ConfigCommandIntegrationTest {
|
||||||
|
@ -159,11 +170,11 @@ public class ConfigCommandIntegrationTest {
|
||||||
singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"), alterOpts);
|
singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"), alterOpts);
|
||||||
// Per-broker config configured at default cluster-level should fail
|
// Per-broker config configured at default cluster-level should fail
|
||||||
assertThrows(ExecutionException.class,
|
assertThrows(ExecutionException.class,
|
||||||
() -> alterConfigWithKraft(client, Optional.empty(),
|
() -> alterConfigWithAdmin(client, Optional.empty(),
|
||||||
singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"), alterOpts));
|
singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"), alterOpts));
|
||||||
deleteAndVerifyConfigValue(client, defaultBrokerId,
|
deleteAndVerifyConfigValue(client, defaultBrokerId,
|
||||||
singleton("listener.name.internal.ssl.keystore.location"), false, alterOpts);
|
singleton("listener.name.internal.ssl.keystore.location"), false, alterOpts);
|
||||||
alterConfigWithKraft(client, Optional.of(defaultBrokerId),
|
alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
|
||||||
singletonMap("listener.name.external.ssl.keystore.password", "secret"), alterOpts);
|
singletonMap("listener.name.external.ssl.keystore.password", "secret"), alterOpts);
|
||||||
|
|
||||||
// Password config update with encoder secret should succeed and encoded password must be stored in ZK
|
// Password config update with encoder secret should succeed and encoded password must be stored in ZK
|
||||||
|
@ -175,7 +186,7 @@ public class ConfigCommandIntegrationTest {
|
||||||
|
|
||||||
// Password config update at default cluster-level should fail
|
// Password config update at default cluster-level should fail
|
||||||
assertThrows(ExecutionException.class,
|
assertThrows(ExecutionException.class,
|
||||||
() -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), configs, alterOpts));
|
() -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId), configs, alterOpts));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,7 +215,7 @@ public class ConfigCommandIntegrationTest {
|
||||||
deleteAndVerifyGroupConfigValue(client, defaultGroupName, configs, alterOpts);
|
deleteAndVerifyGroupConfigValue(client, defaultGroupName, configs, alterOpts);
|
||||||
|
|
||||||
// Unknown config configured should fail
|
// Unknown config configured should fail
|
||||||
assertThrows(ExecutionException.class, () -> alterConfigWithKraft(client, singletonMap("unknown.config", "20000"), alterOpts));
|
assertThrows(ExecutionException.class, () -> alterConfigWithAdmin(client, singletonMap("unknown.config", "20000"), alterOpts));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,7 +243,7 @@ public class ConfigCommandIntegrationTest {
|
||||||
deleteAndVerifyClientMetricsConfigValue(client, defaultClientMetricsName, configs.keySet(), alterOpts);
|
deleteAndVerifyClientMetricsConfigValue(client, defaultClientMetricsName, configs.keySet(), alterOpts);
|
||||||
|
|
||||||
// Unknown config configured should fail
|
// Unknown config configured should fail
|
||||||
assertThrows(ExecutionException.class, () -> alterConfigWithKraft(client, singletonMap("unknown.config", "20000"), alterOpts));
|
assertThrows(ExecutionException.class, () -> alterConfigWithAdmin(client, singletonMap("unknown.config", "20000"), alterOpts));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -242,13 +253,13 @@ public class ConfigCommandIntegrationTest {
|
||||||
|
|
||||||
try (Admin client = cluster.admin()) {
|
try (Admin client = cluster.admin()) {
|
||||||
assertThrows(ExecutionException.class,
|
assertThrows(ExecutionException.class,
|
||||||
() -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
|
() -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
|
||||||
singletonMap(AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false"), alterOpts));
|
singletonMap(AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false"), alterOpts));
|
||||||
assertThrows(ExecutionException.class,
|
assertThrows(ExecutionException.class,
|
||||||
() -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
|
() -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
|
||||||
singletonMap(AUTO_LEADER_REBALANCE_ENABLE_CONFIG, "false"), alterOpts));
|
singletonMap(AUTO_LEADER_REBALANCE_ENABLE_CONFIG, "false"), alterOpts));
|
||||||
assertThrows(ExecutionException.class,
|
assertThrows(ExecutionException.class,
|
||||||
() -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
|
() -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
|
||||||
singletonMap("broker.id", "1"), alterOpts));
|
singletonMap("broker.id", "1"), alterOpts));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -277,7 +288,7 @@ public class ConfigCommandIntegrationTest {
|
||||||
singletonMap(listenerName + "ssl.truststore.type", "PKCS12"), alterOpts);
|
singletonMap(listenerName + "ssl.truststore.type", "PKCS12"), alterOpts);
|
||||||
alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
|
alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
|
||||||
singletonMap(listenerName + "ssl.truststore.location", "/temp/test.jks"), alterOpts);
|
singletonMap(listenerName + "ssl.truststore.location", "/temp/test.jks"), alterOpts);
|
||||||
alterConfigWithKraft(client, Optional.of(defaultBrokerId),
|
alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
|
||||||
singletonMap(listenerName + "ssl.truststore.password", "password"), alterOpts);
|
singletonMap(listenerName + "ssl.truststore.password", "password"), alterOpts);
|
||||||
verifyConfigSecretValue(client, Optional.of(defaultBrokerId),
|
verifyConfigSecretValue(client, Optional.of(defaultBrokerId),
|
||||||
singleton(listenerName + "ssl.truststore.password"));
|
singleton(listenerName + "ssl.truststore.password"));
|
||||||
|
@ -290,17 +301,119 @@ public class ConfigCommandIntegrationTest {
|
||||||
|
|
||||||
try (Admin client = cluster.admin()) {
|
try (Admin client = cluster.admin()) {
|
||||||
assertThrows(ExecutionException.class,
|
assertThrows(ExecutionException.class,
|
||||||
() -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
|
() -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
|
||||||
singletonMap(SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12"), alterOpts));
|
singletonMap(SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12"), alterOpts));
|
||||||
assertThrows(ExecutionException.class,
|
assertThrows(ExecutionException.class,
|
||||||
() -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
|
() -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
|
||||||
singletonMap(SSL_TRUSTSTORE_LOCATION_CONFIG, "/temp/test.jks"), alterOpts));
|
singletonMap(SSL_TRUSTSTORE_LOCATION_CONFIG, "/temp/test.jks"), alterOpts));
|
||||||
assertThrows(ExecutionException.class,
|
assertThrows(ExecutionException.class,
|
||||||
() -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
|
() -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
|
||||||
singletonMap(SSL_TRUSTSTORE_PASSWORD_CONFIG, "password"), alterOpts));
|
singletonMap(SSL_TRUSTSTORE_PASSWORD_CONFIG, "password"), alterOpts));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testUpdateInvalidBrokerConfigs() {
|
||||||
|
updateAndCheckInvalidBrokerConfig(Optional.empty());
|
||||||
|
updateAndCheckInvalidBrokerConfig(Optional.of(cluster.anyBrokerSocketServer().config().brokerId() + ""));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateAndCheckInvalidBrokerConfig(Optional<String> brokerIdOrDefault) {
|
||||||
|
List<String> alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
|
||||||
|
try (Admin client = cluster.admin()) {
|
||||||
|
alterConfigWithAdmin(client, brokerIdOrDefault, Collections.singletonMap("invalid", "2"), alterOpts);
|
||||||
|
|
||||||
|
Stream<String> describeCommand = Stream.concat(
|
||||||
|
Stream.concat(
|
||||||
|
Stream.of("--bootstrap-server", cluster.bootstrapServers()),
|
||||||
|
Stream.of(entityOp(brokerIdOrDefault).toArray(new String[0]))),
|
||||||
|
Stream.of("--entity-type", "brokers", "--describe"));
|
||||||
|
String describeResult = captureStandardStream(false, run(describeCommand));
|
||||||
|
|
||||||
|
// We will treat unknown config as sensitive
|
||||||
|
assertTrue(describeResult.contains("sensitive=true"), describeResult);
|
||||||
|
// Sensitive config will not return
|
||||||
|
assertTrue(describeResult.contains("invalid=null"), describeResult);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testUpdateInvalidTopicConfigs() throws ExecutionException, InterruptedException {
|
||||||
|
List<String> alterOpts = asList("--bootstrap-server", cluster.bootstrapServers(), "--entity-type", "topics", "--alter");
|
||||||
|
try (Admin client = cluster.admin()) {
|
||||||
|
client.createTopics(Collections.singletonList(new NewTopic("test-config-topic", 1, (short) 1))).all().get();
|
||||||
|
assertInstanceOf(
|
||||||
|
InvalidConfigurationException.class,
|
||||||
|
assertThrows(
|
||||||
|
ExecutionException.class,
|
||||||
|
() -> ConfigCommand.alterConfig(
|
||||||
|
client,
|
||||||
|
new ConfigCommand.ConfigCommandOptions(
|
||||||
|
toArray(alterOpts,
|
||||||
|
asList("--add-config", "invalid=2", "--entity-type", "topics", "--entity-name", "test-config-topic"))))
|
||||||
|
).getCause()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test case from KAFKA-13788
|
||||||
|
@ClusterTest(serverProperties = {
|
||||||
|
// Must be at greater than 1MB per cleaner thread, set to 2M+2 so that we can set 2 cleaner threads.
|
||||||
|
@ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size", value = "2097154"),
|
||||||
|
})
|
||||||
|
public void testUpdateBrokerConfigNotAffectedByInvalidConfig() {
|
||||||
|
try (Admin client = cluster.admin()) {
|
||||||
|
ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(
|
||||||
|
toArray(asList("--bootstrap-server", cluster.bootstrapServers(),
|
||||||
|
"--alter",
|
||||||
|
"--add-config", "log.cleaner.threadzz=2",
|
||||||
|
"--entity-type", "brokers",
|
||||||
|
"--entity-default"))));
|
||||||
|
|
||||||
|
ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(
|
||||||
|
toArray(asList("--bootstrap-server", cluster.bootstrapServers(),
|
||||||
|
"--alter",
|
||||||
|
"--add-config", "log.cleaner.threads=2",
|
||||||
|
"--entity-type", "brokers",
|
||||||
|
"--entity-default"))));
|
||||||
|
kafka.utils.TestUtils.waitUntilTrue(
|
||||||
|
() -> cluster.brokerSocketServers().stream().allMatch(broker -> broker.config().getInt("log.cleaner.threads") == 2),
|
||||||
|
() -> "Timeout waiting for topic config propagating to broker",
|
||||||
|
org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
|
||||||
|
100L);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest(
|
||||||
|
// Must be at greater than 1MB per cleaner thread, set to 2M+2 so that we can set 2 cleaner threads.
|
||||||
|
serverProperties = {@ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size", value = "2097154")},
|
||||||
|
// Zk code has been removed, use kraft and mockito to mock this situation
|
||||||
|
metadataVersion = MetadataVersion.IBP_3_3_IV0
|
||||||
|
)
|
||||||
|
public void testUnsupportedVersionException() {
|
||||||
|
try (Admin client = cluster.admin()) {
|
||||||
|
Admin spyAdmin = Mockito.spy(client);
|
||||||
|
|
||||||
|
AlterConfigsResult mockResult = AdminClientTestUtils.alterConfigsResult(
|
||||||
|
new ConfigResource(ConfigResource.Type.BROKER, ""), new UnsupportedVersionException("simulated error"));
|
||||||
|
Mockito.doReturn(mockResult).when(spyAdmin)
|
||||||
|
.incrementalAlterConfigs(any(java.util.Map.class), any(AlterConfigsOptions.class));
|
||||||
|
assertEquals(
|
||||||
|
"The INCREMENTAL_ALTER_CONFIGS API is not supported by the cluster. The API is supported starting from version 2.3.0. You may want to use an older version of this tool to interact with your cluster, or upgrade your brokers to version 2.3.0 or newer to avoid this error.",
|
||||||
|
assertThrows(UnsupportedVersionException.class, () -> {
|
||||||
|
ConfigCommand.alterConfig(spyAdmin, new ConfigCommand.ConfigCommandOptions(
|
||||||
|
toArray(asList(
|
||||||
|
"--bootstrap-server", cluster.bootstrapServers(),
|
||||||
|
"--alter",
|
||||||
|
"--add-config", "log.cleaner.threads=2",
|
||||||
|
"--entity-type", "brokers",
|
||||||
|
"--entity-default"))));
|
||||||
|
}).getMessage()
|
||||||
|
);
|
||||||
|
Mockito.verify(spyAdmin).incrementalAlterConfigs(any(java.util.Map.class), any(AlterConfigsOptions.class));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void assertNonZeroStatusExit(Stream<String> args, Consumer<String> checkErrOut) {
|
private void assertNonZeroStatusExit(Stream<String> args, Consumer<String> checkErrOut) {
|
||||||
AtomicReference<Integer> exitStatus = new AtomicReference<>();
|
AtomicReference<Integer> exitStatus = new AtomicReference<>();
|
||||||
Exit.setExitProcedure((status, __) -> {
|
Exit.setExitProcedure((status, __) -> {
|
||||||
|
@ -333,7 +446,7 @@ public class ConfigCommandIntegrationTest {
|
||||||
Optional<String> brokerId,
|
Optional<String> brokerId,
|
||||||
Map<String, String> config,
|
Map<String, String> config,
|
||||||
List<String> alterOpts) throws Exception {
|
List<String> alterOpts) throws Exception {
|
||||||
alterConfigWithKraft(client, brokerId, config, alterOpts);
|
alterConfigWithAdmin(client, brokerId, config, alterOpts);
|
||||||
verifyConfig(client, brokerId, config);
|
verifyConfig(client, brokerId, config);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -341,7 +454,7 @@ public class ConfigCommandIntegrationTest {
|
||||||
String groupName,
|
String groupName,
|
||||||
Map<String, String> config,
|
Map<String, String> config,
|
||||||
List<String> alterOpts) throws Exception {
|
List<String> alterOpts) throws Exception {
|
||||||
alterConfigWithKraft(client, config, alterOpts);
|
alterConfigWithAdmin(client, config, alterOpts);
|
||||||
verifyGroupConfig(client, groupName, config);
|
verifyGroupConfig(client, groupName, config);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -349,11 +462,11 @@ public class ConfigCommandIntegrationTest {
|
||||||
String clientMetricsName,
|
String clientMetricsName,
|
||||||
Map<String, String> config,
|
Map<String, String> config,
|
||||||
List<String> alterOpts) throws Exception {
|
List<String> alterOpts) throws Exception {
|
||||||
alterConfigWithKraft(client, config, alterOpts);
|
alterConfigWithAdmin(client, config, alterOpts);
|
||||||
verifyClientMetricsConfig(client, clientMetricsName, config);
|
verifyClientMetricsConfig(client, clientMetricsName, config);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void alterConfigWithKraft(Admin client, Optional<String> resourceName, Map<String, String> config, List<String> alterOpts) {
|
private void alterConfigWithAdmin(Admin client, Optional<String> resourceName, Map<String, String> config, List<String> alterOpts) {
|
||||||
String configStr = transferConfigMapToString(config);
|
String configStr = transferConfigMapToString(config);
|
||||||
List<String> bootstrapOpts = quorumArgs().collect(Collectors.toList());
|
List<String> bootstrapOpts = quorumArgs().collect(Collectors.toList());
|
||||||
ConfigCommand.ConfigCommandOptions addOpts =
|
ConfigCommand.ConfigCommandOptions addOpts =
|
||||||
|
@ -365,7 +478,7 @@ public class ConfigCommandIntegrationTest {
|
||||||
ConfigCommand.alterConfig(client, addOpts);
|
ConfigCommand.alterConfig(client, addOpts);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void alterConfigWithKraft(Admin client, Map<String, String> config, List<String> alterOpts) {
|
private void alterConfigWithAdmin(Admin client, Map<String, String> config, List<String> alterOpts) {
|
||||||
String configStr = transferConfigMapToString(config);
|
String configStr = transferConfigMapToString(config);
|
||||||
List<String> bootstrapOpts = quorumArgs().collect(Collectors.toList());
|
List<String> bootstrapOpts = quorumArgs().collect(Collectors.toList());
|
||||||
ConfigCommand.ConfigCommandOptions addOpts =
|
ConfigCommand.ConfigCommandOptions addOpts =
|
||||||
|
|
|
@ -1017,15 +1017,14 @@ public class ConfigCommandTest {
|
||||||
return describeResult;
|
return describeResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options) {
|
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
|
||||||
assertEquals(1, configs.size());
|
assertEquals(1, configs.size());
|
||||||
Map.Entry<ConfigResource, Config> entry = configs.entrySet().iterator().next();
|
Map.Entry<ConfigResource, Collection<AlterConfigOp>> entry = configs.entrySet().iterator().next();
|
||||||
ConfigResource res = entry.getKey();
|
ConfigResource res = entry.getKey();
|
||||||
Config config = entry.getValue();
|
Collection<AlterConfigOp> config = entry.getValue();
|
||||||
assertEquals(ConfigResource.Type.BROKER, res.type());
|
assertEquals(ConfigResource.Type.BROKER, res.type());
|
||||||
config.entries().forEach(e -> brokerConfigs.put(e.name(), e.value()));
|
config.forEach(e -> brokerConfigs.put(e.configEntry().name(), e.configEntry().value()));
|
||||||
return alterResult;
|
return alterResult;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -1115,9 +1114,9 @@ public class ConfigCommandTest {
|
||||||
assertEquals(3, alterConfigOps.size());
|
assertEquals(3, alterConfigOps.size());
|
||||||
|
|
||||||
List<AlterConfigOp> expectedConfigOps = Arrays.asList(
|
List<AlterConfigOp> expectedConfigOps = Arrays.asList(
|
||||||
new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner", "DEBUG"), AlterConfigOp.OpType.SET),
|
|
||||||
new AlterConfigOp(new ConfigEntry("kafka.server.ReplicaManager", ""), AlterConfigOp.OpType.DELETE),
|
new AlterConfigOp(new ConfigEntry("kafka.server.ReplicaManager", ""), AlterConfigOp.OpType.DELETE),
|
||||||
new AlterConfigOp(new ConfigEntry("kafka.server.KafkaApi", ""), AlterConfigOp.OpType.DELETE)
|
new AlterConfigOp(new ConfigEntry("kafka.server.KafkaApi", ""), AlterConfigOp.OpType.DELETE),
|
||||||
|
new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner", "DEBUG"), AlterConfigOp.OpType.SET)
|
||||||
);
|
);
|
||||||
assertEquals(expectedConfigOps.size(), alterConfigOps.size());
|
assertEquals(expectedConfigOps.size(), alterConfigOps.size());
|
||||||
Iterator<AlterConfigOp> alterConfigOpsIter = alterConfigOps.iterator();
|
Iterator<AlterConfigOp> alterConfigOpsIter = alterConfigOps.iterator();
|
||||||
|
@ -1248,9 +1247,9 @@ public class ConfigCommandTest {
|
||||||
assertEquals(3, alterConfigOps.size());
|
assertEquals(3, alterConfigOps.size());
|
||||||
|
|
||||||
List<AlterConfigOp> expectedConfigOps = Arrays.asList(
|
List<AlterConfigOp> expectedConfigOps = Arrays.asList(
|
||||||
|
new AlterConfigOp(new ConfigEntry("interval.ms", ""), AlterConfigOp.OpType.DELETE),
|
||||||
new AlterConfigOp(new ConfigEntry("match", "client_software_name=kafka.python,client_software_version=1\\.2\\..*"), AlterConfigOp.OpType.SET),
|
new AlterConfigOp(new ConfigEntry("match", "client_software_name=kafka.python,client_software_version=1\\.2\\..*"), AlterConfigOp.OpType.SET),
|
||||||
new AlterConfigOp(new ConfigEntry("metrics", "org.apache.kafka.consumer."), AlterConfigOp.OpType.SET),
|
new AlterConfigOp(new ConfigEntry("metrics", "org.apache.kafka.consumer."), AlterConfigOp.OpType.SET)
|
||||||
new AlterConfigOp(new ConfigEntry("interval.ms", ""), AlterConfigOp.OpType.DELETE)
|
|
||||||
);
|
);
|
||||||
assertEquals(expectedConfigOps.size(), alterConfigOps.size());
|
assertEquals(expectedConfigOps.size(), alterConfigOps.size());
|
||||||
Iterator<AlterConfigOp> alterConfigOpsIter = alterConfigOps.iterator();
|
Iterator<AlterConfigOp> alterConfigOpsIter = alterConfigOps.iterator();
|
||||||
|
@ -1358,8 +1357,8 @@ public class ConfigCommandTest {
|
||||||
assertEquals(2, alterConfigOps.size());
|
assertEquals(2, alterConfigOps.size());
|
||||||
|
|
||||||
List<AlterConfigOp> expectedConfigOps = Arrays.asList(
|
List<AlterConfigOp> expectedConfigOps = Arrays.asList(
|
||||||
new AlterConfigOp(new ConfigEntry("consumer.heartbeat.interval.ms", "6000"), AlterConfigOp.OpType.SET),
|
new AlterConfigOp(new ConfigEntry("consumer.session.timeout.ms", ""), AlterConfigOp.OpType.DELETE),
|
||||||
new AlterConfigOp(new ConfigEntry("consumer.session.timeout.ms", ""), AlterConfigOp.OpType.DELETE)
|
new AlterConfigOp(new ConfigEntry("consumer.heartbeat.interval.ms", "6000"), AlterConfigOp.OpType.SET)
|
||||||
);
|
);
|
||||||
assertEquals(expectedConfigOps.size(), alterConfigOps.size());
|
assertEquals(expectedConfigOps.size(), alterConfigOps.size());
|
||||||
Iterator<AlterConfigOp> alterConfigOpsIter = alterConfigOps.iterator();
|
Iterator<AlterConfigOp> alterConfigOpsIter = alterConfigOps.iterator();
|
||||||
|
|
|
@ -126,6 +126,10 @@
|
||||||
<li>The <code>--broker-list</code> option was removed from the <code>kafka-verifiable-consumer</code> command line tool.
|
<li>The <code>--broker-list</code> option was removed from the <code>kafka-verifiable-consumer</code> command line tool.
|
||||||
Please use <code>--bootstrap-server</code> instead.
|
Please use <code>--bootstrap-server</code> instead.
|
||||||
</li>
|
</li>
|
||||||
|
<li>kafka-configs.sh now uses incrementalAlterConfigs API to alter broker configurations instead of the deprecated alterConfigs API,
|
||||||
|
and it will fall directly if the broker doesn't support incrementalAlterConfigs API, which means the broker version is prior to 2.3.x.
|
||||||
|
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1011%3A+Use+incrementalAlterConfigs+when+updating+broker+configs+by+kafka-configs.sh">KIP-1011</a> for more details.
|
||||||
|
</li>
|
||||||
</ul>
|
</ul>
|
||||||
</li>
|
</li>
|
||||||
<li><b>Connect</b>
|
<li><b>Connect</b>
|
||||||
|
|
|
@ -174,7 +174,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, quorumVoterStringBuilder.toString());
|
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, quorumVoterStringBuilder.toString());
|
||||||
|
|
||||||
// reduce log cleaner offset map memory usage
|
// reduce log cleaner offset map memory usage
|
||||||
props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
|
props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
|
||||||
|
|
||||||
// Add associated broker node property overrides
|
// Add associated broker node property overrides
|
||||||
if (brokerNode != null) {
|
if (brokerNode != null) {
|
||||||
|
|
Loading…
Reference in New Issue