MINOR: refactor how ConfigurationControl checks for resource existence (#11835)

ConfigurationControl methods should take a boolean indicating whether the resource is newly
created, rather than taking an existence checker object. The boolean is easier to understand. Also
add a unit test of existing checking failing (and succeeding).

Reviewers: Kirk True <kirk@mustardgrain.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
This commit is contained in:
Colin Patrick McCabe 2022-03-15 12:50:53 -07:00 committed by GitHub
parent 76d287c967
commit bda5c34b03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 88 additions and 41 deletions

View File

@ -51,11 +51,10 @@ import static org.apache.kafka.common.protocol.Errors.INVALID_CONFIG;
public class ConfigurationControlManager {
final static Consumer<ConfigResource> NO_OP_EXISTENCE_CHECKER = __ -> { };
private final Logger log;
private final SnapshotRegistry snapshotRegistry;
private final KafkaConfigSchema configSchema;
private final Consumer<ConfigResource> existenceChecker;
private final Optional<AlterConfigPolicy> alterConfigPolicy;
private final ConfigurationValidator validator;
private final TimelineHashMap<ConfigResource, TimelineHashMap<String, String>> configData;
@ -63,11 +62,13 @@ public class ConfigurationControlManager {
ConfigurationControlManager(LogContext logContext,
SnapshotRegistry snapshotRegistry,
KafkaConfigSchema configSchema,
Consumer<ConfigResource> existenceChecker,
Optional<AlterConfigPolicy> alterConfigPolicy,
ConfigurationValidator validator) {
this.log = logContext.logger(ConfigurationControlManager.class);
this.snapshotRegistry = snapshotRegistry;
this.configSchema = configSchema;
this.existenceChecker = existenceChecker;
this.alterConfigPolicy = alterConfigPolicy;
this.validator = validator;
this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
@ -88,14 +89,14 @@ public class ConfigurationControlManager {
*/
ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges,
Consumer<ConfigResource> existenceChecker) {
boolean newlyCreatedResource) {
List<ApiMessageAndVersion> outputRecords = new ArrayList<>();
Map<ConfigResource, ApiError> outputResults = new HashMap<>();
for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> resourceEntry :
configChanges.entrySet()) {
incrementalAlterConfigResource(resourceEntry.getKey(),
resourceEntry.getValue(),
existenceChecker,
newlyCreatedResource,
outputRecords,
outputResults);
}
@ -104,7 +105,7 @@ public class ConfigurationControlManager {
private void incrementalAlterConfigResource(ConfigResource configResource,
Map<String, Entry<OpType, String>> keysToOps,
Consumer<ConfigResource> existenceChecker,
boolean newlyCreatedResource,
List<ApiMessageAndVersion> outputRecords,
Map<ConfigResource, ApiError> outputResults) {
List<ApiMessageAndVersion> newRecords = new ArrayList<>();
@ -153,7 +154,7 @@ public class ConfigurationControlManager {
setValue(newValue), CONFIG_RECORD.highestSupportedVersion()));
}
}
ApiError error = validateAlterConfig(configResource, newRecords, existenceChecker);
ApiError error = validateAlterConfig(configResource, newRecords, newlyCreatedResource);
if (error.isFailure()) {
outputResults.put(configResource, error);
return;
@ -164,7 +165,7 @@ public class ConfigurationControlManager {
private ApiError validateAlterConfig(ConfigResource configResource,
List<ApiMessageAndVersion> newRecords,
Consumer<ConfigResource> existenceChecker) {
boolean newlyCreatedResource) {
Map<String, String> newConfigs = new HashMap<>();
TimelineHashMap<String, String> existingConfigs = configData.get(configResource);
if (existingConfigs != null) newConfigs.putAll(existingConfigs);
@ -178,7 +179,9 @@ public class ConfigurationControlManager {
}
try {
validator.validate(configResource, newConfigs);
existenceChecker.accept(configResource);
if (!newlyCreatedResource) {
existenceChecker.accept(configResource);
}
if (alterConfigPolicy.isPresent()) {
alterConfigPolicy.get().validate(new RequestMetadata(configResource, newConfigs));
}
@ -201,7 +204,7 @@ public class ConfigurationControlManager {
*/
ControllerResult<Map<ConfigResource, ApiError>> legacyAlterConfigs(
Map<ConfigResource, Map<String, String>> newConfigs,
Consumer<ConfigResource> existenceChecker
boolean newlyCreatedResource
) {
List<ApiMessageAndVersion> outputRecords = new ArrayList<>();
Map<ConfigResource, ApiError> outputResults = new HashMap<>();
@ -209,7 +212,7 @@ public class ConfigurationControlManager {
newConfigs.entrySet()) {
legacyAlterConfigResource(resourceEntry.getKey(),
resourceEntry.getValue(),
existenceChecker,
newlyCreatedResource,
outputRecords,
outputResults);
}
@ -218,7 +221,7 @@ public class ConfigurationControlManager {
private void legacyAlterConfigResource(ConfigResource configResource,
Map<String, String> newConfigs,
Consumer<ConfigResource> existenceChecker,
boolean newlyCreatedResource,
List<ApiMessageAndVersion> outputRecords,
Map<ConfigResource, ApiError> outputResults) {
List<ApiMessageAndVersion> newRecords = new ArrayList<>();
@ -247,7 +250,7 @@ public class ConfigurationControlManager {
setValue(null), CONFIG_RECORD.highestSupportedVersion()));
}
}
ApiError error = validateAlterConfig(configResource, newRecords, existenceChecker);
ApiError error = validateAlterConfig(configResource, newRecords, newlyCreatedResource);
if (error.isFailure()) {
outputResults.put(configResource, error);
return;

View File

@ -1226,7 +1226,11 @@ public final class QuorumController implements Controller {
this.purgatory = new ControllerPurgatory();
this.resourceExists = new ConfigResourceExistenceChecker();
this.configurationControl = new ConfigurationControlManager(logContext,
snapshotRegistry, configSchema, alterConfigPolicy, configurationValidator);
snapshotRegistry,
configSchema,
resourceExists,
alterConfigPolicy,
configurationValidator);
this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
this.clusterControl = new ClusterControlManager(logContext, clusterId, time,
snapshotRegistry, sessionTimeoutNs, replicaPlacer, controllerMetrics);
@ -1340,7 +1344,7 @@ public final class QuorumController implements Controller {
}
return appendWriteEvent("incrementalAlterConfigs", () -> {
ControllerResult<Map<ConfigResource, ApiError>> result =
configurationControl.incrementalAlterConfigs(configChanges, resourceExists);
configurationControl.incrementalAlterConfigs(configChanges, false);
if (validateOnly) {
return result.withoutRecords();
} else {
@ -1380,7 +1384,7 @@ public final class QuorumController implements Controller {
}
return appendWriteEvent("legacyAlterConfigs", () -> {
ControllerResult<Map<ConfigResource, ApiError>> result =
configurationControl.legacyAlterConfigs(newConfigs, resourceExists);
configurationControl.legacyAlterConfigs(newConfigs, false);
if (validateOnly) {
return result.withoutRecords();
} else {

View File

@ -114,7 +114,6 @@ import static org.apache.kafka.common.protocol.Errors.INVALID_UPDATE_VERSION;
import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
import static org.apache.kafka.controller.ConfigurationControlManager.NO_OP_EXISTENCE_CHECKER;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
@ -378,7 +377,7 @@ public class ReplicationControlManager {
Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges =
computeConfigChanges(topicErrors, request.topics());
ControllerResult<Map<ConfigResource, ApiError>> configResult =
configurationControl.incrementalAlterConfigs(configChanges, NO_OP_EXISTENCE_CHECKER);
configurationControl.incrementalAlterConfigs(configChanges, true);
for (Entry<ConfigResource, ApiError> entry : configResult.response().entrySet()) {
if (entry.getValue().isFailure()) {
topicErrors.put(entry.getKey().name(), entry.getValue());

View File

@ -20,6 +20,7 @@ package org.apache.kafka.controller;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
@ -40,6 +41,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@ -51,7 +53,6 @@ import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SUBTRACT;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.controller.ConfigurationControlManager.NO_OP_EXISTENCE_CHECKER;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -76,6 +77,17 @@ public class ConfigurationControlManagerTest {
static final ConfigResource BROKER0 = new ConfigResource(BROKER, "0");
static final ConfigResource MYTOPIC = new ConfigResource(TOPIC, "mytopic");
static class TestExistenceChecker implements Consumer<ConfigResource> {
static final TestExistenceChecker INSTANCE = new TestExistenceChecker();
@Override
public void accept(ConfigResource resource) {
if (!resource.name().startsWith("Existing")) {
throw new UnknownTopicOrPartitionException("Unknown resource.");
}
}
}
@SuppressWarnings("unchecked")
private static <A, B> Map<A, B> toMap(Entry... entries) {
Map<A, B> map = new LinkedHashMap<>();
@ -89,12 +101,26 @@ public class ConfigurationControlManagerTest {
return new SimpleImmutableEntry<>(a, b);
}
static ConfigurationControlManager newConfigurationControlManager() {
return newConfigurationControlManager(Optional.empty());
}
static ConfigurationControlManager newConfigurationControlManager(
Optional<AlterConfigPolicy> alterConfigPolicy
) {
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
return new ConfigurationControlManager(logContext,
snapshotRegistry,
SCHEMA,
TestExistenceChecker.INSTANCE,
alterConfigPolicy,
ConfigurationValidator.NO_OP);
}
@Test
public void testReplay() throws Exception {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ConfigurationControlManager manager =
new ConfigurationControlManager(new LogContext(), snapshotRegistry, SCHEMA,
Optional.empty(), ConfigurationValidator.NO_OP);
ConfigurationControlManager manager = newConfigurationControlManager();
assertEquals(Collections.emptyMap(), manager.getConfigs(BROKER0));
manager.replay(new ConfigRecord().
setResourceType(BROKER.id()).setResourceName("0").
@ -125,17 +151,14 @@ public class ConfigurationControlManagerTest {
@Test
public void testIncrementalAlterConfigs() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ConfigurationControlManager manager =
new ConfigurationControlManager(new LogContext(), snapshotRegistry, SCHEMA,
Optional.empty(), ConfigurationValidator.NO_OP);
ConfigurationControlManager manager = newConfigurationControlManager();
ControllerResult<Map<ConfigResource, ApiError>> result = manager.
incrementalAlterConfigs(toMap(entry(BROKER0, toMap(
entry("baz", entry(SUBTRACT, "abc")),
entry("quux", entry(SET, "abc")))),
entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123"))))),
NO_OP_EXISTENCE_CHECKER);
true);
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
@ -152,7 +175,26 @@ public class ConfigurationControlManagerTest {
toMap(entry(MYTOPIC, ApiError.NONE))),
manager.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(
entry("abc", entry(DELETE, "xyz"))))),
NO_OP_EXISTENCE_CHECKER));
true));
}
@Test
public void testIncrementalAlterConfigsWithoutExistence() {
ConfigurationControlManager manager = newConfigurationControlManager();
ConfigResource existingTopic = new ConfigResource(TOPIC, "ExistingTopic");
ControllerResult<Map<ConfigResource, ApiError>> result = manager.
incrementalAlterConfigs(toMap(entry(BROKER0, toMap(
entry("quux", entry(SET, "1")))),
entry(existingTopic, toMap(entry("def", entry(SET, "newVal"))))),
false);
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("ExistingTopic").
setName("def").setValue("newVal"), (short) 0)),
toMap(entry(BROKER0, new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION,
"Unknown resource.")),
entry(existingTopic, ApiError.NONE))), result);
}
private static class MockAlterConfigsPolicy implements AlterConfigPolicy {
@ -190,14 +232,12 @@ public class ConfigurationControlManagerTest {
@Test
public void testIncrementalAlterConfigsWithPolicy() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
MockAlterConfigsPolicy policy = new MockAlterConfigsPolicy(asList(
new RequestMetadata(MYTOPIC, Collections.emptyMap()),
new RequestMetadata(BROKER0, toMap(entry("foo.bar", "123"),
entry("quux", "456")))));
ConfigurationControlManager manager = new ConfigurationControlManager(
new LogContext(), snapshotRegistry, SCHEMA, Optional.of(policy),
ConfigurationValidator.NO_OP);
ConfigurationControlManager manager = newConfigurationControlManager(Optional.of(policy));
assertEquals(ControllerResult.atomicOf(asList(new ApiMessageAndVersion(
new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
@ -215,15 +255,12 @@ public class ConfigurationControlManagerTest {
entry(BROKER0, toMap(
entry("foo.bar", entry(SET, "123")),
entry("quux", entry(SET, "456"))))),
NO_OP_EXISTENCE_CHECKER));
true));
}
@Test
public void testLegacyAlterConfigs() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ConfigurationControlManager manager =
new ConfigurationControlManager(new LogContext(), snapshotRegistry, SCHEMA,
Optional.empty(), ConfigurationValidator.NO_OP);
ConfigurationControlManager manager = newConfigurationControlManager();
List<ApiMessageAndVersion> expectedRecords1 = asList(
new ApiMessageAndVersion(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").
@ -235,7 +272,7 @@ public class ConfigurationControlManagerTest {
expectedRecords1, toMap(entry(MYTOPIC, ApiError.NONE))),
manager.legacyAlterConfigs(
toMap(entry(MYTOPIC, toMap(entry("abc", "456"), entry("def", "901")))),
NO_OP_EXISTENCE_CHECKER));
true));
for (ApiMessageAndVersion message : expectedRecords1) {
manager.replay((ConfigRecord) message.message());
}
@ -249,6 +286,6 @@ public class ConfigurationControlManagerTest {
(short) 0)),
toMap(entry(MYTOPIC, ApiError.NONE))),
manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def", "901")))),
NO_OP_EXISTENCE_CHECKER));
true));
}
}

View File

@ -143,8 +143,12 @@ public class ReplicationControlManagerTest {
new StripedReplicaPlacer(random),
metrics);
final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
new LogContext(), snapshotRegistry, KafkaConfigSchema.EMPTY, Optional.empty(),
(__, ___) -> { });
new LogContext(),
snapshotRegistry,
KafkaConfigSchema.EMPTY,
__ -> { },
Optional.empty(),
(__, ___) -> { });
final ReplicationControlManager replicationControl;
void replay(List<ApiMessageAndVersion> records) throws Exception {