KAFKA-13749: CreateTopics in KRaft must return configs (#11941)

Previously, when in KRaft mode, CreateTopics did not return the active configurations for the
topic(s) it had just created. This PR addresses that gap. We now return these topic
configurations when the user has DESCRIBE_CONFIGS permission. (When the user does not have this
permission, we omit the configurations and set TopicConfigErrorCode, and we also omit the number
of partitions and the replication factor.)
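
For example, a client with DESCRIBE_CONFIGS on the topic can now read the returned configurations
straight from the CreateTopicsResult. (This is a rough sketch rather than code from this patch; it
assumes an already-configured Admin client named "admin" and omits imports and exception handling.)

    CreateTopicsResult result = admin.createTopics(
        Collections.singleton(new NewTopic("foo", 2, (short) 2)));
    result.all().get();
    // Both of these futures succeed only if the caller had DESCRIBE_CONFIGS on "foo".
    Config fooConfigs = result.config("foo").get();
    int fooPartitions = result.numPartitions("foo").get();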

For historical reasons, we use different names to refer to each topic configuration when it is set
in the broker context, as opposed to the topic context. For example, the topic configuration
"segment.ms" corresponds to the broker configuration "log.roll.ms". Additionally, some broker
configurations have synonyms. For example, the broker configuration "log.roll.hours" can be used to
set the log roll time instead of "log.roll.ms". In order to track all of this, this PR adds a
table in LogConfig.scala which maps each topic configuration to an ordered list of ConfigSynonym
classes. (This table is then passed to KafkaConfigSchema as a constructor argument.)
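
For illustration, the table's shape as seen from Java code (such as KafkaConfigSchema) looks
roughly like the sketch below. The two entries are taken from the full table in the diff, with the
literal broker configuration names standing in for the KafkaConfig constants used there; the rest
of the table follows the same pattern.

    Map<String, List<ConfigSynonym>> synonyms = new HashMap<>();
    // "segment.ms" is taken from log.roll.ms if set, otherwise from log.roll.hours (converted).
    synonyms.put("segment.ms", Arrays.asList(
        new ConfigSynonym("log.roll.ms"),
        new ConfigSynonym("log.roll.hours", ConfigSynonym.HOURS_TO_MILLISECONDS)));
    // "retention.ms" falls back through the milliseconds, minutes, and hours broker configs.
    synonyms.put("retention.ms", Arrays.asList(
        new ConfigSynonym("log.retention.ms"),
        new ConfigSynonym("log.retention.minutes", ConfigSynonym.MINUTES_TO_MILLISECONDS),
        new ConfigSynonym("log.retention.hours", ConfigSynonym.HOURS_TO_MILLISECONDS)));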

Some synonyms require transformations. For example, in order to convert from "log.roll.hours" to
"segment.ms", we must convert hours to milliseconds. (Note that our assumption right now is that
topic configurations do not have synonyms, only broker configurations. If this changes, we will
need to add some logic to handle it.)
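
Concretely, setting the static broker configuration log.roll.hours=2 resolves the topic
configuration segment.ms to 7200000, which is what the new integration test asserts. A minimal
sketch using the converter from the new ConfigSynonym class:

    // HOURS_TO_MILLISECONDS is a Function<String, String>: 2 hours -> 7,200,000 ms.
    String segmentMs = ConfigSynonym.HOURS_TO_MILLISECONDS.apply("2");  // "7200000"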

This PR makes the 8-argument constructor for ConfigEntry public. We need this in order to make full
use of ConfigEntry outside of the admin namespace. This change is probably inevitable in general
since otherwise we cannot easily test the output from various admin APIs in junit tests outside the
admin package.
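
For example, the new integration test (which lives outside the admin package) can now construct
expected ConfigEntry values directly, shown here with literal strings rather than the LogConfig
constants used in the test:

    ConfigEntry expected = new ConfigEntry("cleanup.policy", "delete",
        ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), null, null);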

Testing:

This PR adds PlaintextAdminIntegrationTest#testCreateTopicsReturnsConfigs. This test validates
some of the configurations returned by the call to CreateTopics, rather than just checking
whether it got back a non-empty map, as some of the existing tests do. In order to exercise the
configuration override logic, testCreateTopicsReturnsConfigs sets up some custom static and dynamic
configurations.

In QuorumTestHarness, we now allow tests to configure the controller's node ID. This allows us to
set dynamic configurations for the controller in testCreateTopicsReturnsConfigs. We will have a
more complete fix for setting dynamic configurations on the controller later.
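
For example, a test can pin the controller's node ID by overriding it in the properties returned
from kraftControllerConfigs (a sketch; "node.id" is the key behind KafkaConfig.NodeIdProp, and
QuorumTestHarness otherwise defaults the controller ID to 1000):

    Properties controllerConfig = new Properties();
    controllerConfig.setProperty("node.id", "1");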

This PR changes ConfigurationControlManager so that it is created via a Builder. This will make it
easier to add more parameters to its constructor without having to update every piece of test code
that uses it. It will also make the test code easier to read.
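
For example, the updated ConfigurationControlManagerTest below builds a manager with only the
parameters it cares about:

    ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
        setKafkaConfigSchema(SCHEMA).
        build();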

Reviewers: David Arthur <mumrah@gmail.com>
Author: Colin Patrick McCabe
Date:   2022-04-01 10:50:25 -07:00
Commit: 62ea4c46a9 (parent: 1bdd35d8d8)
22 changed files with 899 additions and 193 deletions

checkstyle/import-control.xml

@ -259,6 +259,7 @@
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />

ConfigEntry.java

@ -61,8 +61,14 @@ public class ConfigEntry {
* @param isReadOnly whether the config is read-only and cannot be updated
* @param synonyms Synonym configs in order of precedence
*/
ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean isReadOnly,
List<ConfigSynonym> synonyms, ConfigType type, String documentation) {
public ConfigEntry(String name,
String value,
ConfigSource source,
boolean isSensitive,
boolean isReadOnly,
List<ConfigSynonym> synonyms,
ConfigType type,
String documentation) {
Objects.requireNonNull(name, "name should not be null");
this.name = name;
this.value = value;

LogConfig.scala

@ -27,7 +27,10 @@ import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigExceptio
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.record.{LegacyRecord, RecordVersion, TimestampType}
import org.apache.kafka.common.utils.{ConfigUtils, Utils}
import org.apache.kafka.metadata.ConfigSynonym
import org.apache.kafka.metadata.ConfigSynonym.{HOURS_TO_MILLISECONDS, MINUTES_TO_MILLISECONDS}
import java.util.Arrays.asList
import java.util.{Collections, Locale, Properties}
import scala.annotation.nowarn
import scala.collection.{Map, mutable}
@ -440,37 +443,86 @@ object LogConfig {
}
/**
* Map topic config to the broker config with highest priority. Some of these have additional synonyms
* that can be obtained using [[kafka.server.DynamicBrokerConfig#brokerConfigSynonyms]]
* Maps topic configurations to their equivalent broker configurations.
*
* Topics can be configured either by setting their dynamic topic configurations, or by
* setting equivalent broker configurations. For historical reasons, the equivalent broker
* configurations have different names. This table maps each topic configuration to its
* equivalent broker configurations.
*
* In some cases, the equivalent broker configurations must be transformed before they
* can be used. For example, log.roll.hours must be converted to milliseconds before it
* can be used as the value of segment.ms.
*
* The broker configurations will be used in the order specified here. In other words, if
* both the first and the second synonyms are configured, we will use only the value of
* the first synonym and ignore the second.
*/
@nowarn("cat=deprecation")
val TopicConfigSynonyms = Map(
SegmentBytesProp -> KafkaConfig.LogSegmentBytesProp,
SegmentMsProp -> KafkaConfig.LogRollTimeMillisProp,
SegmentJitterMsProp -> KafkaConfig.LogRollTimeJitterMillisProp,
SegmentIndexBytesProp -> KafkaConfig.LogIndexSizeMaxBytesProp,
FlushMessagesProp -> KafkaConfig.LogFlushIntervalMessagesProp,
FlushMsProp -> KafkaConfig.LogFlushIntervalMsProp,
RetentionBytesProp -> KafkaConfig.LogRetentionBytesProp,
RetentionMsProp -> KafkaConfig.LogRetentionTimeMillisProp,
MaxMessageBytesProp -> KafkaConfig.MessageMaxBytesProp,
IndexIntervalBytesProp -> KafkaConfig.LogIndexIntervalBytesProp,
DeleteRetentionMsProp -> KafkaConfig.LogCleanerDeleteRetentionMsProp,
MinCompactionLagMsProp -> KafkaConfig.LogCleanerMinCompactionLagMsProp,
MaxCompactionLagMsProp -> KafkaConfig.LogCleanerMaxCompactionLagMsProp,
FileDeleteDelayMsProp -> KafkaConfig.LogDeleteDelayMsProp,
MinCleanableDirtyRatioProp -> KafkaConfig.LogCleanerMinCleanRatioProp,
CleanupPolicyProp -> KafkaConfig.LogCleanupPolicyProp,
UncleanLeaderElectionEnableProp -> KafkaConfig.UncleanLeaderElectionEnableProp,
MinInSyncReplicasProp -> KafkaConfig.MinInSyncReplicasProp,
CompressionTypeProp -> KafkaConfig.CompressionTypeProp,
PreAllocateEnableProp -> KafkaConfig.LogPreAllocateProp,
MessageFormatVersionProp -> KafkaConfig.LogMessageFormatVersionProp,
MessageTimestampTypeProp -> KafkaConfig.LogMessageTimestampTypeProp,
MessageTimestampDifferenceMaxMsProp -> KafkaConfig.LogMessageTimestampDifferenceMaxMsProp,
MessageDownConversionEnableProp -> KafkaConfig.LogMessageDownConversionEnableProp
)
val AllTopicConfigSynonyms = Map(
SegmentBytesProp -> asList(
new ConfigSynonym(KafkaConfig.LogSegmentBytesProp)),
SegmentMsProp -> asList(
new ConfigSynonym(KafkaConfig.LogRollTimeMillisProp),
new ConfigSynonym(KafkaConfig.LogRollTimeHoursProp, HOURS_TO_MILLISECONDS)),
SegmentJitterMsProp -> asList(
new ConfigSynonym(KafkaConfig.LogRollTimeJitterMillisProp),
new ConfigSynonym(KafkaConfig.LogRollTimeJitterHoursProp, HOURS_TO_MILLISECONDS)),
SegmentIndexBytesProp -> asList(
new ConfigSynonym(KafkaConfig.LogIndexSizeMaxBytesProp)),
FlushMessagesProp -> asList(
new ConfigSynonym(KafkaConfig.LogFlushIntervalMessagesProp)),
FlushMsProp -> asList(
new ConfigSynonym(KafkaConfig.LogFlushSchedulerIntervalMsProp),
new ConfigSynonym(KafkaConfig.LogFlushIntervalMsProp)),
RetentionBytesProp -> asList(
new ConfigSynonym(KafkaConfig.LogRetentionBytesProp)),
RetentionMsProp -> asList(
new ConfigSynonym(KafkaConfig.LogRetentionTimeMillisProp),
new ConfigSynonym(KafkaConfig.LogRetentionTimeMinutesProp, MINUTES_TO_MILLISECONDS),
new ConfigSynonym(KafkaConfig.LogRetentionTimeHoursProp, HOURS_TO_MILLISECONDS)),
MaxMessageBytesProp -> asList(
new ConfigSynonym(KafkaConfig.MessageMaxBytesProp)),
IndexIntervalBytesProp -> asList(
new ConfigSynonym(KafkaConfig.LogIndexIntervalBytesProp)),
DeleteRetentionMsProp -> asList(
new ConfigSynonym(KafkaConfig.LogCleanerDeleteRetentionMsProp)),
MinCompactionLagMsProp -> asList(
new ConfigSynonym(KafkaConfig.LogCleanerMinCompactionLagMsProp)),
MaxCompactionLagMsProp -> asList(
new ConfigSynonym(KafkaConfig.LogCleanerMaxCompactionLagMsProp)),
FileDeleteDelayMsProp -> asList(
new ConfigSynonym(KafkaConfig.LogDeleteDelayMsProp)),
MinCleanableDirtyRatioProp -> asList(
new ConfigSynonym(KafkaConfig.LogCleanerMinCleanRatioProp)),
CleanupPolicyProp -> asList(
new ConfigSynonym(KafkaConfig.LogCleanupPolicyProp)),
UncleanLeaderElectionEnableProp -> asList(
new ConfigSynonym(KafkaConfig.UncleanLeaderElectionEnableProp)),
MinInSyncReplicasProp -> asList(
new ConfigSynonym(KafkaConfig.MinInSyncReplicasProp)),
CompressionTypeProp -> asList(
new ConfigSynonym(KafkaConfig.CompressionTypeProp)),
PreAllocateEnableProp -> asList(
new ConfigSynonym(KafkaConfig.LogPreAllocateProp)),
MessageFormatVersionProp -> asList(
new ConfigSynonym(KafkaConfig.LogMessageFormatVersionProp)),
MessageTimestampTypeProp -> asList(
new ConfigSynonym(KafkaConfig.LogMessageTimestampTypeProp)),
MessageTimestampDifferenceMaxMsProp -> asList(
new ConfigSynonym(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)),
MessageDownConversionEnableProp -> asList(
new ConfigSynonym(KafkaConfig.LogMessageDownConversionEnableProp)),
).asJava
/**
* Map topic config to the broker config with highest priority. Some of these have additional synonyms
* that can be obtained using [[kafka.server.DynamicBrokerConfig#brokerConfigSynonyms]]
* or using [[AllTopicConfigSynonyms]]
*/
val TopicConfigSynonyms = AllTopicConfigSynonyms.asScala.map {
case (k, v) => k -> v.get(0).name()
}
/**
* Copy the subset of properties that are relevant to Logs. The individual properties

ControllerApis.scala

@ -29,7 +29,7 @@ import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.clients.admin.AlterConfigOp
import org.apache.kafka.common.Uuid.ZERO_UUID
import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE}
import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE, DESCRIBE_CONFIGS}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidRequestException, TopicDeletionDisabledException}
import org.apache.kafka.common.internals.FatalExitError
@ -316,7 +316,9 @@ class ControllerApis(val requestChannel: RequestChannel,
val createTopicsRequest = request.body[CreateTopicsRequest]
val future = createTopics(createTopicsRequest.data(),
authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false),
names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(identity))
names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(identity),
names => authHelper.filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
names, logIfDenied = false)(identity))
future.whenComplete { (result, exception) =>
requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
if (exception != null) {
@ -329,10 +331,12 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
def createTopics(request: CreateTopicsRequestData,
def createTopics(
request: CreateTopicsRequestData,
hasClusterAuth: Boolean,
getCreatableTopics: Iterable[String] => Set[String])
: CompletableFuture[CreateTopicsResponseData] = {
getCreatableTopics: Iterable[String] => Set[String],
getDescribableTopics: Iterable[String] => Set[String]
): CompletableFuture[CreateTopicsResponseData] = {
val topicNames = new util.HashSet[String]()
val duplicateTopicNames = new util.HashSet[String]()
request.topics().forEach { topicData =>
@ -348,6 +352,7 @@ class ControllerApis(val requestChannel: RequestChannel,
} else {
getCreatableTopics.apply(topicNames.asScala)
}
val describableTopicNames = getDescribableTopics.apply(topicNames.asScala).asJava
val effectiveRequest = request.duplicate()
val iterator = effectiveRequest.topics().iterator()
while (iterator.hasNext) {
@ -357,7 +362,7 @@ class ControllerApis(val requestChannel: RequestChannel,
iterator.remove()
}
}
controller.createTopics(effectiveRequest).thenApply { response =>
controller.createTopics(effectiveRequest, describableTopicNames).thenApply { response =>
duplicateTopicNames.forEach { name =>
response.topics().add(new CreatableTopicResult().
setName(name).

ControllerServer.scala

@ -183,7 +183,8 @@ class ControllerServer(
setMetrics(new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry())).
setCreateTopicPolicy(createTopicPolicy.asJava).
setAlterConfigPolicy(alterConfigPolicy.asJava).
setConfigurationValidator(new ControllerConfigurationValidator())
setConfigurationValidator(new ControllerConfigurationValidator()).
setStaticConfig(config.originals)
}
authorizer match {
case Some(a: ClusterMetadataAuthorizer) => controllerBuilder.setAuthorizer(a)

KafkaRaftServer.scala

@ -184,5 +184,5 @@ object KafkaRaftServer {
val configSchema = new KafkaConfigSchema(Map(
ConfigResource.Type.BROKER -> new ConfigDef(KafkaConfig.configDef),
ConfigResource.Type.TOPIC -> LogConfig.configDefCopy,
).asJava)
).asJava, LogConfig.AllTopicConfigSynonyms)
}

MockController.java

@ -60,6 +60,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
@ -113,7 +114,7 @@ public class MockController implements Controller {
@Override
synchronized public CompletableFuture<CreateTopicsResponseData>
createTopics(CreateTopicsRequestData request) {
createTopics(CreateTopicsRequestData request, Set<String> describable) {
CreateTopicsResponseData response = new CreateTopicsResponseData();
for (CreatableTopic topic : request.topics()) {
if (topicNameToId.containsKey(topic.name())) {
@ -125,13 +126,30 @@ public class MockController implements Controller {
Uuid topicUuid = new Uuid(0, topicId);
topicNameToId.put(topic.name(), topicUuid);
topics.put(topicUuid, new MockTopic(topic.name(), topicUuid));
response.topics().add(new CreatableTopicResult().
CreatableTopicResult creatableTopicResult = new CreatableTopicResult().
setName(topic.name()).
setErrorCode(Errors.NONE.code()).
setTopicId(topicUuid));
// For a better mock, we might want to return configs, replication factor,
// etc. Right now, the tests that use MockController don't need these
// things.
setTopicId(topicUuid);
if (describable.contains(topic.name())) {
// Note: we don't simulate topic configs here yet.
// Just returning replication factor and numPartitions.
if (topic.assignments() != null && !topic.assignments().isEmpty()) {
creatableTopicResult.
setTopicConfigErrorCode(Errors.NONE.code()).
setReplicationFactor((short)
topic.assignments().iterator().next().brokerIds().size()).
setNumPartitions(topic.assignments().size());
} else {
creatableTopicResult.
setTopicConfigErrorCode(Errors.NONE.code()).
setReplicationFactor(topic.replicationFactor()).
setNumPartitions(topic.numPartitions());
}
} else {
creatableTopicResult.
setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code());
}
response.topics().add(creatableTopicResult);
}
}
return CompletableFuture.completedFuture(response);

BaseAdminIntegrationTest.scala

@ -48,12 +48,15 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
def brokerCount = 3
override def logDirCount = 2
var testInfo: TestInfo = null
var client: Admin = _
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
this.testInfo = testInfo
super.setUp(testInfo)
waitUntilBrokerMetadataIsPropagated(servers)
waitUntilBrokerMetadataIsPropagated(brokers)
}
@AfterEach
@ -189,6 +192,15 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
override def modifyConfigs(configs: Seq[Properties]): Unit = {
super.modifyConfigs(configs)
// For testCreateTopicsReturnsConfigs, set some static broker configurations so that we can
// verify that they show up in the "configs" output of CreateTopics.
if (testInfo.getTestMethod.toString.contains("testCreateTopicsReturnsConfigs")) {
configs.foreach(config => {
config.setProperty(KafkaConfig.LogRollTimeHoursProp, "2")
config.setProperty(KafkaConfig.LogRetentionTimeMinutesProp, "240")
config.setProperty(KafkaConfig.LogRollTimeJitterMillisProp, "123")
})
}
configs.foreach { config =>
config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
config.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
@ -201,6 +213,18 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
}
}
override def kraftControllerConfigs(): Seq[Properties] = {
val controllerConfig = new Properties()
if (testInfo.getTestMethod.toString.contains("testCreateTopicsReturnsConfigs")) {
// For testCreateTopicsReturnsConfigs, set the controller's ID to 1 so that the dynamic
// config we set for node 1 will apply to it.
controllerConfig.setProperty(KafkaConfig.NodeIdProp, "1")
}
val controllerConfigs = Seq(controllerConfig)
modifyConfigs(controllerConfigs)
controllerConfigs
}
def createConfig: util.Map[String, Object] = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())

PlaintextAdminIntegrationTest.scala

@ -33,6 +33,8 @@ import kafka.utils.TestUtils._
import kafka.utils.{Log4jController, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.HostResolver
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@ -45,6 +47,8 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.slf4j.LoggerFactory
import scala.annotation.nowarn
@ -74,7 +78,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
brokerLoggerConfigResource = new ConfigResource(
ConfigResource.Type.BROKER_LOGGER, servers.head.config.brokerId.toString)
ConfigResource.Type.BROKER_LOGGER, brokers.head.config.brokerId.toString)
}
@AfterEach
@ -2061,7 +2065,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// we expect the log level to be inherited from the first ancestor with a level configured
assertEquals(kafkaLogLevel, logCleanerLogLevelConfig.value())
assertEquals("kafka.cluster.Replica", logCleanerLogLevelConfig.name())
assertEquals(ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, logCleanerLogLevelConfig.source())
assertEquals(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, logCleanerLogLevelConfig.source())
assertEquals(false, logCleanerLogLevelConfig.isReadOnly)
assertEquals(false, logCleanerLogLevelConfig.isSensitive)
assertTrue(logCleanerLogLevelConfig.synonyms().isEmpty)
@ -2275,6 +2279,85 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
}
/**
* Test that createTopics returns the dynamic configurations of the topics that were created.
*
* Note: this test requires some custom static broker and controller configurations, which are set up in
* BaseAdminIntegrationTest.modifyConfigs and BaseAdminIntegrationTest.kraftControllerConfigs.
*/
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testCreateTopicsReturnsConfigs(quorum: String): Unit = {
client = Admin.create(super.createConfig)
val alterMap = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]
alterMap.put(new ConfigResource(ConfigResource.Type.BROKER, ""), util.Arrays.asList(
new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "10800000"), OpType.SET)))
(brokers.map(_.config) ++ controllerServers.map(_.config)).foreach { case config =>
alterMap.put(new ConfigResource(ConfigResource.Type.BROKER, config.nodeId.toString()),
util.Arrays.asList(new AlterConfigOp(new ConfigEntry(
KafkaConfig.LogCleanerDeleteRetentionMsProp, "34"), OpType.SET)))
}
client.incrementalAlterConfigs(alterMap).all().get()
waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
KafkaConfig.LogCleanerDeleteRetentionMsProp, "").toString.equals("34")),
s"Timed out waiting for change to ${KafkaConfig.LogCleanerDeleteRetentionMsProp}",
waitTimeMs = 60000L)
val newTopics = Seq(new NewTopic("foo", Map((0: Integer) -> Seq[Integer](1, 2).asJava,
(1: Integer) -> Seq[Integer](2, 0).asJava).asJava).
configs(Collections.singletonMap(LogConfig.IndexIntervalBytesProp, "9999999")),
new NewTopic("bar", 3, 3.toShort),
new NewTopic("baz", Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava)
)
val result = client.createTopics(newTopics.asJava)
result.all.get()
waitForTopics(client, newTopics.map(_.name()).toList, List())
assertEquals(2, result.numPartitions("foo").get())
assertEquals(2, result.replicationFactor("foo").get())
assertEquals(3, result.numPartitions("bar").get())
assertEquals(3, result.replicationFactor("bar").get())
assertEquals(configs.head.numPartitions, result.numPartitions("baz").get())
assertEquals(configs.head.defaultReplicationFactor, result.replicationFactor("baz").get())
val topicConfigs = result.config("foo").get()
// From the topic configuration defaults.
assertEquals(new ConfigEntry(LogConfig.CleanupPolicyProp, "delete",
ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), null, null),
topicConfigs.get(LogConfig.CleanupPolicyProp))
// From dynamic cluster config via the synonym LogRetentionTimeHoursProp.
assertEquals(new ConfigEntry(LogConfig.RetentionMsProp, "10800000",
ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG, false, false, Collections.emptyList(), null, null),
topicConfigs.get(LogConfig.RetentionMsProp))
// From dynamic broker config via LogCleanerDeleteRetentionMsProp.
assertEquals(new ConfigEntry(LogConfig.DeleteRetentionMsProp, "34",
ConfigSource.DYNAMIC_BROKER_CONFIG, false, false, Collections.emptyList(), null, null),
topicConfigs.get(LogConfig.DeleteRetentionMsProp))
// From static broker config by SegmentJitterMsProp.
assertEquals(new ConfigEntry(LogConfig.SegmentJitterMsProp, "123",
ConfigSource.STATIC_BROKER_CONFIG, false, false, Collections.emptyList(), null, null),
topicConfigs.get(LogConfig.SegmentJitterMsProp))
// From static broker config by the synonym LogRollTimeHoursProp.
val segmentMsPropType = if (isKRaftTest()) {
ConfigSource.STATIC_BROKER_CONFIG
} else {
ConfigSource.DEFAULT_CONFIG
}
assertEquals(new ConfigEntry(LogConfig.SegmentMsProp, "7200000",
segmentMsPropType, false, false, Collections.emptyList(), null, null),
topicConfigs.get(LogConfig.SegmentMsProp))
// From the dynamic topic config.
assertEquals(new ConfigEntry(LogConfig.IndexIntervalBytesProp, "9999999",
ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, Collections.emptyList(), null, null),
topicConfigs.get(LogConfig.IndexIntervalBytesProp))
}
}
object PlaintextAdminIntegrationTest {
@ -2417,5 +2500,4 @@ object PlaintextAdminIntegrationTest {
assertEquals(Defaults.CompressionType, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
}
}

QuorumTestHarness.scala

@ -239,24 +239,26 @@ abstract class QuorumTestHarness extends Logging {
}
private def newKRaftQuorum(testInfo: TestInfo): KRaftQuorumImplementation = {
val clusterId = Uuid.randomUuid().toString
val metadataDir = TestUtils.tempDir()
val metaProperties = new MetaProperties(clusterId, 0)
formatDirectories(immutable.Seq(metadataDir.getAbsolutePath()), metaProperties)
val controllerMetrics = new Metrics()
val propsList = kraftControllerConfigs()
if (propsList.size != 1) {
throw new RuntimeException("Only one KRaft controller is supported for now.")
}
val props = propsList(0)
props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
if (props.getProperty(KafkaConfig.NodeIdProp) == null) {
props.setProperty(KafkaConfig.NodeIdProp, "1000")
}
val nodeId = Integer.parseInt(props.getProperty(KafkaConfig.NodeIdProp))
val metadataDir = TestUtils.tempDir()
val metaProperties = new MetaProperties(Uuid.randomUuid().toString, nodeId)
formatDirectories(immutable.Seq(metadataDir.getAbsolutePath()), metaProperties)
val controllerMetrics = new Metrics()
props.setProperty(KafkaConfig.MetadataLogDirProp, metadataDir.getAbsolutePath())
val proto = controllerListenerSecurityProtocol.toString()
props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"CONTROLLER:${proto}")
props.setProperty(KafkaConfig.ListenersProp, s"CONTROLLER://localhost:0")
props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
props.setProperty(KafkaConfig.QuorumVotersProp, "1000@localhost:0")
props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:0")
val config = new KafkaConfig(props)
val threadNamePrefix = "Controller_" + testInfo.getDisplayName
val controllerQuorumVotersFuture = new CompletableFuture[util.Map[Integer, AddressSpec]]
@ -287,7 +289,7 @@ abstract class QuorumTestHarness extends Logging {
error("Error completing controller socket server future", e)
controllerQuorumVotersFuture.completeExceptionally(e)
} else {
controllerQuorumVotersFuture.complete(Collections.singletonMap(1000,
controllerQuorumVotersFuture.complete(Collections.singletonMap(nodeId,
new InetAddressSpec(new InetSocketAddress("localhost", port))))
}
})
@ -303,7 +305,7 @@ abstract class QuorumTestHarness extends Logging {
controllerServer,
metadataDir,
controllerQuorumVotersFuture,
clusterId,
metaProperties.clusterId,
this)
}

ControllerApisTest.scala

@ -497,6 +497,7 @@ class ControllerApisTest {
new CreatableTopic().setName("bar").setNumPartitions(2).setReplicationFactor(3),
new CreatableTopic().setName("bar").setNumPartitions(2).setReplicationFactor(3),
new CreatableTopic().setName("baz").setNumPartitions(2).setReplicationFactor(3),
new CreatableTopic().setName("indescribable").setNumPartitions(2).setReplicationFactor(3),
new CreatableTopic().setName("quux").setNumPartitions(2).setReplicationFactor(3),
).iterator()))
val expectedResponse = Set(new CreatableTopicResult().setName("foo").
@ -507,11 +508,19 @@ class ControllerApisTest {
setErrorMessage("Duplicate topic name."),
new CreatableTopicResult().setName("baz").
setErrorCode(NONE.code()).
setTopicId(new Uuid(0L, 1L)),
setTopicId(new Uuid(0L, 1L)).
setNumPartitions(2).
setReplicationFactor(3).
setTopicConfigErrorCode(NONE.code()),
new CreatableTopicResult().setName("indescribable").
setErrorCode(NONE.code()).
setTopicId(new Uuid(0L, 2L)).
setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code()),
new CreatableTopicResult().setName("quux").
setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()))
assertEquals(expectedResponse, controllerApis.createTopics(request,
false,
_ => Set("baz", "indescribable"),
_ => Set("baz")).get().topics().asScala.toSet)
}

ConfigurationControlManager.java

@ -18,6 +18,7 @@
package org.apache.kafka.controller;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.config.ConfigResource;
@ -51,6 +52,8 @@ import static org.apache.kafka.common.protocol.Errors.INVALID_CONFIG;
public class ConfigurationControlManager {
public static final ConfigResource DEFAULT_NODE = new ConfigResource(Type.BROKER, "");
private final Logger log;
private final SnapshotRegistry snapshotRegistry;
private final KafkaConfigSchema configSchema;
@ -58,13 +61,82 @@ public class ConfigurationControlManager {
private final Optional<AlterConfigPolicy> alterConfigPolicy;
private final ConfigurationValidator validator;
private final TimelineHashMap<ConfigResource, TimelineHashMap<String, String>> configData;
private final Map<String, Object> staticConfig;
private final ConfigResource currentController;
ConfigurationControlManager(LogContext logContext,
static class Builder {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
private KafkaConfigSchema configSchema = KafkaConfigSchema.EMPTY;
private Consumer<ConfigResource> existenceChecker = __ -> { };
private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty();
private ConfigurationValidator validator = ConfigurationValidator.NO_OP;
private Map<String, Object> staticConfig = Collections.emptyMap();
private int nodeId = 0;
Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
return this;
}
Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
return this;
}
Builder setKafkaConfigSchema(KafkaConfigSchema configSchema) {
this.configSchema = configSchema;
return this;
}
Builder setExistenceChecker(Consumer<ConfigResource> existenceChecker) {
this.existenceChecker = existenceChecker;
return this;
}
Builder setAlterConfigPolicy(Optional<AlterConfigPolicy> alterConfigPolicy) {
this.alterConfigPolicy = alterConfigPolicy;
return this;
}
Builder setValidator(ConfigurationValidator validator) {
this.validator = validator;
return this;
}
Builder setStaticConfig(Map<String, Object> staticConfig) {
this.staticConfig = staticConfig;
return this;
}
Builder setNodeId(int nodeId) {
this.nodeId = nodeId;
return this;
}
ConfigurationControlManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
return new ConfigurationControlManager(
logContext,
snapshotRegistry,
configSchema,
existenceChecker,
alterConfigPolicy,
validator,
staticConfig,
nodeId);
}
}
private ConfigurationControlManager(LogContext logContext,
SnapshotRegistry snapshotRegistry,
KafkaConfigSchema configSchema,
Consumer<ConfigResource> existenceChecker,
Optional<AlterConfigPolicy> alterConfigPolicy,
ConfigurationValidator validator) {
ConfigurationValidator validator,
Map<String, Object> staticConfig,
int nodeId) {
this.log = logContext.logger(ConfigurationControlManager.class);
this.snapshotRegistry = snapshotRegistry;
this.configSchema = configSchema;
@ -72,6 +144,8 @@ public class ConfigurationControlManager {
this.alterConfigPolicy = alterConfigPolicy;
this.validator = validator;
this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
this.staticConfig = Collections.unmodifiableMap(new HashMap<>(staticConfig));
this.currentController = new ConfigResource(Type.BROKER, Integer.toString(nodeId));
}
/**
@ -355,6 +429,21 @@ public class ConfigurationControlManager {
return false; // TODO: support configuring unclean leader election.
}
Map<String, ConfigEntry> computeEffectiveTopicConfigs(Map<String, String> creationConfigs) {
return configSchema.resolveEffectiveTopicConfigs(staticConfig, clusterConfig(),
currentControllerConfig(), creationConfigs);
}
Map<String, String> clusterConfig() {
Map<String, String> result = configData.get(DEFAULT_NODE);
return (result == null) ? Collections.emptyMap() : result;
}
Map<String, String> currentControllerConfig() {
Map<String, String> result = configData.get(currentController);
return (result == null) ? Collections.emptyMap() : result;
}
class ConfigurationControlIterator implements Iterator<List<ApiMessageAndVersion>> {
private final long epoch;
private final Iterator<Entry<ConfigResource, TimelineHashMap<String, String>>> iterator;

Controller.java

@ -47,6 +47,7 @@ import org.apache.kafka.metadata.authorizer.AclMutator;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -64,11 +65,12 @@ public interface Controller extends AclMutator, AutoCloseable {
* Create a batch of topics.
*
* @param request The CreateTopicsRequest data.
* @param describable The topics which we have DESCRIBE permission on.
*
* @return A future yielding the response.
*/
CompletableFuture<CreateTopicsResponseData>
createTopics(CreateTopicsRequestData request);
createTopics(CreateTopicsRequestData request, Set<String> describable);
/**
* Unregister a broker.

QuorumController.java

@ -101,6 +101,7 @@ import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
@ -140,7 +141,7 @@ public final class QuorumController implements Controller {
private Time time = Time.SYSTEM;
private String threadNamePrefix = null;
private LogContext logContext = null;
private KafkaConfigSchema configSchema = new KafkaConfigSchema(Collections.emptyMap());
private KafkaConfigSchema configSchema = KafkaConfigSchema.EMPTY;
private RaftClient<ApiMessageAndVersion> raftClient = null;
private Map<String, VersionRange> supportedFeatures = Collections.emptyMap();
private short defaultReplicationFactor = 3;
@ -155,6 +156,7 @@ public final class QuorumController implements Controller {
private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty();
private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP;
private Optional<ClusterMetadataAuthorizer> authorizer = Optional.empty();
private Map<String, Object> staticConfig = Collections.emptyMap();
public Builder(int nodeId, String clusterId) {
this.nodeId = nodeId;
@ -251,6 +253,11 @@ public final class QuorumController implements Controller {
return this;
}
public Builder setStaticConfig(Map<String, Object> staticConfig) {
this.staticConfig = staticConfig;
return this;
}
@SuppressWarnings("unchecked")
public QuorumController build() throws Exception {
if (raftClient == null) {
@ -273,7 +280,8 @@ public final class QuorumController implements Controller {
configSchema, raftClient, supportedFeatures, defaultReplicationFactor,
defaultNumPartitions, isLeaderRecoverySupported, replicaPlacer, snapshotMaxNewRecordBytes,
leaderImbalanceCheckIntervalNs, sessionTimeoutNs, controllerMetrics,
createTopicPolicy, alterConfigPolicy, configurationValidator, authorizer);
createTopicPolicy, alterConfigPolicy, configurationValidator, authorizer,
staticConfig);
} catch (Exception e) {
Utils.closeQuietly(queue, "event queue");
throw e;
@ -1316,7 +1324,8 @@ public final class QuorumController implements Controller {
Optional<CreateTopicPolicy> createTopicPolicy,
Optional<AlterConfigPolicy> alterConfigPolicy,
ConfigurationValidator configurationValidator,
Optional<ClusterMetadataAuthorizer> authorizer) {
Optional<ClusterMetadataAuthorizer> authorizer,
Map<String, Object> staticConfig) {
this.logContext = logContext;
this.log = logContext.logger(QuorumController.class);
this.nodeId = nodeId;
@ -1327,12 +1336,16 @@ public final class QuorumController implements Controller {
this.snapshotRegistry = new SnapshotRegistry(logContext);
this.purgatory = new ControllerPurgatory();
this.resourceExists = new ConfigResourceExistenceChecker();
this.configurationControl = new ConfigurationControlManager(logContext,
snapshotRegistry,
configSchema,
resourceExists,
alterConfigPolicy,
configurationValidator);
this.configurationControl = new ConfigurationControlManager.Builder().
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
setKafkaConfigSchema(configSchema).
setExistenceChecker(resourceExists).
setAlterConfigPolicy(alterConfigPolicy).
setValidator(configurationValidator).
setStaticConfig(staticConfig).
setNodeId(nodeId).
build();
this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
this.clusterControl = new ClusterControlManager(logContext, clusterId, time,
snapshotRegistry, sessionTimeoutNs, replicaPlacer, controllerMetrics);
@ -1367,13 +1380,13 @@ public final class QuorumController implements Controller {
@Override
public CompletableFuture<CreateTopicsResponseData>
createTopics(CreateTopicsRequestData request) {
createTopics(CreateTopicsRequestData request, Set<String> describable) {
if (request.topics().isEmpty()) {
return CompletableFuture.completedFuture(new CreateTopicsResponseData());
}
return appendWriteEvent("createTopics",
time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
() -> replicationControl.createTopics(request));
() -> replicationControl.createTopics(request, describable));
}
@Override

ReplicationControlManager.java

@ -18,6 +18,7 @@
package org.apache.kafka.controller;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
@ -50,6 +51,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfigCollection;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.ElectLeadersRequestData;
@ -71,6 +73,7 @@ import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
@ -114,7 +117,9 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.UNREGISTER_BRO
import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
import static org.apache.kafka.common.protocol.Errors.INVALID_UPDATE_VERSION;
import static org.apache.kafka.common.protocol.Errors.NONE;
import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
import static org.apache.kafka.common.protocol.Errors.TOPIC_AUTHORIZATION_FAILED;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
@ -127,7 +132,6 @@ import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
* of each partition, as well as administrative tasks like creating or deleting topics.
*/
public class ReplicationControlManager {
static class TopicControlInfo {
private final String name;
private final Uuid id;
@ -148,6 +152,15 @@ public class ReplicationControlManager {
}
}
/**
* Translate a CreateableTopicConfigCollection to a map from string to string.
*/
static Map<String, String> translateCreationConfigs(CreateableTopicConfigCollection collection) {
HashMap<String, String> result = new HashMap<>();
collection.forEach(config -> result.put(config.name(), config.value()));
return Collections.unmodifiableMap(result);
}
private final SnapshotRegistry snapshotRegistry;
private final Logger log;
@ -389,7 +402,7 @@ public class ReplicationControlManager {
}
ControllerResult<CreateTopicsResponseData>
createTopics(CreateTopicsRequestData request) {
createTopics(CreateTopicsRequestData request, Set<String> describable) {
Map<String, ApiError> topicErrors = new HashMap<>();
List<ApiMessageAndVersion> records = new ArrayList<>();
@ -420,7 +433,7 @@ public class ReplicationControlManager {
if (topicErrors.containsKey(topic.name())) continue;
ApiError error;
try {
error = createTopic(topic, records, successes);
error = createTopic(topic, records, successes, describable.contains(topic.name()));
} catch (ApiException e) {
error = ApiError.fromThrowable(e);
}
@ -462,7 +475,9 @@ public class ReplicationControlManager {
private ApiError createTopic(CreatableTopic topic,
List<ApiMessageAndVersion> records,
Map<String, CreatableTopicResult> successes) {
Map<String, CreatableTopicResult> successes,
boolean authorizedToReturnConfigs) {
Map<String, String> creationConfigs = translateCreationConfigs(topic.configs());
Map<Integer, PartitionRegistration> newParts = new HashMap<>();
if (!topic.assignments().isEmpty()) {
if (topic.replicationFactor() != -1) {
@ -499,10 +514,8 @@ public class ReplicationControlManager {
Map<Integer, List<Integer>> assignments = new HashMap<>();
newParts.entrySet().forEach(e -> assignments.put(e.getKey(),
Replicas.toList(e.getValue().replicas)));
Map<String, String> configs = new HashMap<>();
topic.configs().forEach(config -> configs.put(config.name(), config.value()));
return new CreateTopicPolicy.RequestMetadata(
topic.name(), null, null, assignments, configs);
topic.name(), null, null, assignments, creationConfigs);
});
if (error.isFailure()) return error;
} else if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) {
@ -530,21 +543,38 @@ public class ReplicationControlManager {
" time(s): " + e.getMessage());
}
ApiError error = maybeCheckCreateTopicPolicy(() -> {
Map<String, String> configs = new HashMap<>();
topic.configs().forEach(config -> configs.put(config.name(), config.value()));
return new CreateTopicPolicy.RequestMetadata(
topic.name(), numPartitions, replicationFactor, null, configs);
topic.name(), numPartitions, replicationFactor, null, creationConfigs);
});
if (error.isFailure()) return error;
}
Uuid topicId = Uuid.randomUuid();
successes.put(topic.name(), new CreatableTopicResult().
CreatableTopicResult result = new CreatableTopicResult().
setName(topic.name()).
setTopicId(topicId).
setErrorCode((short) 0).
setErrorMessage(null).
setNumPartitions(newParts.size()).
setReplicationFactor((short) newParts.get(0).replicas.length));
setErrorCode(NONE.code()).
setErrorMessage(null);
if (authorizedToReturnConfigs) {
Map<String, ConfigEntry> effectiveConfig = configurationControl.
computeEffectiveTopicConfigs(creationConfigs);
List<String> configNames = new ArrayList<>(effectiveConfig.keySet());
configNames.sort(String::compareTo);
for (String configName : configNames) {
ConfigEntry entry = effectiveConfig.get(configName);
result.configs().add(new CreateTopicsResponseData.CreatableTopicConfigs().
setName(entry.name()).
setValue(entry.isSensitive() ? null : entry.value()).
setReadOnly(entry.isReadOnly()).
setConfigSource(KafkaConfigSchema.translateConfigSource(entry.source()).id()).
setIsSensitive(entry.isSensitive()));
}
result.setNumPartitions(newParts.size());
result.setReplicationFactor((short) newParts.get(0).replicas.length);
result.setTopicConfigErrorCode(NONE.code());
} else {
result.setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code());
}
successes.put(topic.name(), result);
records.add(new ApiMessageAndVersion(new TopicRecord().
setName(topic.name()).
setTopicId(topicId), TOPIC_RECORD.highestSupportedVersion()));

ConfigSynonym.java (new file)

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
/**
* Represents a synonym for a configuration plus a conversion function. The conversion
* function is necessary for cases where the synonym is denominated in different units
* (hours versus milliseconds, etc.)
*/
public class ConfigSynonym {
private static final Logger log = LoggerFactory.getLogger(ConfigSynonym.class);
public static final Function<String, String> HOURS_TO_MILLISECONDS = input -> {
int hours = valueToInt(input, 0, "hoursToMilliseconds");
return String.valueOf(TimeUnit.MILLISECONDS.convert(hours, TimeUnit.HOURS));
};
public static final Function<String, String> MINUTES_TO_MILLISECONDS = input -> {
int minutes = valueToInt(input, 0, "minutesToMilliseconds");
return String.valueOf(TimeUnit.MILLISECONDS.convert(minutes, TimeUnit.MINUTES));
};
private static int valueToInt(String input, int defaultValue, String what) {
if (input == null) return defaultValue;
String trimmedInput = input.trim();
if (trimmedInput.isEmpty()) {
return defaultValue;
}
try {
return Integer.parseInt(trimmedInput);
} catch (Exception e) {
log.error("{} failed: unable to parse '{}' as an integer.", what, trimmedInput, e);
return defaultValue;
}
}
private final String name;
private final Function<String, String> converter;
public ConfigSynonym(String name, Function<String, String> converter) {
this.name = name;
this.converter = converter;
}
public ConfigSynonym(String name) {
this(name, Function.identity());
}
public String name() {
return name;
}
public Function<String, String> converter() {
return converter;
}
}

KafkaConfigSchema.java

@ -17,12 +17,21 @@
package org.apache.kafka.metadata;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
@ -32,12 +41,75 @@ import static java.util.Collections.emptyMap;
* determining the type of config keys (string, int, password, etc.)
*/
public class KafkaConfigSchema {
public static final KafkaConfigSchema EMPTY = new KafkaConfigSchema(emptyMap());
public static final KafkaConfigSchema EMPTY = new KafkaConfigSchema(emptyMap(), emptyMap());
private static final ConfigDef EMPTY_CONFIG_DEF = new ConfigDef();
/**
* Translate a ConfigDef.Type to its equivalent for ConfigEntry.ConfigType.
*
* We do not want this code in ConfigEntry, since that is a public-facing API. On the
* other hand, putting this code in ConfigDef.Type would introduce an unwanted dependency
* from org.apache.kafka.common.config to org.apache.kafka.clients.admin. So it
* makes sense to put it here.
*/
public static ConfigEntry.ConfigType translateConfigType(ConfigDef.Type type) {
switch (type) {
case BOOLEAN:
return ConfigEntry.ConfigType.BOOLEAN;
case STRING:
return ConfigEntry.ConfigType.STRING;
case INT:
return ConfigEntry.ConfigType.INT;
case SHORT:
return ConfigEntry.ConfigType.SHORT;
case LONG:
return ConfigEntry.ConfigType.LONG;
case DOUBLE:
return ConfigEntry.ConfigType.DOUBLE;
case LIST:
return ConfigEntry.ConfigType.LIST;
case CLASS:
return ConfigEntry.ConfigType.CLASS;
case PASSWORD:
return ConfigEntry.ConfigType.PASSWORD;
default:
return ConfigEntry.ConfigType.UNKNOWN;
}
}
private static final Map<ConfigEntry.ConfigSource, DescribeConfigsResponse.ConfigSource> TRANSLATE_CONFIG_SOURCE_MAP;
static {
Map<ConfigEntry.ConfigSource, DescribeConfigsResponse.ConfigSource> map = new HashMap<>();
for (DescribeConfigsResponse.ConfigSource source : DescribeConfigsResponse.ConfigSource.values()) {
map.put(source.source(), source);
}
TRANSLATE_CONFIG_SOURCE_MAP = Collections.unmodifiableMap(map);
}
/**
* Translate a ConfigEntry.ConfigSource enum to its equivalent for DescribeConfigsResponse.
*
* We do not want this code in ConfigEntry, since that is a public-facing API. On the
* other hand, putting this code in DescribeConfigsResponse would introduce an unwanted
* dependency from org.apache.kafka.common.requests to org.apache.kafka.clients.admin.
* So it makes sense to put it here.
*/
public static DescribeConfigsResponse.ConfigSource translateConfigSource(ConfigEntry.ConfigSource configSource) {
DescribeConfigsResponse.ConfigSource result = TRANSLATE_CONFIG_SOURCE_MAP.get(configSource);
if (result != null) return result;
return DescribeConfigsResponse.ConfigSource.UNKNOWN;
}
private final Map<ConfigResource.Type, ConfigDef> configDefs;
public KafkaConfigSchema(Map<ConfigResource.Type, ConfigDef> configDefs) {
private final Map<String, List<ConfigSynonym>> logConfigSynonyms;
public KafkaConfigSchema(Map<ConfigResource.Type, ConfigDef> configDefs,
Map<String, List<ConfigSynonym>> logConfigSynonyms) {
this.configDefs = configDefs;
this.logConfigSynonyms = logConfigSynonyms;
}
/**
@ -84,4 +156,91 @@ public class KafkaConfigSchema {
}
return ConfigDef.convertToString(configKey.defaultValue, configKey.type);
}
public Map<String, ConfigEntry> resolveEffectiveTopicConfigs(
Map<String, ? extends Object> staticNodeConfig,
Map<String, ? extends Object> dynamicClusterConfigs,
Map<String, ? extends Object> dynamicNodeConfigs,
Map<String, ? extends Object> dynamicTopicConfigs) {
ConfigDef configDef = configDefs.getOrDefault(ConfigResource.Type.TOPIC, EMPTY_CONFIG_DEF);
HashMap<String, ConfigEntry> effectiveConfigs = new HashMap<>();
for (ConfigDef.ConfigKey configKey : configDef.configKeys().values()) {
ConfigEntry entry = resolveEffectiveTopicConfig(configKey, staticNodeConfig,
dynamicClusterConfigs, dynamicNodeConfigs, dynamicTopicConfigs);
effectiveConfigs.put(entry.name(), entry);
}
return effectiveConfigs;
}
private ConfigEntry resolveEffectiveTopicConfig(ConfigDef.ConfigKey configKey,
Map<String, ? extends Object> staticNodeConfig,
Map<String, ? extends Object> dynamicClusterConfigs,
Map<String, ? extends Object> dynamicNodeConfigs,
Map<String, ? extends Object> dynamicTopicConfigs) {
if (dynamicTopicConfigs.containsKey(configKey.name)) {
return toConfigEntry(configKey,
dynamicTopicConfigs.get(configKey.name),
ConfigSource.DYNAMIC_TOPIC_CONFIG, Function.identity());
}
List<ConfigSynonym> synonyms = logConfigSynonyms.getOrDefault(configKey.name, emptyList());
for (ConfigSynonym synonym : synonyms) {
if (dynamicNodeConfigs.containsKey(synonym.name())) {
return toConfigEntry(configKey, dynamicNodeConfigs.get(synonym.name()),
ConfigSource.DYNAMIC_BROKER_CONFIG, synonym.converter());
}
}
for (ConfigSynonym synonym : synonyms) {
if (dynamicClusterConfigs.containsKey(synonym.name())) {
return toConfigEntry(configKey, dynamicClusterConfigs.get(synonym.name()),
ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG, synonym.converter());
}
}
for (ConfigSynonym synonym : synonyms) {
if (staticNodeConfig.containsKey(synonym.name())) {
return toConfigEntry(configKey, staticNodeConfig.get(synonym.name()),
ConfigSource.STATIC_BROKER_CONFIG, synonym.converter());
}
}
return toConfigEntry(configKey, configKey.hasDefault() ? configKey.defaultValue : null,
ConfigSource.DEFAULT_CONFIG, Function.identity());
}
private ConfigEntry toConfigEntry(ConfigDef.ConfigKey configKey,
Object value,
ConfigSource source,
Function<String, String> converter) {
// Convert the value into a nullable string suitable for storing in ConfigEntry.
String stringValue = null;
if (value != null) {
if (value instanceof String) {
// The value may already be a string if it's coming from a Map<String, String>.
// Then it doesn't need to be converted.
stringValue = (String) value;
} else if (value instanceof Password) {
// We want the actual value here, not [hidden], which is what we'd get
// from Password#toString. While we don't return sensitive config values
// over the wire to users, we may need the real value internally.
stringValue = ((Password) value).value();
} else {
try {
// Use the ConfigDef function here which will handle List, Class, etc.
stringValue = ConfigDef.convertToString(value, configKey.type);
} catch (Exception e) {
throw new RuntimeException("Unable to convert " + configKey.name + " to string.", e);
}
}
}
if (stringValue != null) {
stringValue = converter.apply(stringValue);
}
return new ConfigEntry(
configKey.name,
stringValue,
source,
configKey.type().isSensitive(),
false, // "readonly" is always false, for now.
emptyList(), // we don't populate synonyms, for now.
translateConfigType(configKey.type()),
configKey.documentation);
}
}

ConfigurationControlManagerTest.java

@ -24,15 +24,15 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.ConfigSynonym;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
import org.apache.kafka.timeline.SnapshotRegistry;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
@ -53,6 +53,7 @@ import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SUBTRACT;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.metadata.ConfigSynonym.HOURS_TO_MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -69,10 +70,19 @@ public class ConfigurationControlManagerTest {
CONFIGS.put(TOPIC, new ConfigDef().
define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc").
define("def", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "def").
define("ghi", ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.HIGH, "ghi"));
define("ghi", ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.HIGH, "ghi").
define("quuux", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "quux"));
}
static final KafkaConfigSchema SCHEMA = new KafkaConfigSchema(CONFIGS);
public static final Map<String, List<ConfigSynonym>> SYNONYMS = new HashMap<>();
static {
SYNONYMS.put("abc", Arrays.asList(new ConfigSynonym("foo.bar")));
SYNONYMS.put("def", Arrays.asList(new ConfigSynonym("baz")));
SYNONYMS.put("quuux", Arrays.asList(new ConfigSynonym("quux", HOURS_TO_MILLISECONDS)));
}
static final KafkaConfigSchema SCHEMA = new KafkaConfigSchema(CONFIGS, SYNONYMS);
static final ConfigResource BROKER0 = new ConfigResource(BROKER, "0");
static final ConfigResource MYTOPIC = new ConfigResource(TOPIC, "mytopic");
@ -101,26 +111,11 @@ public class ConfigurationControlManagerTest {
return new SimpleImmutableEntry<>(a, b);
}
static ConfigurationControlManager newConfigurationControlManager() {
return newConfigurationControlManager(Optional.empty());
}
static ConfigurationControlManager newConfigurationControlManager(
Optional<AlterConfigPolicy> alterConfigPolicy
) {
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
return new ConfigurationControlManager(logContext,
snapshotRegistry,
SCHEMA,
TestExistenceChecker.INSTANCE,
alterConfigPolicy,
ConfigurationValidator.NO_OP);
}
@Test
public void testReplay() throws Exception {
ConfigurationControlManager manager = newConfigurationControlManager();
ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
setKafkaConfigSchema(SCHEMA).
build();
assertEquals(Collections.emptyMap(), manager.getConfigs(BROKER0));
manager.replay(new ConfigRecord().
setResourceType(BROKER.id()).setResourceName("0").
@ -151,7 +146,9 @@ public class ConfigurationControlManagerTest {
@Test
public void testIncrementalAlterConfigs() {
ConfigurationControlManager manager = newConfigurationControlManager();
ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
setKafkaConfigSchema(SCHEMA).
build();
ControllerResult<Map<ConfigResource, ApiError>> result = manager.
incrementalAlterConfigs(toMap(entry(BROKER0, toMap(
@ -180,7 +177,10 @@ public class ConfigurationControlManagerTest {
@Test
public void testIncrementalAlterConfigsWithoutExistence() {
ConfigurationControlManager manager = newConfigurationControlManager();
ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
setKafkaConfigSchema(SCHEMA).
setExistenceChecker(TestExistenceChecker.INSTANCE).
build();
ConfigResource existingTopic = new ConfigResource(TOPIC, "ExistingTopic");
ControllerResult<Map<ConfigResource, ApiError>> result = manager.
@ -236,9 +236,10 @@ public class ConfigurationControlManagerTest {
new RequestMetadata(MYTOPIC, Collections.emptyMap()),
new RequestMetadata(BROKER0, toMap(entry("foo.bar", "123"),
entry("quux", "456")))));
ConfigurationControlManager manager = newConfigurationControlManager(Optional.of(policy));
ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
setKafkaConfigSchema(SCHEMA).
setAlterConfigPolicy(Optional.of(policy)).
build();
assertEquals(ControllerResult.atomicOf(asList(new ApiMessageAndVersion(
new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
setName("foo.bar").setValue("123"), (short) 0), new ApiMessageAndVersion(
@ -260,7 +261,9 @@ public class ConfigurationControlManagerTest {
@Test
public void testLegacyAlterConfigs() {
ConfigurationControlManager manager = newConfigurationControlManager();
ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
setKafkaConfigSchema(SCHEMA).
build();
List<ApiMessageAndVersion> expectedRecords1 = asList(
new ApiMessageAndVersion(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").

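For readers tracking the Builder migration in the hunks above, the sketch below shows the rough shape such a builder would take. Only the setter names visible in this diff (setSnapshotRegistry, setKafkaConfigSchema, setExistenceChecker, setAlterConfigPolicy) and the six constructor arguments from the removed newConfigurationControlManager helper come from the source; the parameter types and default values here are assumptions, not the actual implementation.

    // Hedged sketch of ConfigurationControlManager.Builder; defaults and types are illustrative.
    public static class Builder {
        private final LogContext logContext = new LogContext();
        private SnapshotRegistry snapshotRegistry;                         // created lazily if unset
        private KafkaConfigSchema configSchema = KafkaConfigSchema.EMPTY;
        private Consumer<ConfigResource> existenceChecker = __ -> { };     // type assumed
        private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty();
        private ConfigurationValidator validator = ConfigurationValidator.NO_OP;

        Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
            this.snapshotRegistry = snapshotRegistry; return this;
        }
        Builder setKafkaConfigSchema(KafkaConfigSchema configSchema) {
            this.configSchema = configSchema; return this;
        }
        Builder setExistenceChecker(Consumer<ConfigResource> existenceChecker) {
            this.existenceChecker = existenceChecker; return this;
        }
        Builder setAlterConfigPolicy(Optional<AlterConfigPolicy> alterConfigPolicy) {
            this.alterConfigPolicy = alterConfigPolicy; return this;
        }
        ConfigurationControlManager build() {
            if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
            return new ConfigurationControlManager(logContext, snapshotRegistry, configSchema,
                existenceChecker, alterConfigPolicy, validator);
        }
    }

With defaults in place, the tests above only set the fields they actually exercise (the schema, plus the existence checker or policy where needed), which is what removes the need for the old helper overloads.
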
View File

@ -233,7 +233,8 @@ public class QuorumControllerTest {
new CreatableTopicCollection(Collections.singleton(
new CreatableTopic().setName("foo").setNumPartitions(numberOfPartitions).
setReplicationFactor(replicationFactor)).iterator()));
CreateTopicsResponseData createTopicsResponseData = active.createTopics(createTopicsRequestData).get();
CreateTopicsResponseData createTopicsResponseData = active.createTopics(
createTopicsRequestData, Collections.singleton("foo")).get();
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
@ -322,7 +323,8 @@ public class QuorumControllerTest {
new CreatableTopicCollection(Collections.singleton(
new CreatableTopic().setName("foo").setNumPartitions(numberOfPartitions).
setReplicationFactor(replicationFactor)).iterator()));
CreateTopicsResponseData createTopicsResponseData = active.createTopics(createTopicsRequestData).get();
CreateTopicsResponseData createTopicsResponseData = active.createTopics(
createTopicsRequestData, Collections.singleton("foo")).get();
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
@ -432,16 +434,17 @@ public class QuorumControllerTest {
new CreatableTopic().setName("foo").setNumPartitions(1).
setReplicationFactor((short) 1)).iterator()));
assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), active.createTopics(
createTopicsRequestData).get().topics().find("foo").errorCode());
createTopicsRequestData, Collections.singleton("foo")).get().
topics().find("foo").errorCode());
assertEquals("Unable to replicate the partition 1 time(s): All brokers " +
"are currently fenced.", active.createTopics(
createTopicsRequestData).get().topics().find("foo").errorMessage());
"are currently fenced.", active.createTopics(createTopicsRequestData,
Collections.singleton("foo")).get().topics().find("foo").errorMessage());
assertEquals(new BrokerHeartbeatReply(true, false, false, false),
active.processBrokerHeartbeat(new BrokerHeartbeatRequestData().
setWantFence(false).setBrokerEpoch(0L).setBrokerId(0).
setCurrentMetadataOffset(100000L)).get());
assertEquals(Errors.NONE.code(), active.createTopics(
createTopicsRequestData).get().topics().find("foo").errorCode());
assertEquals(Errors.NONE.code(), active.createTopics(createTopicsRequestData,
Collections.singleton("foo")).get().topics().find("foo").errorCode());
CompletableFuture<TopicIdPartition> topicPartitionFuture = active.appendReadEvent(
"debugGetPartition", () -> {
Iterator<TopicIdPartition> iterator = active.
@ -504,7 +507,8 @@ public class QuorumControllerTest {
new CreatableReplicaAssignment().
setPartitionIndex(1).
setBrokerIds(Arrays.asList(1, 2, 0))).
iterator()))).iterator()))).get();
iterator()))).iterator())),
Collections.singleton("foo")).get();
fooId = fooData.topics().find("foo").topicId();
active.allocateProducerIds(
new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
@ -573,7 +577,8 @@ public class QuorumControllerTest {
new CreatableReplicaAssignment().
setPartitionIndex(1).
setBrokerIds(Arrays.asList(1, 2, 0))).
iterator()))).iterator()))).get();
iterator()))).iterator())),
Collections.singleton("foo")).get();
fooId = fooData.topics().find("foo").topicId();
active.allocateProducerIds(
new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
@ -636,7 +641,8 @@ public class QuorumControllerTest {
new CreatableReplicaAssignment().
setPartitionIndex(1).
setBrokerIds(Arrays.asList(1, 2, 0))).
iterator()))).iterator()))).get();
iterator()))).iterator())),
Collections.singleton(topicName)).get();
}
logEnv.waitForLatestSnapshot();
}
@ -772,7 +778,8 @@ public class QuorumControllerTest {
CompletableFuture<CreateTopicsResponseData> createFuture =
controller.createTopics(new CreateTopicsRequestData().setTimeoutMs(0).
setTopics(new CreatableTopicCollection(Collections.singleton(
new CreatableTopic().setName("foo")).iterator())));
new CreatableTopic().setName("foo")).iterator())),
Collections.emptySet());
long now = controller.time().nanoseconds();
CompletableFuture<Map<Uuid, ApiError>> deleteFuture =
controller.deleteTopics(now, Collections.singletonList(Uuid.ZERO_UUID));
@ -827,7 +834,8 @@ public class QuorumControllerTest {
QuorumController controller = controlEnv.activeController();
CountDownLatch countDownLatch = controller.pause();
CompletableFuture<CreateTopicsResponseData> createFuture =
controller.createTopics(new CreateTopicsRequestData().setTimeoutMs(120000));
controller.createTopics(new CreateTopicsRequestData().setTimeoutMs(120000),
Collections.emptySet());
long deadlineMs = controller.time().nanoseconds() + HOURS.toNanos(1);
CompletableFuture<Map<Uuid, ApiError>> deleteFuture =
controller.deleteTopics(deadlineMs, Collections.emptyList());
@ -877,20 +885,14 @@ public class QuorumControllerTest {
)
.collect(Collectors.toList());
Uuid topicId = controller.createTopics(
new CreateTopicsRequestData()
.setTopics(
new CreatableTopicCollection(
Collections.singleton(
new CreatableTopic()
Uuid topicId = controller.createTopics(new CreateTopicsRequestData()
.setTopics(new CreatableTopicCollection(Collections.singleton(new CreatableTopic()
.setName(topicName)
.setNumPartitions(-1)
.setReplicationFactor((short) -1)
.setAssignments(new CreatableReplicaAssignmentCollection(partitions.iterator()))
).iterator()
)
)
).get().topics().find(topicName).topicId();
).iterator())),
Collections.singleton("foo")).get().topics().find(topicName).topicId();
// Create a lot of alter isr
List<AlterPartitionRequestData.PartitionData> alterPartitions = IntStream
@ -1029,7 +1031,8 @@ public class QuorumControllerTest {
setTopics(new CreatableTopicCollection(Collections.singleton(
new CreatableTopic().setName("foo").
setReplicationFactor((short) 3).
setNumPartitions(1)).iterator()))).get();
setNumPartitions(1)).iterator())),
Collections.singleton("foo")).get();
ConfigResourceExistenceChecker checker =
active.new ConfigResourceExistenceChecker();
// A ConfigResource with type=BROKER and name=(empty string) represents

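One thing the hunks above leave implicit: every createTopics call now carries a second argument, a set of topic names. That set evidently names the topics whose configurations the caller is permitted to see in the response; the tests pass Collections.singleton("foo") when they want the full result and Collections.emptySet() to model a caller with no describe access. A minimal sketch of the two variants, reusing createTopicsRequestData as built in the surrounding code (the two calls are independent examples, not meant to run back to back):

    // Hedged sketch: the describable-topic set passed alongside the request.
    // "foo" in the set => its configs may be returned; an empty set => configs omitted.
    CreateTopicsResponseData described = active.createTopics(
        createTopicsRequestData, Collections.singleton("foo")).get();
    CreateTopicsResponseData undescribed = active.createTopics(
        createTopicsRequestData, Collections.emptySet()).get();
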
View File

@ -67,7 +67,6 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
@ -82,6 +81,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.ArrayList;
import java.util.Collections;
@ -142,13 +142,9 @@ public class ReplicationControlManagerTest {
TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS),
new StripedReplicaPlacer(random),
metrics);
final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
new LogContext(),
snapshotRegistry,
KafkaConfigSchema.EMPTY,
__ -> { },
Optional.empty(),
(__, ___) -> { });
final ConfigurationControlManager configurationControl = new ConfigurationControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
build();
final ReplicationControlManager replicationControl;
void replay(List<ApiMessageAndVersion> records) throws Exception {
@ -184,7 +180,7 @@ public class ReplicationControlManagerTest {
topic.setNumPartitions(numPartitions).setReplicationFactor(replicationFactor);
request.topics().add(topic);
ControllerResult<CreateTopicsResponseData> result =
replicationControl.createTopics(request);
replicationControl.createTopics(request, Collections.singleton(name));
CreatableTopicResult topicResult = result.response().topics().find(name);
assertNotNull(topicResult);
assertEquals(expectedErrorCode, topicResult.errorCode());
@ -219,7 +215,7 @@ public class ReplicationControlManagerTest {
setValue(e.getValue())));
request.topics().add(topic);
ControllerResult<CreateTopicsResponseData> result =
replicationControl.createTopics(request);
replicationControl.createTopics(request, Collections.singleton(name));
CreatableTopicResult topicResult = result.response().topics().find(name);
assertNotNull(topicResult);
assertEquals(expectedErrorCode, topicResult.errorCode());
@ -402,7 +398,7 @@ public class ReplicationControlManagerTest {
request.topics().add(new CreatableTopic().setName("foo").
setNumPartitions(-1).setReplicationFactor((short) -1));
ControllerResult<CreateTopicsResponseData> result =
replicationControl.createTopics(request);
replicationControl.createTopics(request, Collections.singleton("foo"));
CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code()).
@ -413,7 +409,7 @@ public class ReplicationControlManagerTest {
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
ControllerResult<CreateTopicsResponseData> result2 =
replicationControl.createTopics(request);
replicationControl.createTopics(request, Collections.singleton("foo"));
CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData();
expectedResponse2.topics().add(new CreatableTopicResult().setName("foo").
setNumPartitions(1).setReplicationFactor((short) 3).
@ -426,7 +422,7 @@ public class ReplicationControlManagerTest {
replicationControl.getPartition(
((TopicRecord) result2.records().get(0).message()).topicId(), 0));
ControllerResult<CreateTopicsResponseData> result3 =
replicationControl.createTopics(request);
replicationControl.createTopics(request, Collections.singleton("foo"));
CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData();
expectedResponse3.topics().add(new CreatableTopicResult().setName("foo").
setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()).
@ -488,7 +484,7 @@ public class ReplicationControlManagerTest {
request.topics().add(new CreatableTopic().setName("foo").
setNumPartitions(1).setReplicationFactor((short) 3));
ControllerResult<CreateTopicsResponseData> result =
ctx.replicationControl.createTopics(request);
ctx.replicationControl.createTopics(request, Collections.singleton("foo"));
assertEquals(0, result.records().size());
CreatableTopicResult topicResult = result.response().topics().find("foo");
assertEquals((short) 0, topicResult.errorCode());
@ -503,7 +499,7 @@ public class ReplicationControlManagerTest {
request.topics().add(new CreatableTopic().setName("foo").
setNumPartitions(1).setReplicationFactor((short) 4));
ControllerResult<CreateTopicsResponseData> result =
ctx.replicationControl.createTopics(request);
ctx.replicationControl.createTopics(request, Collections.singleton("foo"));
assertEquals(0, result.records().size());
CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
@ -551,7 +547,7 @@ public class ReplicationControlManagerTest {
List<Uuid> topicsToDelete = new ArrayList<>();
ControllerResult<CreateTopicsResponseData> result =
replicationControl.createTopics(request);
replicationControl.createTopics(request, Collections.singleton("foo"));
topicsToDelete.add(result.response().topics().find("foo").topicId());
RecordTestUtils.replayAll(replicationControl, result.records());
@ -562,7 +558,8 @@ public class ReplicationControlManagerTest {
setNumPartitions(1).setReplicationFactor((short) -1));
request.topics().add(new CreatableTopic().setName("baz").
setNumPartitions(2).setReplicationFactor((short) -1));
result = replicationControl.createTopics(request);
result = replicationControl.createTopics(request,
new HashSet<>(Arrays.asList("bar", "baz")));
RecordTestUtils.replayAll(replicationControl, result.records());
assertEquals(3, ctx.metrics.globalTopicsCount());
assertEquals(4, ctx.metrics.globalPartitionCount());
@ -889,7 +886,7 @@ public class ReplicationControlManagerTest {
ctx.registerBrokers(0, 1);
ctx.unfenceBrokers(0, 1);
ControllerResult<CreateTopicsResponseData> createResult =
replicationControl.createTopics(request);
replicationControl.createTopics(request, Collections.singleton("foo"));
CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
Uuid topicId = createResult.response().topics().find("foo").topicId();
expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
@ -961,8 +958,8 @@ public class ReplicationControlManagerTest {
setNumPartitions(2).setReplicationFactor((short) 2));
ctx.registerBrokers(0, 1);
ctx.unfenceBrokers(0, 1);
ControllerResult<CreateTopicsResponseData> createTopicResult =
replicationControl.createTopics(request);
ControllerResult<CreateTopicsResponseData> createTopicResult = replicationControl.
createTopics(request, new HashSet<>(Arrays.asList("foo", "bar", "quux", "foo2")));
ctx.replay(createTopicResult.records());
List<CreatePartitionsTopic> topics = new ArrayList<>();
topics.add(new CreatePartitionsTopic().

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(value = 40)
public class ConfigSynonymTest {
@Test
public void testHoursToMilliseconds() {
assertEquals("0", ConfigSynonym.HOURS_TO_MILLISECONDS.apply(""));
assertEquals("0", ConfigSynonym.HOURS_TO_MILLISECONDS.apply(" "));
assertEquals("0", ConfigSynonym.HOURS_TO_MILLISECONDS.apply("0"));
assertEquals("442800000", ConfigSynonym.HOURS_TO_MILLISECONDS.apply("123"));
assertEquals("442800000", ConfigSynonym.HOURS_TO_MILLISECONDS.apply(" 123 "));
assertEquals("0", ConfigSynonym.HOURS_TO_MILLISECONDS.apply("not_a_number"));
}
@Test
public void testMinutesToMilliseconds() {
assertEquals("0", ConfigSynonym.MINUTES_TO_MILLISECONDS.apply(""));
assertEquals("0", ConfigSynonym.MINUTES_TO_MILLISECONDS.apply(" "));
assertEquals("0", ConfigSynonym.MINUTES_TO_MILLISECONDS.apply("0"));
assertEquals("7380000", ConfigSynonym.MINUTES_TO_MILLISECONDS.apply("123"));
assertEquals("7380000", ConfigSynonym.MINUTES_TO_MILLISECONDS.apply(" 123 "));
assertEquals("0", ConfigSynonym.MINUTES_TO_MILLISECONDS.apply("not_a_number"));
}
}

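The assertions above pin down the converter's contract: trim the input, treat blank or unparseable values as zero, and scale by the unit factor. Below is a minimal sketch with that behavior, assuming the converter is exposed as a java.util.function.Function<String, String> (consistent with the apply calls above); the real implementation may differ in details such as logging.

    import java.util.concurrent.TimeUnit;
    import java.util.function.Function;

    // Hedged sketch of an hours-to-milliseconds converter matching the assertions above.
    public static final Function<String, String> HOURS_TO_MILLISECONDS = input -> {
        long hours;
        try {
            hours = Long.parseLong(input.trim());
        } catch (NumberFormatException e) {
            hours = 0;  // "", "   ", and "not_a_number" all resolve to "0"
        }
        return String.valueOf(TimeUnit.HOURS.toMillis(hours));  // "123" -> "442800000"
    };

MINUTES_TO_MILLISECONDS would follow the same pattern with TimeUnit.MINUTES ("123" -> "7380000").
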
View File

@ -17,16 +17,22 @@
package org.apache.kafka.metadata;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.emptyList;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.metadata.ConfigSynonym.HOURS_TO_MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -38,42 +44,122 @@ public class KafkaConfigSchemaTest {
static {
CONFIGS.put(BROKER, new ConfigDef().
define("foo.bar", ConfigDef.Type.LIST, "1", ConfigDef.Importance.HIGH, "foo bar").
define("baz", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "baz").
define("quux", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, "quux").
define("quuux", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux"));
define("foo.bar", ConfigDef.Type.LIST, "1", ConfigDef.Importance.HIGH, "foo bar doc").
define("baz", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "baz doc").
define("quux", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, "quux doc").
define("quuux", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux doc").
define("quuux2", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux2 doc"));
CONFIGS.put(TOPIC, new ConfigDef().
define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc").
define("def", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "def").
define("ghi", ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.HIGH, "ghi").
define("xyz", ConfigDef.Type.PASSWORD, "thedefault", ConfigDef.Importance.HIGH, "xyz"));
define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc doc").
define("def", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "def doc").
define("ghi", ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.HIGH, "ghi doc").
define("xyz", ConfigDef.Type.PASSWORD, "thedefault", ConfigDef.Importance.HIGH, "xyz doc"));
}
public static final Map<String, List<ConfigSynonym>> SYNONYMS = new HashMap<>();
static {
SYNONYMS.put("abc", Arrays.asList(new ConfigSynonym("foo.bar")));
SYNONYMS.put("def", Arrays.asList(new ConfigSynonym("quux", HOURS_TO_MILLISECONDS)));
SYNONYMS.put("ghi", Arrays.asList(new ConfigSynonym("ghi")));
SYNONYMS.put("xyz", Arrays.asList(new ConfigSynonym("quuux"), new ConfigSynonym("quuux2")));
}
private static final KafkaConfigSchema SCHEMA = new KafkaConfigSchema(CONFIGS, SYNONYMS);
@Test
public void testTranslateConfigTypes() {
testTranslateConfigType(ConfigDef.Type.BOOLEAN, ConfigEntry.ConfigType.BOOLEAN);
testTranslateConfigType(ConfigDef.Type.STRING, ConfigEntry.ConfigType.STRING);
testTranslateConfigType(ConfigDef.Type.INT, ConfigEntry.ConfigType.INT);
testTranslateConfigType(ConfigDef.Type.SHORT, ConfigEntry.ConfigType.SHORT);
testTranslateConfigType(ConfigDef.Type.LONG, ConfigEntry.ConfigType.LONG);
testTranslateConfigType(ConfigDef.Type.DOUBLE, ConfigEntry.ConfigType.DOUBLE);
testTranslateConfigType(ConfigDef.Type.LIST, ConfigEntry.ConfigType.LIST);
testTranslateConfigType(ConfigDef.Type.CLASS, ConfigEntry.ConfigType.CLASS);
testTranslateConfigType(ConfigDef.Type.PASSWORD, ConfigEntry.ConfigType.PASSWORD);
}
private static void testTranslateConfigType(ConfigDef.Type a, ConfigEntry.ConfigType b) {
assertEquals(b, KafkaConfigSchema.translateConfigType(a));
}
@Test
public void testTranslateConfigSources() {
testTranslateConfigSource(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG,
DescribeConfigsResponse.ConfigSource.TOPIC_CONFIG);
testTranslateConfigSource(ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG,
DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG);
testTranslateConfigSource(ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG,
DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG);
testTranslateConfigSource(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG,
DescribeConfigsResponse.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG);
testTranslateConfigSource(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG,
DescribeConfigsResponse.ConfigSource.STATIC_BROKER_CONFIG);
testTranslateConfigSource(ConfigEntry.ConfigSource.DEFAULT_CONFIG,
DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG);
}
private static void testTranslateConfigSource(ConfigEntry.ConfigSource a,
DescribeConfigsResponse.ConfigSource b) {
assertEquals(b, KafkaConfigSchema.translateConfigSource(a));
}
@Test
public void testIsSplittable() {
KafkaConfigSchema schema = new KafkaConfigSchema(CONFIGS);
assertTrue(schema.isSplittable(BROKER, "foo.bar"));
assertFalse(schema.isSplittable(BROKER, "baz"));
assertFalse(schema.isSplittable(BROKER, "foo.baz.quux"));
assertFalse(schema.isSplittable(TOPIC, "baz"));
assertTrue(schema.isSplittable(TOPIC, "abc"));
assertTrue(SCHEMA.isSplittable(BROKER, "foo.bar"));
assertFalse(SCHEMA.isSplittable(BROKER, "baz"));
assertFalse(SCHEMA.isSplittable(BROKER, "foo.baz.quux"));
assertFalse(SCHEMA.isSplittable(TOPIC, "baz"));
assertTrue(SCHEMA.isSplittable(TOPIC, "abc"));
}
@Test
public void testGetConfigValueDefault() {
KafkaConfigSchema schema = new KafkaConfigSchema(CONFIGS);
assertEquals("1", schema.getDefault(BROKER, "foo.bar"));
assertEquals(null, schema.getDefault(BROKER, "foo.baz.quux"));
assertEquals(null, schema.getDefault(TOPIC, "abc"));
assertEquals("true", schema.getDefault(TOPIC, "ghi"));
assertEquals("1", SCHEMA.getDefault(BROKER, "foo.bar"));
assertEquals(null, SCHEMA.getDefault(BROKER, "foo.baz.quux"));
assertEquals(null, SCHEMA.getDefault(TOPIC, "abc"));
assertEquals("true", SCHEMA.getDefault(TOPIC, "ghi"));
}
@Test
public void testIsSensitive() {
KafkaConfigSchema schema = new KafkaConfigSchema(CONFIGS);
assertFalse(schema.isSensitive(BROKER, "foo.bar"));
assertTrue(schema.isSensitive(BROKER, "quuux"));
assertTrue(schema.isSensitive(BROKER, "unknown.config.key"));
assertFalse(schema.isSensitive(TOPIC, "abc"));
assertFalse(SCHEMA.isSensitive(BROKER, "foo.bar"));
assertTrue(SCHEMA.isSensitive(BROKER, "quuux"));
assertTrue(SCHEMA.isSensitive(BROKER, "quuux2"));
assertTrue(SCHEMA.isSensitive(BROKER, "unknown.config.key"));
assertFalse(SCHEMA.isSensitive(TOPIC, "abc"));
}
@Test
public void testResolveEffectiveTopicConfig() {
Map<String, String> staticNodeConfig = new HashMap<>();
staticNodeConfig.put("foo.bar", "the,static,value");
staticNodeConfig.put("quux", "123");
staticNodeConfig.put("ghi", "false");
Map<String, String> dynamicClusterConfigs = new HashMap<>();
dynamicClusterConfigs.put("foo.bar", "the,dynamic,cluster,config,value");
dynamicClusterConfigs.put("quux", "456");
Map<String, String> dynamicNodeConfigs = new HashMap<>();
dynamicNodeConfigs.put("quux", "789");
Map<String, String> dynamicTopicConfigs = new HashMap<>();
dynamicTopicConfigs.put("ghi", "true");
Map<String, ConfigEntry> expected = new HashMap<>();
expected.put("abc", new ConfigEntry("abc", "the,dynamic,cluster,config,value",
ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG, false, false, emptyList(),
ConfigEntry.ConfigType.LIST, "abc doc"));
expected.put("def", new ConfigEntry("def", "2840400000",
ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG, false, false, emptyList(),
ConfigEntry.ConfigType.LONG, "def doc"));
expected.put("ghi", new ConfigEntry("ghi", "true",
ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, emptyList(),
ConfigEntry.ConfigType.BOOLEAN, "ghi doc"));
expected.put("xyz", new ConfigEntry("xyz", "thedefault",
ConfigEntry.ConfigSource.DEFAULT_CONFIG, true, false, emptyList(),
ConfigEntry.ConfigType.PASSWORD, "xyz doc"));
assertEquals(expected, SCHEMA.resolveEffectiveTopicConfigs(staticNodeConfig,
dynamicClusterConfigs,
dynamicNodeConfigs,
dynamicTopicConfigs));
}
}
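
Tracing the expected entries in testResolveEffectiveTopicConfig makes the resolution order visible: dynamic topic configs win, then dynamic per-node configs, then dynamic cluster defaults, then static node configs, with the ConfigDef default as the last resort. A hedged walk-through of how each expected value arises under that reading, using the maps defined in the test:

    // "abc": no topic or per-node value, so its synonym "foo.bar" is taken from the
    //        dynamic cluster config (beating the static "the,static,value").
    // "def": its synonym "quux" is set per-node to "789"; HOURS_TO_MILLISECONDS turns
    //        789 h * 3,600,000 ms/h into "2840400000".
    // "ghi": set directly as a dynamic topic config, overriding the static "false".
    // "xyz": synonyms "quuux"/"quuux2" are unset everywhere, so the schema default applies.
    Map<String, ConfigEntry> effective = SCHEMA.resolveEffectiveTopicConfigs(
        staticNodeConfig, dynamicClusterConfigs, dynamicNodeConfigs, dynamicTopicConfigs);
    assertEquals("2840400000", effective.get("def").value());
    assertEquals(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG,
        effective.get("abc").source());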