From bda5c34b030207f542c7987a5e0f9bcb23406c18 Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Tue, 15 Mar 2022 12:50:53 -0700 Subject: [PATCH] MINOR: refactor how ConfigurationControl checks for resource existence (#11835) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ConfigurationControl methods should take a boolean indicating whether the resource is newly created, rather than taking an existence checker object. The boolean is easier to understand. Also add a unit test of existing checking failing (and succeeding). Reviewers: Kirk True , José Armando García Sancio --- .../ConfigurationControlManager.java | 27 ++++--- .../kafka/controller/QuorumController.java | 10 ++- .../controller/ReplicationControlManager.java | 3 +- .../ConfigurationControlManagerTest.java | 81 ++++++++++++++----- .../ReplicationControlManagerTest.java | 8 +- 5 files changed, 88 insertions(+), 41 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index f9caf2bb04c..a16361343b2 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -51,11 +51,10 @@ import static org.apache.kafka.common.protocol.Errors.INVALID_CONFIG; public class ConfigurationControlManager { - final static Consumer NO_OP_EXISTENCE_CHECKER = __ -> { }; - private final Logger log; private final SnapshotRegistry snapshotRegistry; private final KafkaConfigSchema configSchema; + private final Consumer existenceChecker; private final Optional alterConfigPolicy; private final ConfigurationValidator validator; private final TimelineHashMap> configData; @@ -63,11 +62,13 @@ public class ConfigurationControlManager { ConfigurationControlManager(LogContext logContext, SnapshotRegistry snapshotRegistry, KafkaConfigSchema configSchema, + Consumer existenceChecker, Optional alterConfigPolicy, ConfigurationValidator validator) { this.log = logContext.logger(ConfigurationControlManager.class); this.snapshotRegistry = snapshotRegistry; this.configSchema = configSchema; + this.existenceChecker = existenceChecker; this.alterConfigPolicy = alterConfigPolicy; this.validator = validator; this.configData = new TimelineHashMap<>(snapshotRegistry, 0); @@ -88,14 +89,14 @@ public class ConfigurationControlManager { */ ControllerResult> incrementalAlterConfigs( Map>> configChanges, - Consumer existenceChecker) { + boolean newlyCreatedResource) { List outputRecords = new ArrayList<>(); Map outputResults = new HashMap<>(); for (Entry>> resourceEntry : configChanges.entrySet()) { incrementalAlterConfigResource(resourceEntry.getKey(), resourceEntry.getValue(), - existenceChecker, + newlyCreatedResource, outputRecords, outputResults); } @@ -104,7 +105,7 @@ public class ConfigurationControlManager { private void incrementalAlterConfigResource(ConfigResource configResource, Map> keysToOps, - Consumer existenceChecker, + boolean newlyCreatedResource, List outputRecords, Map outputResults) { List newRecords = new ArrayList<>(); @@ -153,7 +154,7 @@ public class ConfigurationControlManager { setValue(newValue), CONFIG_RECORD.highestSupportedVersion())); } } - ApiError error = validateAlterConfig(configResource, newRecords, existenceChecker); + ApiError error = validateAlterConfig(configResource, newRecords, newlyCreatedResource); if (error.isFailure()) { outputResults.put(configResource, error); return; @@ -164,7 +165,7 @@ public class ConfigurationControlManager { private ApiError validateAlterConfig(ConfigResource configResource, List newRecords, - Consumer existenceChecker) { + boolean newlyCreatedResource) { Map newConfigs = new HashMap<>(); TimelineHashMap existingConfigs = configData.get(configResource); if (existingConfigs != null) newConfigs.putAll(existingConfigs); @@ -178,7 +179,9 @@ public class ConfigurationControlManager { } try { validator.validate(configResource, newConfigs); - existenceChecker.accept(configResource); + if (!newlyCreatedResource) { + existenceChecker.accept(configResource); + } if (alterConfigPolicy.isPresent()) { alterConfigPolicy.get().validate(new RequestMetadata(configResource, newConfigs)); } @@ -201,7 +204,7 @@ public class ConfigurationControlManager { */ ControllerResult> legacyAlterConfigs( Map> newConfigs, - Consumer existenceChecker + boolean newlyCreatedResource ) { List outputRecords = new ArrayList<>(); Map outputResults = new HashMap<>(); @@ -209,7 +212,7 @@ public class ConfigurationControlManager { newConfigs.entrySet()) { legacyAlterConfigResource(resourceEntry.getKey(), resourceEntry.getValue(), - existenceChecker, + newlyCreatedResource, outputRecords, outputResults); } @@ -218,7 +221,7 @@ public class ConfigurationControlManager { private void legacyAlterConfigResource(ConfigResource configResource, Map newConfigs, - Consumer existenceChecker, + boolean newlyCreatedResource, List outputRecords, Map outputResults) { List newRecords = new ArrayList<>(); @@ -247,7 +250,7 @@ public class ConfigurationControlManager { setValue(null), CONFIG_RECORD.highestSupportedVersion())); } } - ApiError error = validateAlterConfig(configResource, newRecords, existenceChecker); + ApiError error = validateAlterConfig(configResource, newRecords, newlyCreatedResource); if (error.isFailure()) { outputResults.put(configResource, error); return; diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 11594ed77d2..5b52c2d2fd4 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1226,7 +1226,11 @@ public final class QuorumController implements Controller { this.purgatory = new ControllerPurgatory(); this.resourceExists = new ConfigResourceExistenceChecker(); this.configurationControl = new ConfigurationControlManager(logContext, - snapshotRegistry, configSchema, alterConfigPolicy, configurationValidator); + snapshotRegistry, + configSchema, + resourceExists, + alterConfigPolicy, + configurationValidator); this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry); this.clusterControl = new ClusterControlManager(logContext, clusterId, time, snapshotRegistry, sessionTimeoutNs, replicaPlacer, controllerMetrics); @@ -1340,7 +1344,7 @@ public final class QuorumController implements Controller { } return appendWriteEvent("incrementalAlterConfigs", () -> { ControllerResult> result = - configurationControl.incrementalAlterConfigs(configChanges, resourceExists); + configurationControl.incrementalAlterConfigs(configChanges, false); if (validateOnly) { return result.withoutRecords(); } else { @@ -1380,7 +1384,7 @@ public final class QuorumController implements Controller { } return appendWriteEvent("legacyAlterConfigs", () -> { ControllerResult> result = - configurationControl.legacyAlterConfigs(newConfigs, resourceExists); + configurationControl.legacyAlterConfigs(newConfigs, false); if (validateOnly) { return result.withoutRecords(); } else { diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index ad9e3c7ecc9..c492ed9af07 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -114,7 +114,6 @@ import static org.apache.kafka.common.protocol.Errors.INVALID_UPDATE_VERSION; import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION; -import static org.apache.kafka.controller.ConfigurationControlManager.NO_OP_EXISTENCE_CHECKER; import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; @@ -378,7 +377,7 @@ public class ReplicationControlManager { Map>> configChanges = computeConfigChanges(topicErrors, request.topics()); ControllerResult> configResult = - configurationControl.incrementalAlterConfigs(configChanges, NO_OP_EXISTENCE_CHECKER); + configurationControl.incrementalAlterConfigs(configChanges, true); for (Entry entry : configResult.response().entrySet()) { if (entry.getValue().isFailure()) { topicErrors.put(entry.getKey().name(), entry.getValue()); diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java index 60bcb8299ff..6a71aba1e67 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java @@ -20,6 +20,7 @@ package org.apache.kafka.controller; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.PolicyViolationException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; @@ -40,6 +41,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -51,7 +53,6 @@ import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SUBTRACT; import static org.apache.kafka.common.config.ConfigResource.Type.BROKER; import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; -import static org.apache.kafka.controller.ConfigurationControlManager.NO_OP_EXISTENCE_CHECKER; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -76,6 +77,17 @@ public class ConfigurationControlManagerTest { static final ConfigResource BROKER0 = new ConfigResource(BROKER, "0"); static final ConfigResource MYTOPIC = new ConfigResource(TOPIC, "mytopic"); + static class TestExistenceChecker implements Consumer { + static final TestExistenceChecker INSTANCE = new TestExistenceChecker(); + + @Override + public void accept(ConfigResource resource) { + if (!resource.name().startsWith("Existing")) { + throw new UnknownTopicOrPartitionException("Unknown resource."); + } + } + } + @SuppressWarnings("unchecked") private static Map toMap(Entry... entries) { Map map = new LinkedHashMap<>(); @@ -89,12 +101,26 @@ public class ConfigurationControlManagerTest { return new SimpleImmutableEntry<>(a, b); } + static ConfigurationControlManager newConfigurationControlManager() { + return newConfigurationControlManager(Optional.empty()); + } + + static ConfigurationControlManager newConfigurationControlManager( + Optional alterConfigPolicy + ) { + LogContext logContext = new LogContext(); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + return new ConfigurationControlManager(logContext, + snapshotRegistry, + SCHEMA, + TestExistenceChecker.INSTANCE, + alterConfigPolicy, + ConfigurationValidator.NO_OP); + } + @Test public void testReplay() throws Exception { - SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - ConfigurationControlManager manager = - new ConfigurationControlManager(new LogContext(), snapshotRegistry, SCHEMA, - Optional.empty(), ConfigurationValidator.NO_OP); + ConfigurationControlManager manager = newConfigurationControlManager(); assertEquals(Collections.emptyMap(), manager.getConfigs(BROKER0)); manager.replay(new ConfigRecord(). setResourceType(BROKER.id()).setResourceName("0"). @@ -125,17 +151,14 @@ public class ConfigurationControlManagerTest { @Test public void testIncrementalAlterConfigs() { - SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - ConfigurationControlManager manager = - new ConfigurationControlManager(new LogContext(), snapshotRegistry, SCHEMA, - Optional.empty(), ConfigurationValidator.NO_OP); + ConfigurationControlManager manager = newConfigurationControlManager(); ControllerResult> result = manager. incrementalAlterConfigs(toMap(entry(BROKER0, toMap( entry("baz", entry(SUBTRACT, "abc")), entry("quux", entry(SET, "abc")))), entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123"))))), - NO_OP_EXISTENCE_CHECKER); + true); assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion( new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic"). @@ -152,7 +175,26 @@ public class ConfigurationControlManagerTest { toMap(entry(MYTOPIC, ApiError.NONE))), manager.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap( entry("abc", entry(DELETE, "xyz"))))), - NO_OP_EXISTENCE_CHECKER)); + true)); + } + + @Test + public void testIncrementalAlterConfigsWithoutExistence() { + ConfigurationControlManager manager = newConfigurationControlManager(); + ConfigResource existingTopic = new ConfigResource(TOPIC, "ExistingTopic"); + + ControllerResult> result = manager. + incrementalAlterConfigs(toMap(entry(BROKER0, toMap( + entry("quux", entry(SET, "1")))), + entry(existingTopic, toMap(entry("def", entry(SET, "newVal"))))), + false); + + assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion( + new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("ExistingTopic"). + setName("def").setValue("newVal"), (short) 0)), + toMap(entry(BROKER0, new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, + "Unknown resource.")), + entry(existingTopic, ApiError.NONE))), result); } private static class MockAlterConfigsPolicy implements AlterConfigPolicy { @@ -190,14 +232,12 @@ public class ConfigurationControlManagerTest { @Test public void testIncrementalAlterConfigsWithPolicy() { - SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); MockAlterConfigsPolicy policy = new MockAlterConfigsPolicy(asList( new RequestMetadata(MYTOPIC, Collections.emptyMap()), new RequestMetadata(BROKER0, toMap(entry("foo.bar", "123"), entry("quux", "456"))))); - ConfigurationControlManager manager = new ConfigurationControlManager( - new LogContext(), snapshotRegistry, SCHEMA, Optional.of(policy), - ConfigurationValidator.NO_OP); + + ConfigurationControlManager manager = newConfigurationControlManager(Optional.of(policy)); assertEquals(ControllerResult.atomicOf(asList(new ApiMessageAndVersion( new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0"). @@ -215,15 +255,12 @@ public class ConfigurationControlManagerTest { entry(BROKER0, toMap( entry("foo.bar", entry(SET, "123")), entry("quux", entry(SET, "456"))))), - NO_OP_EXISTENCE_CHECKER)); + true)); } @Test public void testLegacyAlterConfigs() { - SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - ConfigurationControlManager manager = - new ConfigurationControlManager(new LogContext(), snapshotRegistry, SCHEMA, - Optional.empty(), ConfigurationValidator.NO_OP); + ConfigurationControlManager manager = newConfigurationControlManager(); List expectedRecords1 = asList( new ApiMessageAndVersion(new ConfigRecord(). setResourceType(TOPIC.id()).setResourceName("mytopic"). @@ -235,7 +272,7 @@ public class ConfigurationControlManagerTest { expectedRecords1, toMap(entry(MYTOPIC, ApiError.NONE))), manager.legacyAlterConfigs( toMap(entry(MYTOPIC, toMap(entry("abc", "456"), entry("def", "901")))), - NO_OP_EXISTENCE_CHECKER)); + true)); for (ApiMessageAndVersion message : expectedRecords1) { manager.replay((ConfigRecord) message.message()); } @@ -249,6 +286,6 @@ public class ConfigurationControlManagerTest { (short) 0)), toMap(entry(MYTOPIC, ApiError.NONE))), manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def", "901")))), - NO_OP_EXISTENCE_CHECKER)); + true)); } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 4a08b237b0c..a6c8c1e6617 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -143,8 +143,12 @@ public class ReplicationControlManagerTest { new StripedReplicaPlacer(random), metrics); final ConfigurationControlManager configurationControl = new ConfigurationControlManager( - new LogContext(), snapshotRegistry, KafkaConfigSchema.EMPTY, Optional.empty(), - (__, ___) -> { }); + new LogContext(), + snapshotRegistry, + KafkaConfigSchema.EMPTY, + __ -> { }, + Optional.empty(), + (__, ___) -> { }); final ReplicationControlManager replicationControl; void replay(List records) throws Exception {