KAFKA-5272; Policy for Alter Configs (KIP-133)

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3210 from ijuma/kafka-5272-improve-validation-for-describe-alter-configs
Ismael Juma 2017-06-03 03:14:55 +01:00
parent 1c786c589a
commit eb5586102e
7 changed files with 479 additions and 146 deletions


@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.policy;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
import java.util.Map;
/**
* An interface for enforcing a policy on alter configs requests.
*
* Common use cases include requiring that the replication factor, <code>min.insync.replicas</code> and/or retention settings for a
* topic remain within an allowable range.
*
* If <code>alter.config.policy.class.name</code> is defined, Kafka will create an instance of the specified class
* using the default constructor and will then pass the broker configs to its <code>configure()</code> method. During
* broker shutdown, the <code>close()</code> method will be invoked so that resources can be released (if necessary).
*/
public interface AlterConfigPolicy extends Configurable, AutoCloseable {
/**
* Class containing the alter configs request parameters.
*/
class RequestMetadata {
private final ConfigResource resource;
private final Map<String, String> configs;
/**
* Create an instance of this class with the provided parameters.
*
* This constructor is public to make testing of <code>AlterConfigPolicy</code> implementations easier.
*/
public RequestMetadata(ConfigResource resource, Map<String, String> configs) {
this.resource = resource;
this.configs = configs;
}
/**
* Return the configs in the request.
*/
public Map<String, String> configs() {
return configs;
}
public ConfigResource resource() {
return resource;
}
@Override
public String toString() {
return "AlterConfigPolicy.RequestMetadata(resource=" + resource +
", configs=" + configs + ")";
}
}
/**
* Validate the request parameters and throw a <code>PolicyViolationException</code> with a suitable error
* message if the alter configs request parameters for the provided resource do not satisfy this policy.
*
* Clients will receive the POLICY_VIOLATION error code along with the exception's message. Note that validation
* failure only affects the relevant resource; other resources in the request will still be processed.
*
* @param requestMetadata the alter configs request parameters for the provided resource (topic is the only resource
* type whose configs can be updated currently).
* @throws PolicyViolationException if the request parameters do not satisfy this policy.
*/
void validate(RequestMetadata requestMetadata) throws PolicyViolationException;
}
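As an illustration of the interface above, here is a minimal sketch of a policy implementation. The class name, package, and retention cap are hypothetical and not part of this commit:

package com.example;

import java.util.Map;

import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.AlterConfigPolicy;

/**
 * Hypothetical example policy: rejects attempts to raise retention.ms above a fixed cap.
 */
public class RetentionBoundPolicy implements AlterConfigPolicy {

    private static final long MAX_RETENTION_MS = 7 * 24 * 60 * 60 * 1000L; // assumed 7-day cap

    @Override
    public void configure(Map<String, ?> configs) {
        // Receives the broker configs at startup; unused in this sketch.
    }

    @Override
    public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
        if (requestMetadata.resource().type() != ConfigResource.Type.TOPIC)
            return; // topics are currently the only resource type whose configs can be altered
        String retentionMs = requestMetadata.configs().get("retention.ms");
        // Assumes a well-formed numeric value; the broker validates config syntax separately
        if (retentionMs != null && Long.parseLong(retentionMs) > MAX_RETENTION_MS)
            throw new PolicyViolationException("retention.ms may not exceed " + MAX_RETENTION_MS +
                " for topic " + requestMetadata.resource().name());
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}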
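On the client side, a rejected resource fails with the POLICY_VIOLATION error code, which the AdminClient surfaces as a PolicyViolationException wrapped in an ExecutionException (the integration tests below assert exactly this). A rough sketch of the client-side handling, assuming the hypothetical policy above is installed on the broker:

package com.example;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;

public class AlterConfigsClientExample {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient client = AdminClient.create(props);
        try {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            Config update = new Config(Collections.singletonList(
                new ConfigEntry("retention.ms", "999999999999")));
            client.alterConfigs(Collections.singletonMap(topic, update)).all().get();
        } catch (ExecutionException e) {
            // Each failed resource completes its future exceptionally; the cause
            // carries the message thrown by the broker-side policy.
            if (e.getCause() instanceof PolicyViolationException)
                System.err.println("Rejected by policy: " + e.getCause().getMessage());
        } finally {
            client.close();
        }
    }
}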


@@ -67,7 +67,7 @@ import static org.junit.Assert.fail;
/**
* A unit test for KafkaAdminClient.
*
* See KafkaAdminClientIntegrationTest for an integration test of the KafkaAdminClient.
* See AdminClientIntegrationTest for an integration test.
*/
public class KafkaAdminClientTest {
@Rule


@@ -23,14 +23,14 @@ import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.log.LogConfig
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
import org.apache.kafka.common.errors.{ApiException, InvalidRequestException, PolicyViolationException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.CreateTopicsRequest._
import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse, Resource, ResourceType}
import org.apache.kafka.server.policy.CreateTopicPolicy
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
import scala.collection._
@@ -47,6 +47,9 @@ class AdminManager(val config: KafkaConfig,
private val createTopicPolicy =
Option(config.getConfiguredInstance(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[CreateTopicPolicy]))
private val alterConfigPolicy =
Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))
def hasDelayedTopicOperations = topicPurgatory.delayed != 0
/**
@@ -255,14 +258,28 @@ class AdminManager(val config: KafkaConfig,
resource.`type` match {
case ResourceType.TOPIC =>
val topic = resource.name
val properties = new Properties
config.entries.asScala.foreach { configEntry =>
properties.setProperty(configEntry.name(), configEntry.value())
}
if (validateOnly)
AdminUtils.validateTopicConfig(zkUtils, topic, properties)
else
AdminUtils.changeTopicConfig(zkUtils, topic, properties)
alterConfigPolicy match {
case Some(policy) =>
AdminUtils.validateTopicConfig(zkUtils, topic, properties)
val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap
policy.validate(new AlterConfigPolicy.RequestMetadata(
new ConfigResource(ConfigResource.Type.TOPIC, resource.name), configEntriesMap.asJava))
if (!validateOnly)
AdminUtils.changeTopicConfig(zkUtils, topic, properties)
case None =>
if (validateOnly)
AdminUtils.validateTopicConfig(zkUtils, topic, properties)
else
AdminUtils.changeTopicConfig(zkUtils, topic, properties)
}
resource -> new ApiError(Errors.NONE, null)
case resourceType =>
throw new InvalidRequestException(s"AlterConfigs is only supported for topics, but resource type is $resourceType")
@@ -274,8 +291,8 @@ class AdminManager(val config: KafkaConfig,
resource -> ApiError.fromThrowable(new InvalidRequestException(message, e))
case e: Throwable =>
// Log client errors at a lower level than unexpected exceptions
val message = s"Error processing alter configs request for resource $resource"
if (e.isInstanceOf[ApiException])
val message = s"Error processing alter configs request for resource $resource, config $config"
if (e.isInstanceOf[ApiException] || e.isInstanceOf[PolicyViolationException])
info(message, e)
else
error(message, e)
@@ -287,5 +304,6 @@ class AdminManager(val config: KafkaConfig,
def shutdown() {
topicPurgatory.shutdown()
CoreUtils.swallow(createTopicPolicy.foreach(_.close()))
CoreUtils.swallow(alterConfigPolicy.foreach(_.close()))
}
}


@@ -301,6 +301,7 @@ object KafkaConfig {
val AutoCreateTopicsEnableProp = "auto.create.topics.enable"
val MinInSyncReplicasProp = "min.insync.replicas"
val CreateTopicPolicyClassNameProp = "create.topic.policy.class.name"
val AlterConfigPolicyClassNameProp = "alter.config.policy.class.name"
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms"
val DefaultReplicationFactorProp = "default.replication.factor"
@@ -529,6 +530,9 @@ object KafkaConfig {
val CreateTopicPolicyClassNameDoc = "The create topic policy class that should be used for validation. The class should " +
"implement the <code>org.apache.kafka.server.policy.CreateTopicPolicy</code> interface."
val AlterConfigPolicyClassNameDoc = "The alter configs policy class that should be used for validation. The class should " +
"implement the <code>org.apache.kafka.server.policy.AlterConfigPolicy</code> interface."
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels"
val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels"
@@ -748,6 +752,7 @@ object KafkaConfig {
.define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc)
.define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
.define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
.define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, AlterConfigPolicyClassNameDoc)
/** ********* Replication configuration ***********/
.define(ControllerSocketTimeoutMsProp, INT, Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc)
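To activate such a policy, the implementing class must be on the broker classpath and named via the new property (its default of null means no policy is applied). A sketch of the server.properties entry, using the hypothetical class from the example above:

alter.config.policy.class.name=com.example.RetentionBoundPolicy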


@@ -23,9 +23,9 @@ import java.util.concurrent.{ExecutionException, TimeUnit}
import org.apache.kafka.common.utils.{Time, Utils}
import kafka.integration.KafkaServerTestHarness
import kafka.log.LogConfig
import kafka.server.{Defaults, KafkaConfig}
import kafka.server.{Defaults, KafkaConfig, KafkaServer}
import org.apache.kafka.clients.admin._
import kafka.utils.{Logging, TestUtils}
import kafka.utils.{Logging, TestUtils, ZkUtils}
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.KafkaFuture
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
@@ -45,7 +45,9 @@ import scala.collection.JavaConverters._
*
* Also see {@link org.apache.kafka.clients.admin.KafkaAdminClientTest} for a unit test of the admin client.
*/
class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
import AdminClientIntegrationTest._
@Rule
def globalTimeout = Timeout.millis(120000)
@@ -244,147 +246,17 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
assertEquals(servers(2).config.logCleanerThreads.toString,
configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value)
// Alter topics
var topicConfigEntries1 = Seq(
new ConfigEntry(LogConfig.FlushMsProp, "1000")
).asJava
var topicConfigEntries2 = Seq(
new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"),
new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
).asJava
var alterResult = client.alterConfigs(Map(
topicResource1 -> new Config(topicConfigEntries1),
topicResource2 -> new Config(topicConfigEntries2)
).asJava)
assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
alterResult.all.get
// Verify that topics were updated correctly
describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava)
configs = describeResult.all.get
assertEquals(2, configs.size)
assertEquals("1000", configs.get(topicResource1).get(LogConfig.FlushMsProp).value)
assertEquals(Defaults.MessageMaxBytes.toString,
configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value)
assertEquals((Defaults.LogRetentionHours * 60 * 60 * 1000).toString,
configs.get(topicResource1).get(LogConfig.RetentionMsProp).value)
assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
assertEquals("lz4", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
// Alter topics with validateOnly=true
topicConfigEntries1 = Seq(
new ConfigEntry(LogConfig.MaxMessageBytesProp, "10")
).asJava
topicConfigEntries2 = Seq(
new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.3")
).asJava
alterResult = client.alterConfigs(Map(
topicResource1 -> new Config(topicConfigEntries1),
topicResource2 -> new Config(topicConfigEntries2)
).asJava, new AlterConfigsOptions().validateOnly(true))
assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
alterResult.all.get
// Verify that topics were not updated due to validateOnly = true
describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava)
configs = describeResult.all.get
assertEquals(2, configs.size)
assertEquals(Defaults.MessageMaxBytes.toString,
configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value)
assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
checkValidAlterConfigs(zkUtils, servers, client, topicResource1, topicResource2)
}
@Test
def testInvalidAlterConfigs(): Unit = {
client = AdminClient.create(createConfig)
// Create topics
val topic1 = "invalid-alter-configs-topic-1"
val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, new Properties())
val topic2 = "invalid-alter-configs-topic-2"
val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
val topicConfigEntries1 = Seq(
new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), // this value is invalid as it's above 1.0
new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
).asJava
var topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "snappy")).asJava
val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString)
val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.CompressionTypeProp, "gzip")).asJava
// Alter configs: first and third are invalid, second is valid
var alterResult = client.alterConfigs(Map(
topicResource1 -> new Config(topicConfigEntries1),
topicResource2 -> new Config(topicConfigEntries2),
brokerResource -> new Config(brokerConfigEntries)
).asJava)
assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
alterResult.results.get(topicResource2).get
assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
// Verify that first and third resources were not updated and second was updated
var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
var configs = describeResult.all.get
assertEquals(3, configs.size)
assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
assertEquals(Defaults.CompressionType.toString,
configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(LogConfig.CompressionTypeProp).value)
// Alter configs with validateOnly = true: first and third are invalid, second is valid
topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip")).asJava
alterResult = client.alterConfigs(Map(
topicResource1 -> new Config(topicConfigEntries1),
topicResource2 -> new Config(topicConfigEntries2),
brokerResource -> new Config(brokerConfigEntries)
).asJava, new AlterConfigsOptions().validateOnly(true))
assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
alterResult.results.get(topicResource2).get
assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
// Verify that no resources are updated since validate_only = true
describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
configs = describeResult.all.get
assertEquals(3, configs.size)
assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
assertEquals(Defaults.CompressionType.toString,
configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(LogConfig.CompressionTypeProp).value)
checkInvalidAlterConfigs(zkUtils, servers, client)
}
val ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
/**
* Test that ACL operations are not possible when the authorizer is disabled.
@@ -473,3 +345,147 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
cfgs.map(KafkaConfig.fromProps)
}
}
object AdminClientIntegrationTest {
import org.scalatest.Assertions._
def checkValidAlterConfigs(zkUtils: ZkUtils, servers: Seq[KafkaServer], client: AdminClient,
topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = {
// Alter topics
var topicConfigEntries1 = Seq(
new ConfigEntry(LogConfig.FlushMsProp, "1000")
).asJava
var topicConfigEntries2 = Seq(
new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"),
new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
).asJava
var alterResult = client.alterConfigs(Map(
topicResource1 -> new Config(topicConfigEntries1),
topicResource2 -> new Config(topicConfigEntries2)
).asJava)
assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
alterResult.all.get
// Verify that topics were updated correctly
var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava)
var configs = describeResult.all.get
assertEquals(2, configs.size)
assertEquals("1000", configs.get(topicResource1).get(LogConfig.FlushMsProp).value)
assertEquals(Defaults.MessageMaxBytes.toString,
configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value)
assertEquals((Defaults.LogRetentionHours * 60 * 60 * 1000).toString,
configs.get(topicResource1).get(LogConfig.RetentionMsProp).value)
assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
assertEquals("lz4", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
// Alter topics with validateOnly=true
topicConfigEntries1 = Seq(
new ConfigEntry(LogConfig.MaxMessageBytesProp, "10")
).asJava
topicConfigEntries2 = Seq(
new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.3")
).asJava
alterResult = client.alterConfigs(Map(
topicResource1 -> new Config(topicConfigEntries1),
topicResource2 -> new Config(topicConfigEntries2)
).asJava, new AlterConfigsOptions().validateOnly(true))
assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
alterResult.all.get
// Verify that topics were not updated due to validateOnly = true
describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava)
configs = describeResult.all.get
assertEquals(2, configs.size)
assertEquals(Defaults.MessageMaxBytes.toString,
configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value)
assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
}
def checkInvalidAlterConfigs(zkUtils: ZkUtils, servers: Seq[KafkaServer], client: AdminClient): Unit = {
// Create topics
val topic1 = "invalid-alter-configs-topic-1"
val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, new Properties())
val topic2 = "invalid-alter-configs-topic-2"
val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
val topicConfigEntries1 = Seq(
new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), // this value is invalid as it's above 1.0
new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
).asJava
var topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "snappy")).asJava
val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString)
val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.CompressionTypeProp, "gzip")).asJava
// Alter configs: first and third are invalid, second is valid
var alterResult = client.alterConfigs(Map(
topicResource1 -> new Config(topicConfigEntries1),
topicResource2 -> new Config(topicConfigEntries2),
brokerResource -> new Config(brokerConfigEntries)
).asJava)
assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
alterResult.results.get(topicResource2).get
assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
// Verify that first and third resources were not updated and second was updated
var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
var configs = describeResult.all.get
assertEquals(3, configs.size)
assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
assertEquals(Defaults.CompressionType.toString,
configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
// Alter configs with validateOnly = true: first and third are invalid, second is valid
topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip")).asJava
alterResult = client.alterConfigs(Map(
topicResource1 -> new Config(topicConfigEntries1),
topicResource2 -> new Config(topicConfigEntries2),
brokerResource -> new Config(brokerConfigEntries)
).asJava, new AlterConfigsOptions().validateOnly(true))
assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
alterResult.results.get(topicResource2).get
assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
// Verify that no resources are updated since validate_only = true
describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
configs = describeResult.all.get
assertEquals(3, configs.size)
assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
assertEquals(Defaults.CompressionType.toString,
configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
}
}


@@ -0,0 +1,209 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package kafka.api
import java.util
import java.util.Properties
import java.util.concurrent.ExecutionException
import kafka.integration.KafkaServerTestHarness
import kafka.log.LogConfig
import kafka.server.{Defaults, KafkaConfig}
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigsOptions, Config, ConfigEntry}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{InvalidRequestException, PolicyViolationException}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.policy.AlterConfigPolicy
import org.junit.Assert.{assertEquals, assertNull, assertTrue}
import org.junit.{After, Before, Rule, Test}
import org.junit.rules.Timeout
import scala.collection.JavaConverters._
/**
* Tests AdminClient calls when the broker is configured with policies like AlterConfigPolicy, CreateTopicPolicy, etc.
*/
class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with Logging {
import AdminClientWithPoliciesIntegrationTest._
var client: AdminClient = null
val brokerCount = 3
@Rule
def globalTimeout = Timeout.millis(120000)
@Before
override def setUp(): Unit = {
super.setUp
TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
}
@After
override def tearDown(): Unit = {
if (client != null)
Utils.closeQuietly(client, "AdminClient")
super.tearDown()
}
def createConfig: util.Map[String, Object] =
Map[String, Object](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList).asJava
override def generateConfigs = {
val configs = TestUtils.createBrokerConfigs(brokerCount, zkConnect)
configs.foreach(props => props.put(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[Policy]))
configs.map(KafkaConfig.fromProps)
}
@Test
def testValidAlterConfigs(): Unit = {
client = AdminClient.create(createConfig)
// Create topics
val topic1 = "describe-alter-configs-topic-1"
val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
val topicConfig1 = new Properties
topicConfig1.setProperty(LogConfig.MaxMessageBytesProp, "500000")
topicConfig1.setProperty(LogConfig.RetentionMsProp, "60000000")
TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, topicConfig1)
val topic2 = "describe-alter-configs-topic-2"
val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
AdminClientIntegrationTest.checkValidAlterConfigs(zkUtils, servers, client, topicResource1, topicResource2)
}
@Test
def testInvalidAlterConfigs(): Unit = {
client = AdminClient.create(createConfig)
AdminClientIntegrationTest.checkInvalidAlterConfigs(zkUtils, servers, client)
}
@Test
def testInvalidAlterConfigsDueToPolicy(): Unit = {
client = AdminClient.create(createConfig)
// Create topics
val topic1 = "invalid-alter-configs-due-to-policy-topic-1"
val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, new Properties())
val topic2 = "invalid-alter-configs-due-to-policy-topic-2"
val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
val topic3 = "invalid-alter-configs-due-to-policy-topic-3"
val topicResource3 = new ConfigResource(ConfigResource.Type.TOPIC, topic3)
TestUtils.createTopic(zkUtils, topic3, 1, 1, servers, new Properties)
val topicConfigEntries1 = Seq(
new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"),
new ConfigEntry(LogConfig.MinInSyncReplicasProp, "2") // policy doesn't allow this
).asJava
var topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.8")).asJava
val topicConfigEntries3 = Seq(new ConfigEntry(LogConfig.MinInSyncReplicasProp, "-1")).asJava
val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString)
val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.SslTruststorePasswordProp, "12313")).asJava
// Alter configs: second is valid, the others are invalid
var alterResult = client.alterConfigs(Map(
topicResource1 -> new Config(topicConfigEntries1),
topicResource2 -> new Config(topicConfigEntries2),
topicResource3 -> new Config(topicConfigEntries3),
brokerResource -> new Config(brokerConfigEntries)
).asJava)
assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.results.keySet)
assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
alterResult.results.get(topicResource2).get
assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
// Verify that the second resource was updated and the others were not
var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava)
var configs = describeResult.all.get
assertEquals(4, configs.size)
assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
assertEquals(Defaults.MinInSyncReplicas.toString,
configs.get(topicResource1).get(LogConfig.MinInSyncReplicasProp).value)
assertEquals("0.8", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
assertNull(configs.get(brokerResource).get(KafkaConfig.SslTruststorePasswordProp).value)
// Alter configs with validateOnly = true: only second is valid
topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.7")).asJava
alterResult = client.alterConfigs(Map(
topicResource1 -> new Config(topicConfigEntries1),
topicResource2 -> new Config(topicConfigEntries2),
brokerResource -> new Config(brokerConfigEntries),
topicResource3 -> new Config(topicConfigEntries3)
).asJava, new AlterConfigsOptions().validateOnly(true))
assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.results.keySet)
assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
alterResult.results.get(topicResource2).get
assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
// Verify that no resources are updated since validate_only = true
describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava)
configs = describeResult.all.get
assertEquals(4, configs.size)
assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
assertEquals(Defaults.MinInSyncReplicas.toString,
configs.get(topicResource1).get(LogConfig.MinInSyncReplicasProp).value)
assertEquals("0.8", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
assertNull(configs.get(brokerResource).get(KafkaConfig.SslTruststorePasswordProp).value)
}
}
object AdminClientWithPoliciesIntegrationTest {
class Policy extends AlterConfigPolicy {
var configs: Map[String, _] = _
var closed = false
def configure(configs: util.Map[String, _]): Unit = {
this.configs = configs.asScala.toMap
}
def validate(requestMetadata: AlterConfigPolicy.RequestMetadata): Unit = {
require(!closed, "Policy should not be closed")
require(!configs.isEmpty, "configure should have been called with non-empty configs")
require(!requestMetadata.configs.isEmpty, "request configs should not be empty")
require(requestMetadata.resource.name.nonEmpty, "resource name should not be empty")
require(requestMetadata.resource.name.contains("topic"))
if (requestMetadata.configs.containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG))
throw new PolicyViolationException("Min in sync replicas cannot be updated")
}
def close(): Unit = closed = true
}
}


@@ -27,7 +27,7 @@ import org.junit.{After, Assert, Before, Test}
import scala.collection.JavaConverters._
class SaslSslAdminClientIntegrationTest extends KafkaAdminClientIntegrationTest with SaslSetup {
class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with SaslSetup {
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName())
this.serverConfig.setProperty(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")