diff --git a/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java b/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java
new file mode 100644
index 00000000000..ca47efa6645
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.policy;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.PolicyViolationException;
+
+import java.util.Map;
+
+/**
+ * An interface for enforcing a policy on alter configs requests.
+ *
+ * Common use cases are requiring that the replication factor, min.insync.replicas and/or retention settings for a
+ * topic remain within an allowable range.
+ *
+ * If alter.config.policy.class.name is defined, Kafka will create an instance of the specified class
+ * using the default constructor and will then pass the broker configs to its configure() method. During
+ * broker shutdown, the close() method will be invoked so that resources can be released (if necessary).
+ */
+public interface AlterConfigPolicy extends Configurable, AutoCloseable {
+
+ /**
+ * Class containing the create request parameters.
+ */
+ class RequestMetadata {
+
+ private final ConfigResource resource;
+ private final Map configs;
+
+ /**
+ * Create an instance of this class with the provided parameters.
+ *
+ * This constructor is public to make testing of AlterConfigPolicy implementations easier.
+ */
+ public RequestMetadata(ConfigResource resource, Map configs) {
+ this.resource = resource;
+ this.configs = configs;
+ }
+
+ /**
+ * Return the configs in the request.
+ */
+ public Map configs() {
+ return configs;
+ }
+
+ public ConfigResource resource() {
+ return resource;
+ }
+
+ @Override
+ public String toString() {
+ return "AlterConfigPolicy.RequestMetadata(resource=" + resource +
+ ", configs=" + configs + ")";
+ }
+ }
+
+ /**
+ * Validate the request parameters and throw a PolicyViolationException with a suitable error
+ * message if the alter configs request parameters for the provided resource do not satisfy this policy.
+ *
+ * Clients will receive the POLICY_VIOLATION error code along with the exception's message. Note that validation
+ * failure only affects the relevant resource, other resources in the request will still be processed.
+ *
+ * @param requestMetadata the alter configs request parameters for the provided resource (topic is the only resource
+ * type whose configs can be updated currently).
+ * @throws PolicyViolationException if the request parameters do not satisfy this policy.
+ */
+ void validate(RequestMetadata requestMetadata) throws PolicyViolationException;
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 6f9e6af814b..c0e86e9ef97 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -67,7 +67,7 @@ import static org.junit.Assert.fail;
/**
* A unit test for KafkaAdminClient.
*
- * See KafkaAdminClientIntegrationTest for an integration test of the KafkaAdminClient.
+ * See AdminClientIntegrationTest for an integration test.
*/
public class KafkaAdminClientTest {
@Rule
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index c147593d0b5..33c6b77d410 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -23,14 +23,14 @@ import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.log.LogConfig
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
import org.apache.kafka.common.errors.{ApiException, InvalidRequestException, PolicyViolationException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.CreateTopicsRequest._
import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse, Resource, ResourceType}
-import org.apache.kafka.server.policy.CreateTopicPolicy
+import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
import scala.collection._
@@ -47,6 +47,9 @@ class AdminManager(val config: KafkaConfig,
private val createTopicPolicy =
Option(config.getConfiguredInstance(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[CreateTopicPolicy]))
+ private val alterConfigPolicy =
+ Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))
+
def hasDelayedTopicOperations = topicPurgatory.delayed != 0
/**
@@ -255,14 +258,28 @@ class AdminManager(val config: KafkaConfig,
resource.`type` match {
case ResourceType.TOPIC =>
val topic = resource.name
+
val properties = new Properties
config.entries.asScala.foreach { configEntry =>
properties.setProperty(configEntry.name(), configEntry.value())
}
- if (validateOnly)
- AdminUtils.validateTopicConfig(zkUtils, topic, properties)
- else
- AdminUtils.changeTopicConfig(zkUtils, topic, properties)
+
+ alterConfigPolicy match {
+ case Some(policy) =>
+ AdminUtils.validateTopicConfig(zkUtils, topic, properties)
+
+ val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap
+ policy.validate(new AlterConfigPolicy.RequestMetadata(
+ new ConfigResource(ConfigResource.Type.TOPIC, resource.name), configEntriesMap.asJava))
+
+ if (!validateOnly)
+ AdminUtils.changeTopicConfig(zkUtils, topic, properties)
+ case None =>
+ if (validateOnly)
+ AdminUtils.validateTopicConfig(zkUtils, topic, properties)
+ else
+ AdminUtils.changeTopicConfig(zkUtils, topic, properties)
+ }
resource -> new ApiError(Errors.NONE, null)
case resourceType =>
throw new InvalidRequestException(s"AlterConfigs is only supported for topics, but resource type is $resourceType")
@@ -274,8 +291,8 @@ class AdminManager(val config: KafkaConfig,
resource -> ApiError.fromThrowable(new InvalidRequestException(message, e))
case e: Throwable =>
// Log client errors at a lower level than unexpected exceptions
- val message = s"Error processing alter configs request for resource $resource"
- if (e.isInstanceOf[ApiException])
+ val message = s"Error processing alter configs request for resource $resource, config $config"
+ if (e.isInstanceOf[ApiException] || e.isInstanceOf[PolicyViolationException])
info(message, e)
else
error(message, e)
@@ -287,5 +304,6 @@ class AdminManager(val config: KafkaConfig,
def shutdown() {
topicPurgatory.shutdown()
CoreUtils.swallow(createTopicPolicy.foreach(_.close()))
+ CoreUtils.swallow(alterConfigPolicy.foreach(_.close()))
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6e94043c4fb..fe47fd0f8cb 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -301,6 +301,7 @@ object KafkaConfig {
val AutoCreateTopicsEnableProp = "auto.create.topics.enable"
val MinInSyncReplicasProp = "min.insync.replicas"
val CreateTopicPolicyClassNameProp = "create.topic.policy.class.name"
+ val AlterConfigPolicyClassNameProp = "alter.config.policy.class.name"
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms"
val DefaultReplicationFactorProp = "default.replication.factor"
@@ -529,6 +530,9 @@ object KafkaConfig {
val CreateTopicPolicyClassNameDoc = "The create topic policy class that should be used for validation. The class should " +
"implement the org.apache.kafka.server.policy.CreateTopicPolicy interface."
+ val AlterConfigPolicyClassNameDoc = "The alter configs policy class that should be used for validation. The class should " +
+ "implement the org.apache.kafka.server.policy.AlterConfigPolicy interface."
+
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels"
val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels"
@@ -748,6 +752,7 @@ object KafkaConfig {
.define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc)
.define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
.define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
+ .define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, AlterConfigPolicyClassNameDoc)
/** ********* Replication configuration ***********/
.define(ControllerSocketTimeoutMsProp, INT, Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc)
diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
similarity index 95%
rename from core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
rename to core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 0e21da7fd01..0a1d229563c 100644
--- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -23,9 +23,9 @@ import java.util.concurrent.{ExecutionException, TimeUnit}
import org.apache.kafka.common.utils.{Time, Utils}
import kafka.integration.KafkaServerTestHarness
import kafka.log.LogConfig
-import kafka.server.{Defaults, KafkaConfig}
+import kafka.server.{Defaults, KafkaConfig, KafkaServer}
import org.apache.kafka.clients.admin._
-import kafka.utils.{Logging, TestUtils}
+import kafka.utils.{Logging, TestUtils, ZkUtils}
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.KafkaFuture
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
@@ -45,7 +45,9 @@ import scala.collection.JavaConverters._
*
* Also see {@link org.apache.kafka.clients.admin.KafkaAdminClientTest} for a unit test of the admin client.
*/
-class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
+class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
+
+ import AdminClientIntegrationTest._
@Rule
def globalTimeout = Timeout.millis(120000)
@@ -244,147 +246,17 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
assertEquals(servers(2).config.logCleanerThreads.toString,
configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value)
- // Alter topics
- var topicConfigEntries1 = Seq(
- new ConfigEntry(LogConfig.FlushMsProp, "1000")
- ).asJava
-
- var topicConfigEntries2 = Seq(
- new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"),
- new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
- ).asJava
-
- var alterResult = client.alterConfigs(Map(
- topicResource1 -> new Config(topicConfigEntries1),
- topicResource2 -> new Config(topicConfigEntries2)
- ).asJava)
-
- assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
- alterResult.all.get
-
- // Verify that topics were updated correctly
- describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava)
- configs = describeResult.all.get
-
- assertEquals(2, configs.size)
-
- assertEquals("1000", configs.get(topicResource1).get(LogConfig.FlushMsProp).value)
- assertEquals(Defaults.MessageMaxBytes.toString,
- configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value)
- assertEquals((Defaults.LogRetentionHours * 60 * 60 * 1000).toString,
- configs.get(topicResource1).get(LogConfig.RetentionMsProp).value)
-
- assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
- assertEquals("lz4", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
-
- // Alter topics with validateOnly=true
- topicConfigEntries1 = Seq(
- new ConfigEntry(LogConfig.MaxMessageBytesProp, "10")
- ).asJava
-
- topicConfigEntries2 = Seq(
- new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.3")
- ).asJava
-
- alterResult = client.alterConfigs(Map(
- topicResource1 -> new Config(topicConfigEntries1),
- topicResource2 -> new Config(topicConfigEntries2)
- ).asJava, new AlterConfigsOptions().validateOnly(true))
-
- assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
- alterResult.all.get
-
- // Verify that topics were not updated due to validateOnly = true
- describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava)
- configs = describeResult.all.get
-
- assertEquals(2, configs.size)
-
- assertEquals(Defaults.MessageMaxBytes.toString,
- configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value)
- assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
+ checkValidAlterConfigs(zkUtils, servers, client, topicResource1, topicResource2)
}
@Test
def testInvalidAlterConfigs(): Unit = {
client = AdminClient.create(createConfig)
-
- // Create topics
- val topic1 = "invalid-alter-configs-topic-1"
- val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
- TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, new Properties())
-
- val topic2 = "invalid-alter-configs-topic-2"
- val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
- TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
-
- val topicConfigEntries1 = Seq(
- new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), // this value is invalid as it's above 1.0
- new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
- ).asJava
-
- var topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "snappy")).asJava
-
- val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString)
- val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.CompressionTypeProp, "gzip")).asJava
-
- // Alter configs: first and third are invalid, second is valid
- var alterResult = client.alterConfigs(Map(
- topicResource1 -> new Config(topicConfigEntries1),
- topicResource2 -> new Config(topicConfigEntries2),
- brokerResource -> new Config(brokerConfigEntries)
- ).asJava)
-
- assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
- assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
- alterResult.results.get(topicResource2).get
- assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
-
- // Verify that first and third resources were not updated and second was updated
- var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
- var configs = describeResult.all.get
- assertEquals(3, configs.size)
-
- assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
- configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
- assertEquals(Defaults.CompressionType.toString,
- configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
-
- assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
-
- assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(LogConfig.CompressionTypeProp).value)
-
- // Alter configs with validateOnly = true: first and third are invalid, second is valid
- topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip")).asJava
-
- alterResult = client.alterConfigs(Map(
- topicResource1 -> new Config(topicConfigEntries1),
- topicResource2 -> new Config(topicConfigEntries2),
- brokerResource -> new Config(brokerConfigEntries)
- ).asJava, new AlterConfigsOptions().validateOnly(true))
-
- assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
- assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
- alterResult.results.get(topicResource2).get
- assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
-
- // Verify that no resources are updated since validate_only = true
- describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
- configs = describeResult.all.get
- assertEquals(3, configs.size)
-
- assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
- configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
- assertEquals(Defaults.CompressionType.toString,
- configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
-
- assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
-
- assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(LogConfig.CompressionTypeProp).value)
+ checkInvalidAlterConfigs(zkUtils, servers, client)
}
val ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
- new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
/**
* Test that ACL operations are not possible when the authorizer is disabled.
@@ -473,3 +345,147 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
cfgs.map(KafkaConfig.fromProps)
}
}
+
+object AdminClientIntegrationTest {
+
+ import org.scalatest.Assertions._
+
+ def checkValidAlterConfigs(zkUtils: ZkUtils, servers: Seq[KafkaServer], client: AdminClient,
+ topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = {
+ // Alter topics
+ var topicConfigEntries1 = Seq(
+ new ConfigEntry(LogConfig.FlushMsProp, "1000")
+ ).asJava
+
+ var topicConfigEntries2 = Seq(
+ new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"),
+ new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
+ ).asJava
+
+ var alterResult = client.alterConfigs(Map(
+ topicResource1 -> new Config(topicConfigEntries1),
+ topicResource2 -> new Config(topicConfigEntries2)
+ ).asJava)
+
+ assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
+ alterResult.all.get
+
+ // Verify that topics were updated correctly
+ var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava)
+ var configs = describeResult.all.get
+
+ assertEquals(2, configs.size)
+
+ assertEquals("1000", configs.get(topicResource1).get(LogConfig.FlushMsProp).value)
+ assertEquals(Defaults.MessageMaxBytes.toString,
+ configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value)
+ assertEquals((Defaults.LogRetentionHours * 60 * 60 * 1000).toString,
+ configs.get(topicResource1).get(LogConfig.RetentionMsProp).value)
+
+ assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
+ assertEquals("lz4", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
+
+ // Alter topics with validateOnly=true
+ topicConfigEntries1 = Seq(
+ new ConfigEntry(LogConfig.MaxMessageBytesProp, "10")
+ ).asJava
+
+ topicConfigEntries2 = Seq(
+ new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.3")
+ ).asJava
+
+ alterResult = client.alterConfigs(Map(
+ topicResource1 -> new Config(topicConfigEntries1),
+ topicResource2 -> new Config(topicConfigEntries2)
+ ).asJava, new AlterConfigsOptions().validateOnly(true))
+
+ assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
+ alterResult.all.get
+
+ // Verify that topics were not updated due to validateOnly = true
+ describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava)
+ configs = describeResult.all.get
+
+ assertEquals(2, configs.size)
+
+ assertEquals(Defaults.MessageMaxBytes.toString,
+ configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value)
+ assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
+ }
+
+ def checkInvalidAlterConfigs(zkUtils: ZkUtils, servers: Seq[KafkaServer], client: AdminClient): Unit = {
+ // Create topics
+ val topic1 = "invalid-alter-configs-topic-1"
+ val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
+ TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, new Properties())
+
+ val topic2 = "invalid-alter-configs-topic-2"
+ val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
+ TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
+
+ val topicConfigEntries1 = Seq(
+ new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), // this value is invalid as it's above 1.0
+ new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
+ ).asJava
+
+ var topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "snappy")).asJava
+
+ val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString)
+ val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.CompressionTypeProp, "gzip")).asJava
+
+ // Alter configs: first and third are invalid, second is valid
+ var alterResult = client.alterConfigs(Map(
+ topicResource1 -> new Config(topicConfigEntries1),
+ topicResource2 -> new Config(topicConfigEntries2),
+ brokerResource -> new Config(brokerConfigEntries)
+ ).asJava)
+
+ assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
+ assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
+ alterResult.results.get(topicResource2).get
+ assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+
+ // Verify that first and third resources were not updated and second was updated
+ var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
+ var configs = describeResult.all.get
+ assertEquals(3, configs.size)
+
+ assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
+ configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
+ assertEquals(Defaults.CompressionType.toString,
+ configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
+
+ assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
+
+ assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
+
+ // Alter configs with validateOnly = true: first and third are invalid, second is valid
+ topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip")).asJava
+
+ alterResult = client.alterConfigs(Map(
+ topicResource1 -> new Config(topicConfigEntries1),
+ topicResource2 -> new Config(topicConfigEntries2),
+ brokerResource -> new Config(brokerConfigEntries)
+ ).asJava, new AlterConfigsOptions().validateOnly(true))
+
+ assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
+ assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
+ alterResult.results.get(topicResource2).get
+ assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+
+ // Verify that no resources are updated since validate_only = true
+ describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
+ configs = describeResult.all.get
+ assertEquals(3, configs.size)
+
+ assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
+ configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
+ assertEquals(Defaults.CompressionType.toString,
+ configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
+
+ assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
+
+ assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
+ }
+
+}
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
new file mode 100644
index 00000000000..7d3c54cfef5
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package kafka.api
+
+import java.util
+import java.util.Properties
+import java.util.concurrent.ExecutionException
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.log.LogConfig
+import kafka.server.{Defaults, KafkaConfig}
+import kafka.utils.{Logging, TestUtils}
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigsOptions, Config, ConfigEntry}
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.errors.{InvalidRequestException, PolicyViolationException}
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.policy.AlterConfigPolicy
+import org.junit.Assert.{assertEquals, assertNull, assertTrue}
+import org.junit.{After, Before, Rule, Test}
+import org.junit.rules.Timeout
+
+import scala.collection.JavaConverters._
+
+/**
+ * Tests AdminClient calls when the broker is configured with policies like AlterConfigPolicy, CreateTopicPolicy, etc.
+ */
+class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with Logging {
+
+ import AdminClientWithPoliciesIntegrationTest._
+
+ var client: AdminClient = null
+ val brokerCount = 3
+
+ @Rule
+ def globalTimeout = Timeout.millis(120000)
+
+ @Before
+ override def setUp(): Unit = {
+ super.setUp
+ TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+ }
+
+ @After
+ override def tearDown(): Unit = {
+ if (client != null)
+ Utils.closeQuietly(client, "AdminClient")
+ super.tearDown()
+ }
+
+ def createConfig: util.Map[String, Object] =
+ Map[String, Object](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList).asJava
+
+ override def generateConfigs = {
+ val configs = TestUtils.createBrokerConfigs(brokerCount, zkConnect)
+ configs.foreach(props => props.put(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[Policy]))
+ configs.map(KafkaConfig.fromProps)
+ }
+
+ @Test
+ def testValidAlterConfigs(): Unit = {
+ client = AdminClient.create(createConfig)
+ // Create topics
+ val topic1 = "describe-alter-configs-topic-1"
+ val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
+ val topicConfig1 = new Properties
+ topicConfig1.setProperty(LogConfig.MaxMessageBytesProp, "500000")
+ topicConfig1.setProperty(LogConfig.RetentionMsProp, "60000000")
+ TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, topicConfig1)
+
+ val topic2 = "describe-alter-configs-topic-2"
+ val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
+ TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
+
+ AdminClientIntegrationTest.checkValidAlterConfigs(zkUtils, servers, client, topicResource1, topicResource2)
+ }
+
+ @Test
+ def testInvalidAlterConfigs(): Unit = {
+ client = AdminClient.create(createConfig)
+ AdminClientIntegrationTest.checkInvalidAlterConfigs(zkUtils, servers, client)
+ }
+
+ @Test
+ def testInvalidAlterConfigsDueToPolicy(): Unit = {
+ client = AdminClient.create(createConfig)
+
+ // Create topics
+ val topic1 = "invalid-alter-configs-due-to-policy-topic-1"
+ val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
+ TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, new Properties())
+
+ val topic2 = "invalid-alter-configs-due-to-policy-topic-2"
+ val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
+ TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
+
+ val topic3 = "invalid-alter-configs-due-to-policy-topic-3"
+ val topicResource3 = new ConfigResource(ConfigResource.Type.TOPIC, topic3)
+ TestUtils.createTopic(zkUtils, topic3, 1, 1, servers, new Properties)
+
+ val topicConfigEntries1 = Seq(
+ new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"),
+ new ConfigEntry(LogConfig.MinInSyncReplicasProp, "2") // policy doesn't allow this
+ ).asJava
+
+ var topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.8")).asJava
+
+ val topicConfigEntries3 = Seq(new ConfigEntry(LogConfig.MinInSyncReplicasProp, "-1")).asJava
+
+ val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString)
+ val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.SslTruststorePasswordProp, "12313")).asJava
+
+ // Alter configs: second is valid, the others are invalid
+ var alterResult = client.alterConfigs(Map(
+ topicResource1 -> new Config(topicConfigEntries1),
+ topicResource2 -> new Config(topicConfigEntries2),
+ topicResource3 -> new Config(topicConfigEntries3),
+ brokerResource -> new Config(brokerConfigEntries)
+ ).asJava)
+
+ assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.results.keySet)
+ assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
+ alterResult.results.get(topicResource2).get
+ assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
+ assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+
+ // Verify that the second resource was updated and the others were not
+ var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava)
+ var configs = describeResult.all.get
+ assertEquals(4, configs.size)
+
+ assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
+ configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
+ assertEquals(Defaults.MinInSyncReplicas.toString,
+ configs.get(topicResource1).get(LogConfig.MinInSyncReplicasProp).value)
+
+ assertEquals("0.8", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
+
+ assertNull(configs.get(brokerResource).get(KafkaConfig.SslTruststorePasswordProp).value)
+
+ // Alter configs with validateOnly = true: only second is valid
+ topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.7")).asJava
+
+ alterResult = client.alterConfigs(Map(
+ topicResource1 -> new Config(topicConfigEntries1),
+ topicResource2 -> new Config(topicConfigEntries2),
+ brokerResource -> new Config(brokerConfigEntries),
+ topicResource3 -> new Config(topicConfigEntries3)
+ ).asJava, new AlterConfigsOptions().validateOnly(true))
+
+ assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.results.keySet)
+ assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
+ alterResult.results.get(topicResource2).get
+ assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
+ assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+
+ // Verify that no resources are updated since validate_only = true
+ describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava)
+ configs = describeResult.all.get
+ assertEquals(4, configs.size)
+
+ assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
+ configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
+ assertEquals(Defaults.MinInSyncReplicas.toString,
+ configs.get(topicResource1).get(LogConfig.MinInSyncReplicasProp).value)
+
+ assertEquals("0.8", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
+
+ assertNull(configs.get(brokerResource).get(KafkaConfig.SslTruststorePasswordProp).value)
+ }
+
+
+}
+
+object AdminClientWithPoliciesIntegrationTest {
+
+ class Policy extends AlterConfigPolicy {
+
+ var configs: Map[String, _] = _
+ var closed = false
+
+ def configure(configs: util.Map[String, _]): Unit = {
+ this.configs = configs.asScala.toMap
+ }
+
+ def validate(requestMetadata: AlterConfigPolicy.RequestMetadata): Unit = {
+ require(!closed, "Policy should not be closed")
+ require(!configs.isEmpty, "configure should have been called with non empty configs")
+ require(!requestMetadata.configs.isEmpty, "request configs should not be empty")
+ require(requestMetadata.resource.name.nonEmpty, "resource name should not be empty")
+ require(requestMetadata.resource.name.contains("topic"))
+ if (requestMetadata.configs.containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG))
+ throw new PolicyViolationException("Min in sync replicas cannot be updated")
+ }
+
+ def close(): Unit = closed = true
+
+ }
+}
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index d27b0bfeafe..9cd86c3b29d 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -27,7 +27,7 @@ import org.junit.{After, Assert, Before, Test}
import scala.collection.JavaConverters._
-class SaslSslAdminClientIntegrationTest extends KafkaAdminClientIntegrationTest with SaslSetup {
+class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with SaslSetup {
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName())
this.serverConfig.setProperty(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")