From c020c94e043ca4a997797da91a226735803774ab Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 15 Jul 2022 15:48:35 -0400 Subject: [PATCH] KAFKA-14039 Fix AlterConfigPolicy usage in KRaft (#12374) Only pass configs from the request to the AlterConfigPolicy. This changes the KRaft usage of the AlterConfigPolicy to match the usage in ZK mode. Reviewers: Jason Gustafson --- ...minClientWithPoliciesIntegrationTest.scala | 40 +++++++++++++++++-- .../ConfigurationControlManager.java | 14 ++++--- .../ConfigurationControlManagerTest.java | 26 +++++++++--- 3 files changed, 64 insertions(+), 16 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala index 4d48bf5a865..5b2213a65e9 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala @@ -20,17 +20,19 @@ import kafka.log.LogConfig import kafka.server.{Defaults, KafkaConfig} import kafka.utils.TestUtils.assertFutureExceptionTypeEquals import kafka.utils.{Logging, TestInfoUtils, TestUtils} -import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigsOptions, Config, ConfigEntry} +import org.apache.kafka.clients.admin.AlterConfigOp.OpType +import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, AlterConfigsOptions, Config, ConfigEntry} import org.apache.kafka.common.config.{ConfigResource, TopicConfig} import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, PolicyViolationException} import org.apache.kafka.common.utils.Utils import org.apache.kafka.server.policy.AlterConfigPolicy -import org.junit.jupiter.api.Assertions.{assertEquals, assertNull} +import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import scala.annotation.nowarn +import scala.collection.mutable import scala.jdk.CollectionConverters._ /** @@ -121,6 +123,14 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with val topicResource3 = new ConfigResource(ConfigResource.Type.TOPIC, topic3) createTopic(topic3, 1, 1) + // Set a mutable broker config + val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokers.head.config.brokerId.toString) + val brokerConfigs = Seq(new ConfigEntry(KafkaConfig.MessageMaxBytesProp, "50000")).asJava + val alterResult1 = client.alterConfigs(Map(brokerResource -> new Config(brokerConfigs)).asJava) + alterResult1.all.get + assertEquals(Set(KafkaConfig.MessageMaxBytesProp), validationsForResource(brokerResource).head.configs().keySet().asScala) + validations.clear() + val topicConfigEntries1 = Seq( new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"), new ConfigEntry(LogConfig.MinInSyncReplicasProp, "2") // policy doesn't allow this @@ -130,7 +140,6 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with val topicConfigEntries3 = Seq(new ConfigEntry(LogConfig.MinInSyncReplicasProp, "-1")).asJava - val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokers.head.config.brokerId.toString) val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.SslTruststorePasswordProp, "12313")).asJava // Alter configs: second is valid, the others are invalid @@ -146,6 +155,9 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with alterResult.values.get(topicResource2).get assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException]) assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException]) + assertTrue(validationsForResource(brokerResource).isEmpty, + "Should not see the broker resource in the AlterConfig policy when the broker configs are not being updated.") + validations.clear() // Verify that the second resource was updated and the others were not ensureConsistentKRaftMetadata() @@ -175,6 +187,9 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with alterResult.values.get(topicResource2).get assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException]) assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException]) + assertTrue(validationsForResource(brokerResource).isEmpty, + "Should not see the broker resource in the AlterConfig policy when the broker configs are not being updated.") + validations.clear() // Verify that no resources are updated since validate_only = true ensureConsistentKRaftMetadata() @@ -188,27 +203,44 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with assertEquals("0.8", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value) assertNull(configs.get(brokerResource).get(KafkaConfig.SslTruststorePasswordProp).value) + + // Do an incremental alter config on the broker, ensure we don't see the broker config we set earlier in the policy + alterResult = client.incrementalAlterConfigs(Map( + brokerResource -> + Seq(new AlterConfigOp( + new ConfigEntry(KafkaConfig.MaxConnectionsProp, "9999"), OpType.SET) + ).asJavaCollection + ).asJava) + alterResult.all.get + assertEquals(Set(KafkaConfig.MaxConnectionsProp), validationsForResource(brokerResource).head.configs().keySet().asScala) } } object AdminClientWithPoliciesIntegrationTest { + val validations = new mutable.ListBuffer[AlterConfigPolicy.RequestMetadata]() + + def validationsForResource(resource: ConfigResource): Seq[AlterConfigPolicy.RequestMetadata] = { + validations.filter { req => req.resource().equals(resource) }.toSeq + } + class Policy extends AlterConfigPolicy { var configs: Map[String, _] = _ var closed = false def configure(configs: util.Map[String, _]): Unit = { + validations.clear() this.configs = configs.asScala.toMap } def validate(requestMetadata: AlterConfigPolicy.RequestMetadata): Unit = { + validations.append(requestMetadata) require(!closed, "Policy should not be closed") require(!configs.isEmpty, "configure should have been called with non empty configs") require(!requestMetadata.configs.isEmpty, "request configs should not be empty") require(requestMetadata.resource.name.nonEmpty, "resource name should not be empty") - require(requestMetadata.resource.name.contains("topic")) if (requestMetadata.configs.containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) throw new PolicyViolationException("Min in sync replicas cannot be updated") } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index cde9d395692..4b8561a4d90 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -248,24 +248,26 @@ public class ConfigurationControlManager { private ApiError validateAlterConfig(ConfigResource configResource, List newRecords, boolean newlyCreatedResource) { - Map newConfigs = new HashMap<>(); + Map allConfigs = new HashMap<>(); + Map alteredConfigs = new HashMap<>(); TimelineHashMap existingConfigs = configData.get(configResource); - if (existingConfigs != null) newConfigs.putAll(existingConfigs); + if (existingConfigs != null) allConfigs.putAll(existingConfigs); for (ApiMessageAndVersion newRecord : newRecords) { ConfigRecord configRecord = (ConfigRecord) newRecord.message(); if (configRecord.value() == null) { - newConfigs.remove(configRecord.name()); + allConfigs.remove(configRecord.name()); } else { - newConfigs.put(configRecord.name(), configRecord.value()); + allConfigs.put(configRecord.name(), configRecord.value()); } + alteredConfigs.put(configRecord.name(), configRecord.value()); } try { - validator.validate(configResource, newConfigs); + validator.validate(configResource, allConfigs); if (!newlyCreatedResource) { existenceChecker.accept(configResource); } if (alterConfigPolicy.isPresent()) { - alterConfigPolicy.get().validate(new RequestMetadata(configResource, newConfigs)); + alterConfigPolicy.get().validate(new RequestMetadata(configResource, alteredConfigs)); } } catch (ConfigException e) { return new ApiError(INVALID_CONFIG, e.getMessage()); diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java index 007e84ffc0d..1c598924448 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java @@ -281,18 +281,30 @@ public class ConfigurationControlManagerTest { public void testIncrementalAlterConfigsWithPolicy() { MockAlterConfigsPolicy policy = new MockAlterConfigsPolicy(asList( new RequestMetadata(MYTOPIC, Collections.emptyMap()), - new RequestMetadata(BROKER0, toMap(entry("foo.bar", "123"), - entry("quux", "456"))))); + new RequestMetadata(BROKER0, toMap( + entry("foo.bar", "123"), + entry("quux", "456"), + entry("broker.config.to.remove", null))))); ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). setKafkaConfigSchema(SCHEMA). setAlterConfigPolicy(Optional.of(policy)). build(); + // Existing configs should not be passed to the policy + manager.replay(new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0"). + setName("broker.config").setValue("123")); + manager.replay(new ConfigRecord().setResourceType(TOPIC.id()).setResourceName(MYTOPIC.name()). + setName("topic.config").setValue("123")); + manager.replay(new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0"). + setName("broker.config.to.remove").setValue("123")); assertEquals(ControllerResult.atomicOf(asList(new ApiMessageAndVersion( new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0"). setName("foo.bar").setValue("123"), CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion( new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0"). - setName("quux").setValue("456"), CONFIG_RECORD.highestSupportedVersion())), - toMap(entry(MYTOPIC, new ApiError(Errors.POLICY_VIOLATION, + setName("quux").setValue("456"), CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion( + new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0"). + setName("broker.config.to.remove").setValue(null), CONFIG_RECORD.highestSupportedVersion()) + ), + toMap(entry(MYTOPIC, new ApiError(Errors.POLICY_VIOLATION, "Expected: AlterConfigPolicy.RequestMetadata(resource=ConfigResource(" + "type=TOPIC, name='mytopic'), configs={}). Got: " + "AlterConfigPolicy.RequestMetadata(resource=ConfigResource(" + @@ -301,8 +313,10 @@ public class ConfigurationControlManagerTest { manager.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap( entry("foo.bar", entry(SET, "123")))), entry(BROKER0, toMap( - entry("foo.bar", entry(SET, "123")), - entry("quux", entry(SET, "456"))))), + entry("foo.bar", entry(SET, "123")), + entry("quux", entry(SET, "456")), + entry("broker.config.to.remove", entry(DELETE, null)) + ))), true)); }