KAFKA-14039 Fix AlterConfigPolicy usage in KRaft (#12374)

Only pass configs from the request to the AlterConfigPolicy. This changes the KRaft usage of the AlterConfigPolicy to match the usage in ZK mode.

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
David Arthur 2022-07-15 15:48:35 -04:00 committed by GitHub
parent ddbc030036
commit c020c94e04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 64 additions and 16 deletions

View File

@ -20,17 +20,19 @@ import kafka.log.LogConfig
import kafka.server.{Defaults, KafkaConfig} import kafka.server.{Defaults, KafkaConfig}
import kafka.utils.TestUtils.assertFutureExceptionTypeEquals import kafka.utils.TestUtils.assertFutureExceptionTypeEquals
import kafka.utils.{Logging, TestInfoUtils, TestUtils} import kafka.utils.{Logging, TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigsOptions, Config, ConfigEntry} import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, AlterConfigsOptions, Config, ConfigEntry}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig} import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, PolicyViolationException} import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, PolicyViolationException}
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.policy.AlterConfigPolicy import org.apache.kafka.server.policy.AlterConfigPolicy
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull} import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource import org.junit.jupiter.params.provider.ValueSource
import scala.annotation.nowarn import scala.annotation.nowarn
import scala.collection.mutable
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
/** /**
@ -121,6 +123,14 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
val topicResource3 = new ConfigResource(ConfigResource.Type.TOPIC, topic3) val topicResource3 = new ConfigResource(ConfigResource.Type.TOPIC, topic3)
createTopic(topic3, 1, 1) createTopic(topic3, 1, 1)
// Set a mutable broker config
val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokers.head.config.brokerId.toString)
val brokerConfigs = Seq(new ConfigEntry(KafkaConfig.MessageMaxBytesProp, "50000")).asJava
val alterResult1 = client.alterConfigs(Map(brokerResource -> new Config(brokerConfigs)).asJava)
alterResult1.all.get
assertEquals(Set(KafkaConfig.MessageMaxBytesProp), validationsForResource(brokerResource).head.configs().keySet().asScala)
validations.clear()
val topicConfigEntries1 = Seq( val topicConfigEntries1 = Seq(
new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"), new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"),
new ConfigEntry(LogConfig.MinInSyncReplicasProp, "2") // policy doesn't allow this new ConfigEntry(LogConfig.MinInSyncReplicasProp, "2") // policy doesn't allow this
@ -130,7 +140,6 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
val topicConfigEntries3 = Seq(new ConfigEntry(LogConfig.MinInSyncReplicasProp, "-1")).asJava val topicConfigEntries3 = Seq(new ConfigEntry(LogConfig.MinInSyncReplicasProp, "-1")).asJava
val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokers.head.config.brokerId.toString)
val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.SslTruststorePasswordProp, "12313")).asJava val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.SslTruststorePasswordProp, "12313")).asJava
// Alter configs: second is valid, the others are invalid // Alter configs: second is valid, the others are invalid
@ -146,6 +155,9 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
alterResult.values.get(topicResource2).get alterResult.values.get(topicResource2).get
assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException]) assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException])
assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException]) assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException])
assertTrue(validationsForResource(brokerResource).isEmpty,
"Should not see the broker resource in the AlterConfig policy when the broker configs are not being updated.")
validations.clear()
// Verify that the second resource was updated and the others were not // Verify that the second resource was updated and the others were not
ensureConsistentKRaftMetadata() ensureConsistentKRaftMetadata()
@ -175,6 +187,9 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
alterResult.values.get(topicResource2).get alterResult.values.get(topicResource2).get
assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException]) assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException])
assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException]) assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException])
assertTrue(validationsForResource(brokerResource).isEmpty,
"Should not see the broker resource in the AlterConfig policy when the broker configs are not being updated.")
validations.clear()
// Verify that no resources are updated since validate_only = true // Verify that no resources are updated since validate_only = true
ensureConsistentKRaftMetadata() ensureConsistentKRaftMetadata()
@ -188,27 +203,44 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
assertEquals("0.8", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value) assertEquals("0.8", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
assertNull(configs.get(brokerResource).get(KafkaConfig.SslTruststorePasswordProp).value) assertNull(configs.get(brokerResource).get(KafkaConfig.SslTruststorePasswordProp).value)
// Do an incremental alter config on the broker, ensure we don't see the broker config we set earlier in the policy
alterResult = client.incrementalAlterConfigs(Map(
brokerResource ->
Seq(new AlterConfigOp(
new ConfigEntry(KafkaConfig.MaxConnectionsProp, "9999"), OpType.SET)
).asJavaCollection
).asJava)
alterResult.all.get
assertEquals(Set(KafkaConfig.MaxConnectionsProp), validationsForResource(brokerResource).head.configs().keySet().asScala)
} }
} }
object AdminClientWithPoliciesIntegrationTest { object AdminClientWithPoliciesIntegrationTest {
val validations = new mutable.ListBuffer[AlterConfigPolicy.RequestMetadata]()
def validationsForResource(resource: ConfigResource): Seq[AlterConfigPolicy.RequestMetadata] = {
validations.filter { req => req.resource().equals(resource) }.toSeq
}
class Policy extends AlterConfigPolicy { class Policy extends AlterConfigPolicy {
var configs: Map[String, _] = _ var configs: Map[String, _] = _
var closed = false var closed = false
def configure(configs: util.Map[String, _]): Unit = { def configure(configs: util.Map[String, _]): Unit = {
validations.clear()
this.configs = configs.asScala.toMap this.configs = configs.asScala.toMap
} }
def validate(requestMetadata: AlterConfigPolicy.RequestMetadata): Unit = { def validate(requestMetadata: AlterConfigPolicy.RequestMetadata): Unit = {
validations.append(requestMetadata)
require(!closed, "Policy should not be closed") require(!closed, "Policy should not be closed")
require(!configs.isEmpty, "configure should have been called with non empty configs") require(!configs.isEmpty, "configure should have been called with non empty configs")
require(!requestMetadata.configs.isEmpty, "request configs should not be empty") require(!requestMetadata.configs.isEmpty, "request configs should not be empty")
require(requestMetadata.resource.name.nonEmpty, "resource name should not be empty") require(requestMetadata.resource.name.nonEmpty, "resource name should not be empty")
require(requestMetadata.resource.name.contains("topic"))
if (requestMetadata.configs.containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) if (requestMetadata.configs.containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG))
throw new PolicyViolationException("Min in sync replicas cannot be updated") throw new PolicyViolationException("Min in sync replicas cannot be updated")
} }

View File

@ -248,24 +248,26 @@ public class ConfigurationControlManager {
private ApiError validateAlterConfig(ConfigResource configResource, private ApiError validateAlterConfig(ConfigResource configResource,
List<ApiMessageAndVersion> newRecords, List<ApiMessageAndVersion> newRecords,
boolean newlyCreatedResource) { boolean newlyCreatedResource) {
Map<String, String> newConfigs = new HashMap<>(); Map<String, String> allConfigs = new HashMap<>();
Map<String, String> alteredConfigs = new HashMap<>();
TimelineHashMap<String, String> existingConfigs = configData.get(configResource); TimelineHashMap<String, String> existingConfigs = configData.get(configResource);
if (existingConfigs != null) newConfigs.putAll(existingConfigs); if (existingConfigs != null) allConfigs.putAll(existingConfigs);
for (ApiMessageAndVersion newRecord : newRecords) { for (ApiMessageAndVersion newRecord : newRecords) {
ConfigRecord configRecord = (ConfigRecord) newRecord.message(); ConfigRecord configRecord = (ConfigRecord) newRecord.message();
if (configRecord.value() == null) { if (configRecord.value() == null) {
newConfigs.remove(configRecord.name()); allConfigs.remove(configRecord.name());
} else { } else {
newConfigs.put(configRecord.name(), configRecord.value()); allConfigs.put(configRecord.name(), configRecord.value());
} }
alteredConfigs.put(configRecord.name(), configRecord.value());
} }
try { try {
validator.validate(configResource, newConfigs); validator.validate(configResource, allConfigs);
if (!newlyCreatedResource) { if (!newlyCreatedResource) {
existenceChecker.accept(configResource); existenceChecker.accept(configResource);
} }
if (alterConfigPolicy.isPresent()) { if (alterConfigPolicy.isPresent()) {
alterConfigPolicy.get().validate(new RequestMetadata(configResource, newConfigs)); alterConfigPolicy.get().validate(new RequestMetadata(configResource, alteredConfigs));
} }
} catch (ConfigException e) { } catch (ConfigException e) {
return new ApiError(INVALID_CONFIG, e.getMessage()); return new ApiError(INVALID_CONFIG, e.getMessage());

View File

@ -281,18 +281,30 @@ public class ConfigurationControlManagerTest {
public void testIncrementalAlterConfigsWithPolicy() { public void testIncrementalAlterConfigsWithPolicy() {
MockAlterConfigsPolicy policy = new MockAlterConfigsPolicy(asList( MockAlterConfigsPolicy policy = new MockAlterConfigsPolicy(asList(
new RequestMetadata(MYTOPIC, Collections.emptyMap()), new RequestMetadata(MYTOPIC, Collections.emptyMap()),
new RequestMetadata(BROKER0, toMap(entry("foo.bar", "123"), new RequestMetadata(BROKER0, toMap(
entry("quux", "456"))))); entry("foo.bar", "123"),
entry("quux", "456"),
entry("broker.config.to.remove", null)))));
ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
setKafkaConfigSchema(SCHEMA). setKafkaConfigSchema(SCHEMA).
setAlterConfigPolicy(Optional.of(policy)). setAlterConfigPolicy(Optional.of(policy)).
build(); build();
// Existing configs should not be passed to the policy
manager.replay(new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
setName("broker.config").setValue("123"));
manager.replay(new ConfigRecord().setResourceType(TOPIC.id()).setResourceName(MYTOPIC.name()).
setName("topic.config").setValue("123"));
manager.replay(new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
setName("broker.config.to.remove").setValue("123"));
assertEquals(ControllerResult.atomicOf(asList(new ApiMessageAndVersion( assertEquals(ControllerResult.atomicOf(asList(new ApiMessageAndVersion(
new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0"). new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
setName("foo.bar").setValue("123"), CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion( setName("foo.bar").setValue("123"), CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion(
new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0"). new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
setName("quux").setValue("456"), CONFIG_RECORD.highestSupportedVersion())), setName("quux").setValue("456"), CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion(
toMap(entry(MYTOPIC, new ApiError(Errors.POLICY_VIOLATION, new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
setName("broker.config.to.remove").setValue(null), CONFIG_RECORD.highestSupportedVersion())
),
toMap(entry(MYTOPIC, new ApiError(Errors.POLICY_VIOLATION,
"Expected: AlterConfigPolicy.RequestMetadata(resource=ConfigResource(" + "Expected: AlterConfigPolicy.RequestMetadata(resource=ConfigResource(" +
"type=TOPIC, name='mytopic'), configs={}). Got: " + "type=TOPIC, name='mytopic'), configs={}). Got: " +
"AlterConfigPolicy.RequestMetadata(resource=ConfigResource(" + "AlterConfigPolicy.RequestMetadata(resource=ConfigResource(" +
@ -301,8 +313,10 @@ public class ConfigurationControlManagerTest {
manager.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap( manager.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(
entry("foo.bar", entry(SET, "123")))), entry("foo.bar", entry(SET, "123")))),
entry(BROKER0, toMap( entry(BROKER0, toMap(
entry("foo.bar", entry(SET, "123")), entry("foo.bar", entry(SET, "123")),
entry("quux", entry(SET, "456"))))), entry("quux", entry(SET, "456")),
entry("broker.config.to.remove", entry(DELETE, null))
))),
true)); true));
} }