MINOR: create KafkaConfigSchema and TimelineObject (#11809)

Create KafkaConfigSchema to encapsulate the concept of determining the types of configuration keys.
This is useful in the controller because we can't import KafkaConfig, which is part of core. Also
introduce the TimelineObject class, which is a more generic version of TimelineInteger /
TimelineLong.

Reviewers: David Arthur <mumrah@gmail.com>
This commit is contained in:
Colin Patrick McCabe 2022-03-02 14:26:31 -08:00 committed by GitHub
parent f089bea7ed
commit 07553d13f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 476 additions and 123 deletions

View File

@ -254,6 +254,7 @@
<subpackage name="metadata">
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />

View File

@ -22,7 +22,6 @@ import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, TimeUnit}
import kafka.cluster.Broker.ServerInfo
import kafka.log.LogConfig
import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.RaftManager
@ -30,7 +29,6 @@ import kafka.security.CredentialProvider
import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPolicyClassNameProp}
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.security.scram.internals.ScramMechanism
@ -38,7 +36,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumControllerMetrics}
import org.apache.kafka.metadata.VersionRange
import org.apache.kafka.metadata.{KafkaConfigSchema, VersionRange}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.RaftConfig.AddressSpec
import org.apache.kafka.server.authorizer.Authorizer
@ -60,7 +58,8 @@ class ControllerServer(
val time: Time,
val metrics: Metrics,
val threadNamePrefix: Option[String],
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
val configSchema: KafkaConfigSchema,
) extends Logging with KafkaMetricsGroup {
import kafka.server.Server._
@ -152,8 +151,6 @@ class ControllerServer(
throw new ConfigException("No controller.listener.names defined for controller");
}
val configDefs = Map(ConfigResource.Type.BROKER -> KafkaConfig.configDef,
ConfigResource.Type.TOPIC -> LogConfig.configDefCopy).asJava
val threadNamePrefixAsString = threadNamePrefix.getOrElse("")
createTopicPolicy = Option(config.
@ -164,7 +161,7 @@ class ControllerServer(
val controllerBuilder = new QuorumController.Builder(config.nodeId, metaProperties.clusterId).
setTime(time).
setThreadNamePrefix(threadNamePrefixAsString).
setConfigDefs(configDefs).
setConfigSchema(configSchema).
setRaftClient(raftManager.client).
setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
setDefaultNumPartitions(config.numPartitions.intValue()).

View File

@ -18,19 +18,22 @@ package kafka.server
import java.io.File
import java.util.concurrent.CompletableFuture
import kafka.common.{InconsistentNodeIdException, KafkaException}
import kafka.log.UnifiedLog
import kafka.log.{LogConfig, UnifiedLog}
import kafka.metrics.{KafkaMetricsReporter, KafkaYammerMetrics}
import kafka.raft.KafkaRaftManager
import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
import org.apache.kafka.common.utils.{AppInfoParser, Time}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
import org.apache.kafka.metadata.{KafkaConfigSchema, MetadataRecordSerde}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.common.ApiMessageAndVersion
import scala.collection.Seq
import scala.jdk.CollectionConverters._
/**
* This class implements the KRaft (Kafka Raft) mode server which relies
@ -97,7 +100,8 @@ class KafkaRaftServer(
time,
metrics,
threadNamePrefix,
controllerQuorumVotersFuture
controllerQuorumVotersFuture,
KafkaRaftServer.configSchema,
))
} else {
None
@ -176,4 +180,8 @@ object KafkaRaftServer {
(metaProperties, offlineDirs.toSeq)
}
val configSchema = new KafkaConfigSchema(Map(
ConfigResource.Type.BROKER -> new ConfigDef(KafkaConfig.configDef),
ConfigResource.Type.TOPIC -> LogConfig.configDefCopy,
).asJava)
}

View File

@ -184,7 +184,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
Time.SYSTEM,
new Metrics(),
Option.apply(threadNamePrefix),
connectFutureManager.future
connectFutureManager.future,
KafkaRaftServer.configSchema()
);
controllers.put(node.id(), controller);
controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> {

View File

@ -279,7 +279,9 @@ abstract class QuorumTestHarness extends Logging {
time = Time.SYSTEM,
metrics = controllerMetrics,
threadNamePrefix = Option(threadNamePrefix),
controllerQuorumVotersFuture = controllerQuorumVotersFuture)
controllerQuorumVotersFuture = controllerQuorumVotersFuture,
configSchema = KafkaRaftServer.configSchema,
)
controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e) => {
if (e != null) {
error("Error completing controller socket server future", e)

View File

@ -33,17 +33,27 @@ import org.junit.jupiter.api.Test
import scala.jdk.CollectionConverters._
class BrokerMetadataListenerTest {
private def newBrokerMetadataListener(
snapshotter: Option[MetadataSnapshotter] = None,
maxBytesBetweenSnapshots: Long = 1000000L,
): BrokerMetadataListener = {
new BrokerMetadataListener(
brokerId = 0,
time = Time.SYSTEM,
threadNamePrefix = None,
maxBytesBetweenSnapshots = maxBytesBetweenSnapshots,
snapshotter = snapshotter)
}
@Test
def testCreateAndClose(): Unit = {
val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000000L,
snapshotter = None)
val listener = newBrokerMetadataListener()
listener.close()
}
@Test
def testPublish(): Unit = {
val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000000L,
snapshotter = None)
val listener = newBrokerMetadataListener()
try {
listener.handleCommit(RecordTestUtils.mockBatchReader(100L,
util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
@ -137,8 +147,7 @@ class BrokerMetadataListenerTest {
@Test
def testHandleCommitsWithNoSnapshotterDefined(): Unit = {
val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000L,
snapshotter = None)
val listener = newBrokerMetadataListener(maxBytesBetweenSnapshots = 1000L)
try {
val brokerIds = 0 to 3
@ -157,7 +166,8 @@ class BrokerMetadataListenerTest {
@Test
def testCreateSnapshot(): Unit = {
val snapshotter = new MockMetadataSnapshotter()
val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000L, Some(snapshotter))
val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
maxBytesBetweenSnapshots = 1000L)
try {
val brokerIds = 0 to 3

View File

@ -18,14 +18,13 @@
package org.apache.kafka.controller;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.config.ConfigDef.ConfigKey;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
@ -56,19 +55,19 @@ public class ConfigurationControlManager {
private final Logger log;
private final SnapshotRegistry snapshotRegistry;
private final Map<ConfigResource.Type, ConfigDef> configDefs;
private final KafkaConfigSchema configSchema;
private final Optional<AlterConfigPolicy> alterConfigPolicy;
private final ConfigurationValidator validator;
private final TimelineHashMap<ConfigResource, TimelineHashMap<String, String>> configData;
ConfigurationControlManager(LogContext logContext,
SnapshotRegistry snapshotRegistry,
Map<ConfigResource.Type, ConfigDef> configDefs,
KafkaConfigSchema configSchema,
Optional<AlterConfigPolicy> alterConfigPolicy,
ConfigurationValidator validator) {
this.log = logContext.logger(ConfigurationControlManager.class);
this.snapshotRegistry = snapshotRegistry;
this.configDefs = configDefs;
this.configSchema = configSchema;
this.alterConfigPolicy = alterConfigPolicy;
this.validator = validator;
this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
@ -129,7 +128,7 @@ public class ConfigurationControlManager {
break;
case APPEND:
case SUBTRACT:
if (!isSplittable(configResource.type(), key)) {
if (!configSchema.isSplittable(configResource.type(), key)) {
outputResults.put(configResource, new ApiError(
INVALID_CONFIG, "Can't " + opType + " to " +
"key " + key + " because its type is not LIST."));
@ -259,7 +258,7 @@ public class ConfigurationControlManager {
private List<String> getParts(String value, String key, ConfigResource configResource) {
if (value == null) {
value = getConfigValueDefault(configResource.type(), key);
value = configSchema.getDefault(configResource.type(), key);
}
List<String> parts = new ArrayList<>();
if (value == null) {
@ -274,30 +273,6 @@ public class ConfigurationControlManager {
return parts;
}
boolean isSplittable(ConfigResource.Type type, String key) {
ConfigDef configDef = configDefs.get(type);
if (configDef == null) {
return false;
}
ConfigKey configKey = configDef.configKeys().get(key);
if (configKey == null) {
return false;
}
return configKey.type == ConfigDef.Type.LIST;
}
String getConfigValueDefault(ConfigResource.Type type, String key) {
ConfigDef configDef = configDefs.get(type);
if (configDef == null) {
return null;
}
ConfigKey configKey = configDef.configKeys().get(key);
if (configKey == null || !configKey.hasDefault()) {
return null;
}
return ConfigDef.convertToString(configKey.defaultValue, configKey.type);
}
/**
* Apply a configuration record to the in-memory state.
*

View File

@ -21,7 +21,6 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
@ -68,6 +67,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.SnapshotGenerator.Section;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
@ -140,7 +140,7 @@ public final class QuorumController implements Controller {
private Time time = Time.SYSTEM;
private String threadNamePrefix = null;
private LogContext logContext = null;
private Map<ConfigResource.Type, ConfigDef> configDefs = Collections.emptyMap();
private KafkaConfigSchema configSchema = new KafkaConfigSchema(Collections.emptyMap());
private RaftClient<ApiMessageAndVersion> raftClient = null;
private Map<String, VersionRange> supportedFeatures = Collections.emptyMap();
private short defaultReplicationFactor = 3;
@ -174,8 +174,8 @@ public final class QuorumController implements Controller {
return this;
}
public Builder setConfigDefs(Map<ConfigResource.Type, ConfigDef> configDefs) {
this.configDefs = configDefs;
public Builder setConfigSchema(KafkaConfigSchema configSchema) {
this.configSchema = configSchema;
return this;
}
@ -258,7 +258,7 @@ public final class QuorumController implements Controller {
try {
queue = new KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController");
return new QuorumController(logContext, nodeId, clusterId, queue, time,
configDefs, raftClient, supportedFeatures, defaultReplicationFactor,
configSchema, raftClient, supportedFeatures, defaultReplicationFactor,
defaultNumPartitions, replicaPlacer, snapshotMaxNewRecordBytes,
sessionTimeoutNs, controllerMetrics, createTopicPolicy,
alterConfigPolicy, configurationValidator, authorizer);
@ -1202,7 +1202,7 @@ public final class QuorumController implements Controller {
String clusterId,
KafkaEventQueue queue,
Time time,
Map<ConfigResource.Type, ConfigDef> configDefs,
KafkaConfigSchema configSchema,
RaftClient<ApiMessageAndVersion> raftClient,
Map<String, VersionRange> supportedFeatures,
short defaultReplicationFactor,
@ -1226,7 +1226,7 @@ public final class QuorumController implements Controller {
this.purgatory = new ControllerPurgatory();
this.resourceExists = new ConfigResourceExistenceChecker();
this.configurationControl = new ConfigurationControlManager(logContext,
snapshotRegistry, configDefs, alterConfigPolicy, configurationValidator);
snapshotRegistry, configSchema, alterConfigPolicy, configurationValidator);
this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
this.clusterControl = new ClusterControlManager(logContext, clusterId, time,
snapshotRegistry, sessionTimeoutNs, replicaPlacer, controllerMetrics);

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.ConfigRecord;
import java.util.Map;
import static java.util.Collections.emptyMap;
/**
* Tracks information about the schema of configuration keys for brokers, topics, and other
* resources. Since this class does not depend on core, it is useful in the controller for
* determining the type of config keys (string, int, password, etc.)
*/
public class KafkaConfigSchema {
public static final KafkaConfigSchema EMPTY = new KafkaConfigSchema(emptyMap());
private final Map<ConfigResource.Type, ConfigDef> configDefs;
public KafkaConfigSchema(Map<ConfigResource.Type, ConfigDef> configDefs) {
this.configDefs = configDefs;
}
/**
* Returns true if the configuration key specified is splittable (only lists are splittable.)
*/
public boolean isSplittable(ConfigResource.Type type, String key) {
ConfigDef configDef = configDefs.get(type);
if (configDef == null) return false;
ConfigDef.ConfigKey configKey = configDef.configKeys().get(key);
if (configKey == null) return false;
return configKey.type == ConfigDef.Type.LIST;
}
/**
* Returns true if the configuration key specified in this ConfigRecord is sensitive, or if
* we don't know whether it is sensitive.
*/
public boolean isSensitive(ConfigRecord record) {
ConfigResource.Type type = ConfigResource.Type.forId(record.resourceType());
return isSensitive(type, record.name());
}
/**
* Returns true if the configuration key specified is sensitive, or if we don't know whether
* it is sensitive.
*/
public boolean isSensitive(ConfigResource.Type type, String key) {
ConfigDef configDef = configDefs.get(type);
if (configDef == null) return true;
ConfigDef.ConfigKey configKey = configDef.configKeys().get(key);
if (configKey == null) return true;
return configKey.type.isSensitive();
}
/**
* Get the default value of the configuration key, or null if no default is specified.
*/
public String getDefault(ConfigResource.Type type, String key) {
ConfigDef configDef = configDefs.get(type);
if (configDef == null) return null;
ConfigDef.ConfigKey configKey = configDef.configKeys().get(key);
if (configKey == null || !configKey.hasDefault()) {
return null;
}
return ConfigDef.convertToString(configKey.defaultValue, configKey.type);
}
}

View File

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.timeline;
import java.util.Iterator;
import java.util.Objects;
/**
* This is a mutable reference to an immutable object. It can be snapshotted.
*
* This class requires external synchronization.
*/
public class TimelineObject<T> implements Revertable {
static class ObjectContainer<T> implements Delta {
private T value;
ObjectContainer(T initialValue) {
this.value = initialValue;
}
T value() {
return value;
}
void setValue(T value) {
this.value = value;
}
@Override
public void mergeFrom(long destinationEpoch, Delta delta) {
// Nothing to do
}
}
private final SnapshotRegistry snapshotRegistry;
private final T initialValue;
private T value;
public TimelineObject(SnapshotRegistry snapshotRegistry, T initialValue) {
Objects.requireNonNull(initialValue);
this.snapshotRegistry = snapshotRegistry;
this.initialValue = initialValue;
this.value = initialValue;
snapshotRegistry.register(this);
}
public T get() {
return value;
}
public T get(long epoch) {
if (epoch == SnapshotRegistry.LATEST_EPOCH) return value;
Iterator<Snapshot> iterator = snapshotRegistry.iterator(epoch);
while (iterator.hasNext()) {
Snapshot snapshot = iterator.next();
ObjectContainer<T> container = snapshot.getDelta(TimelineObject.this);
if (container != null) return container.value();
}
return value;
}
public void set(T newValue) {
Objects.requireNonNull(newValue);
Iterator<Snapshot> iterator = snapshotRegistry.reverseIterator();
if (iterator.hasNext()) {
Snapshot snapshot = iterator.next();
ObjectContainer<T> prevContainer = snapshot.getDelta(TimelineObject.this);
if (prevContainer == null) {
prevContainer = new ObjectContainer<>(initialValue);
snapshot.setDelta(TimelineObject.this, prevContainer);
prevContainer.setValue(value);
}
}
this.value = newValue;
}
@SuppressWarnings("unchecked")
@Override
public void executeRevert(long targetEpoch, Delta delta) {
ObjectContainer<T> container = (ObjectContainer<T>) delta;
this.value = container.value();
}
@Override
public void reset() {
set(initialValue);
}
@Override
public int hashCode() {
return value.hashCode();
}
@Override
public boolean equals(Object o) {
if (!(o instanceof TimelineObject)) return false;
TimelineObject other = (TimelineObject) o;
return value.equals(other.value);
}
@Override
public String toString() {
return value.toString();
}
}

View File

@ -24,6 +24,7 @@ import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.policy.AlterConfigPolicy;
@ -52,8 +53,6 @@ import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.controller.ConfigurationControlManager.NO_OP_EXISTENCE_CHECKER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
@ -72,6 +71,8 @@ public class ConfigurationControlManagerTest {
define("ghi", ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.HIGH, "ghi"));
}
static final KafkaConfigSchema SCHEMA = new KafkaConfigSchema(CONFIGS);
static final ConfigResource BROKER0 = new ConfigResource(BROKER, "0");
static final ConfigResource MYTOPIC = new ConfigResource(TOPIC, "mytopic");
@ -92,7 +93,7 @@ public class ConfigurationControlManagerTest {
public void testReplay() throws Exception {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ConfigurationControlManager manager =
new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS,
new ConfigurationControlManager(new LogContext(), snapshotRegistry, SCHEMA,
Optional.empty(), ConfigurationValidator.NO_OP);
assertEquals(Collections.emptyMap(), manager.getConfigs(BROKER0));
manager.replay(new ConfigRecord().
@ -126,7 +127,7 @@ public class ConfigurationControlManagerTest {
public void testIncrementalAlterConfigs() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ConfigurationControlManager manager =
new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS,
new ConfigurationControlManager(new LogContext(), snapshotRegistry, SCHEMA,
Optional.empty(), ConfigurationValidator.NO_OP);
ControllerResult<Map<ConfigResource, ApiError>> result = manager.
@ -195,7 +196,7 @@ public class ConfigurationControlManagerTest {
new RequestMetadata(BROKER0, toMap(entry("foo.bar", "123"),
entry("quux", "456")))));
ConfigurationControlManager manager = new ConfigurationControlManager(
new LogContext(), snapshotRegistry, CONFIGS, Optional.of(policy),
new LogContext(), snapshotRegistry, SCHEMA, Optional.of(policy),
ConfigurationValidator.NO_OP);
assertEquals(ControllerResult.atomicOf(asList(new ApiMessageAndVersion(
@ -217,36 +218,11 @@ public class ConfigurationControlManagerTest {
NO_OP_EXISTENCE_CHECKER));
}
@Test
public void testIsSplittable() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ConfigurationControlManager manager =
new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS,
Optional.empty(), ConfigurationValidator.NO_OP);
assertTrue(manager.isSplittable(BROKER, "foo.bar"));
assertFalse(manager.isSplittable(BROKER, "baz"));
assertFalse(manager.isSplittable(BROKER, "foo.baz.quux"));
assertFalse(manager.isSplittable(TOPIC, "baz"));
assertTrue(manager.isSplittable(TOPIC, "abc"));
}
@Test
public void testGetConfigValueDefault() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ConfigurationControlManager manager =
new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS,
Optional.empty(), ConfigurationValidator.NO_OP);
assertEquals("1", manager.getConfigValueDefault(BROKER, "foo.bar"));
assertEquals(null, manager.getConfigValueDefault(BROKER, "foo.baz.quux"));
assertEquals(null, manager.getConfigValueDefault(TOPIC, "abc"));
assertEquals("true", manager.getConfigValueDefault(TOPIC, "ghi"));
}
@Test
public void testLegacyAlterConfigs() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ConfigurationControlManager manager =
new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS,
new ConfigurationControlManager(new LogContext(), snapshotRegistry, SCHEMA,
Optional.empty(), ConfigurationValidator.NO_OP);
List<ApiMessageAndVersion> expectedRecords1 = asList(
new ApiMessageAndVersion(new ConfigRecord().

View File

@ -92,7 +92,7 @@ import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER0;
import static org.apache.kafka.controller.ConfigurationControlManagerTest.CONFIGS;
import static org.apache.kafka.controller.ConfigurationControlManagerTest.SCHEMA;
import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -127,7 +127,9 @@ public class QuorumControllerTest {
public void testConfigurationOperations() throws Throwable {
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
})
) {
controlEnv.activeController().registerBroker(new BrokerRegistrationRequestData().
setBrokerId(0).setClusterId(logEnv.clusterId())).get();
@ -160,7 +162,9 @@ public class QuorumControllerTest {
public void testDelayedConfigurationOperations() throws Throwable {
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
})
) {
controlEnv.activeController().registerBroker(new BrokerRegistrationRequestData().
setBrokerId(0).setClusterId(logEnv.clusterId())).get();
@ -194,8 +198,9 @@ public class QuorumControllerTest {
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(
logEnv, b -> b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutMillis));
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
}, Optional.of(sessionTimeoutMillis));
) {
ListenerCollection listeners = new ListenerCollection();
listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
@ -270,8 +275,9 @@ public class QuorumControllerTest {
@Test
public void testUnregisterBroker() throws Throwable {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
})) {
ListenerCollection listeners = new ListenerCollection();
listeners.add(new Listener().setName("PLAINTEXT").
setHost("localhost").setPort(9092));
@ -327,8 +333,9 @@ public class QuorumControllerTest {
RawSnapshotReader reader = null;
Uuid fooId;
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
})) {
QuorumController active = controlEnv.activeController();
for (int i = 0; i < numBrokers; i++) {
BrokerRegistrationReply reply = active.registerBroker(
@ -373,8 +380,9 @@ public class QuorumControllerTest {
}
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.of(reader))) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
})) {
QuorumController active = controlEnv.activeController();
long snapshotLogOffset = active.beginWritingSnapshot().get();
SnapshotReader<ApiMessageAndVersion> snapshot = createSnapshotReader(
@ -393,14 +401,10 @@ public class QuorumControllerTest {
Map<Integer, Long> brokerEpochs = new HashMap<>();
Uuid fooId;
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv,
builder -> {
builder
.setConfigDefs(CONFIGS)
.setSnapshotMaxNewRecordBytes(maxNewRecordBytes);
})
) {
try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
b.setSnapshotMaxNewRecordBytes(maxNewRecordBytes);
})) {
QuorumController active = controlEnv.activeController();
for (int i = 0; i < numBrokers; i++) {
BrokerRegistrationReply reply = active.registerBroker(
@ -452,10 +456,10 @@ public class QuorumControllerTest {
final int maxNewRecordBytes = 1000;
Map<Integer, Long> brokerEpochs = new HashMap<>();
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv,
builder -> builder.setConfigDefs(CONFIGS).
setSnapshotMaxNewRecordBytes(maxNewRecordBytes))
) {
try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
b.setSnapshotMaxNewRecordBytes(maxNewRecordBytes);
})) {
QuorumController active = controlEnv.activeController();
for (int i = 0; i < numBrokers; i++) {
BrokerRegistrationReply reply = active.registerBroker(
@ -623,8 +627,9 @@ public class QuorumControllerTest {
@Test
public void testTimeouts() throws Throwable {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
})) {
QuorumController controller = controlEnv.activeController();
CountDownLatch countDownLatch = controller.pause();
CompletableFuture<CreateTopicsResponseData> createFuture =
@ -679,8 +684,9 @@ public class QuorumControllerTest {
@Test
public void testEarlyControllerResults() throws Throwable {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
})) {
QuorumController controller = controlEnv.activeController();
CountDownLatch countDownLatch = controller.pause();
CompletableFuture<CreateTopicsResponseData> createFuture =
@ -717,11 +723,10 @@ public class QuorumControllerTest {
int numPartitions = 3;
String topicName = "topic-name";
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))
) {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
})) {
QuorumController controller = controlEnv.activeController();
Map<Integer, Long> brokerEpochs = registerBrokers(controller, numBrokers);
@ -878,8 +883,9 @@ public class QuorumControllerTest {
@Test
public void testConfigResourceExistenceChecker() throws Throwable {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
})) {
QuorumController active = controlEnv.activeController();
registerBrokers(active, 5);
active.createTopics(new CreateTopicsRequestData().

View File

@ -67,6 +67,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.Replicas;
@ -142,7 +143,7 @@ public class ReplicationControlManagerTest {
new StripedReplicaPlacer(random),
metrics);
final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
new LogContext(), snapshotRegistry, Collections.emptyMap(), Optional.empty(),
new LogContext(), snapshotRegistry, KafkaConfigSchema.EMPTY, Optional.empty(),
(__, ___) -> { });
final ReplicationControlManager replicationControl;

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
public class KafkaConfigSchemaTest {
public static final Map<ConfigResource.Type, ConfigDef> CONFIGS = new HashMap<>();
static {
CONFIGS.put(BROKER, new ConfigDef().
define("foo.bar", ConfigDef.Type.LIST, "1", ConfigDef.Importance.HIGH, "foo bar").
define("baz", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "baz").
define("quux", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, "quux").
define("quuux", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux"));
CONFIGS.put(TOPIC, new ConfigDef().
define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc").
define("def", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "def").
define("ghi", ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.HIGH, "ghi").
define("xyz", ConfigDef.Type.PASSWORD, "thedefault", ConfigDef.Importance.HIGH, "xyz"));
}
@Test
public void testIsSplittable() {
KafkaConfigSchema schema = new KafkaConfigSchema(CONFIGS);
assertTrue(schema.isSplittable(BROKER, "foo.bar"));
assertFalse(schema.isSplittable(BROKER, "baz"));
assertFalse(schema.isSplittable(BROKER, "foo.baz.quux"));
assertFalse(schema.isSplittable(TOPIC, "baz"));
assertTrue(schema.isSplittable(TOPIC, "abc"));
}
@Test
public void testGetConfigValueDefault() {
KafkaConfigSchema schema = new KafkaConfigSchema(CONFIGS);
assertEquals("1", schema.getDefault(BROKER, "foo.bar"));
assertEquals(null, schema.getDefault(BROKER, "foo.baz.quux"));
assertEquals(null, schema.getDefault(TOPIC, "abc"));
assertEquals("true", schema.getDefault(TOPIC, "ghi"));
}
@Test
public void testIsSensitive() {
KafkaConfigSchema schema = new KafkaConfigSchema(CONFIGS);
assertFalse(schema.isSensitive(BROKER, "foo.bar"));
assertTrue(schema.isSensitive(BROKER, "quuux"));
assertTrue(schema.isSensitive(BROKER, "unknown.config.key"));
assertFalse(schema.isSensitive(TOPIC, "abc"));
}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.timeline;
import java.util.Collections;
import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(value = 40)
public class TimelineObjectTest {
@Test
public void testModifyValue() {
SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
TimelineObject<String> object = new TimelineObject<>(registry, "default");
assertEquals("default", object.get());
assertEquals("default", object.get(Long.MAX_VALUE));
object.set("1");
object.set("2");
assertEquals("2", object.get());
assertEquals("2", object.get(Long.MAX_VALUE));
}
@Test
public void testToStringAndEquals() {
SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
TimelineObject<String> object = new TimelineObject<>(registry, "");
assertEquals("", object.toString());
object.set("a");
TimelineObject<String> object2 = new TimelineObject<>(registry, "");
object2.set("a");
assertEquals("a", object2.toString());
assertEquals(object, object2);
}
@Test
public void testSnapshot() {
SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
TimelineObject<String> object = new TimelineObject<>(registry, "1000");
registry.getOrCreateSnapshot(2);
object.set("1001");
registry.getOrCreateSnapshot(3);
object.set("1002");
object.set("1003");
object.set("1002");
registry.getOrCreateSnapshot(4);
assertEquals("1000", object.get(2));
assertEquals("1001", object.get(3));
assertEquals("1002", object.get(4));
registry.revertToSnapshot(3);
assertEquals("1001", object.get());
registry.revertToSnapshot(2);
assertEquals("1000", object.get());
}
@Test
public void testReset() {
SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
TimelineObject<String> value = new TimelineObject<>(registry, "<default>");
registry.getOrCreateSnapshot(2);
value.set("first value");
registry.getOrCreateSnapshot(3);
value.set("second value");
registry.reset();
assertEquals(Collections.emptyList(), registry.epochsList());
assertEquals("<default>", value.get());
}
}