From 42a200bd3966f78d1ca9b6b6ab353bd09973a09a Mon Sep 17 00:00:00 2001 From: ClarkChen Date: Mon, 3 Mar 2025 03:23:01 +0800 Subject: [PATCH] KAFKA-18907 Add suitable error message when the appended value is too larger (#19070) In ZooKeeper mode, users can append configurations to create values larger than Short.MAX_VALUE. However, this behavior is disallowed in KRaft mode. Additionally, a server error is returned to users. Creating a value this large is rare, so we don't plan to fix it for KRaft. This PR aims to tweak the error message. Reviewers: Ken Huang , TengYao Chi , Chia-Ping Tsai --- .../kafka/controller/ConfigurationControlManager.java | 8 ++++++++ .../controller/ConfigurationControlManagerTest.java | 9 +++++++++ 2 files changed, 17 insertions(+) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index cb14b386d12..b8a0a0ad425 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -339,6 +339,10 @@ public class ConfigurationControlManager { return DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR; } else if (configRecord.value() == null) { allConfigs.remove(configRecord.name()); + } else if (configRecord.value().length() > Short.MAX_VALUE) { + // In KRaft mode, large config values cannot be created by appending. + // If the size exceeds Short.MAX_VALUE, this error will be thrown to notify the user. + return DISALLOWED_CONFIG_VALUE_SIZE_ERROR; } else { allConfigs.put(configRecord.name(), configRecord.value()); } @@ -385,6 +389,10 @@ public class ConfigurationControlManager { new ApiError(INVALID_CONFIG, "Cluster-level " + MIN_IN_SYNC_REPLICAS_CONFIG + " cannot be removed while ELR is enabled."); + private static final ApiError DISALLOWED_CONFIG_VALUE_SIZE_ERROR = + new ApiError(INVALID_CONFIG, "The configuration value cannot be added because " + + "it exceeds the maximum value size of " + Short.MAX_VALUE + " bytes."); + boolean isDisallowedBrokerMinIsrTransition(ConfigRecord configRecord) { if (configRecord.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG) && configRecord.resourceType() == BROKER.id() && diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java index 4c1c73f64ac..62a920bdc9b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java @@ -210,6 +210,15 @@ public class ConfigurationControlManagerTest { setName("abc").setValue(null), CONFIG_RECORD.highestSupportedVersion())), ApiError.NONE), manager.incrementalAlterConfig(MYTOPIC, toMap(entry("abc", entry(DELETE, "xyz"))), true)); + + // The configuration value exceeding the maximum size is not allowed to be added. + String largeValue = new String(new char[Short.MAX_VALUE - APPEND.id() - 1]); + Map> largeValueOfOps = toMap(entry("abc", entry(APPEND, largeValue))); + + ControllerResult invalidConfigValueResult = manager.incrementalAlterConfig(MYTOPIC, largeValueOfOps, true); + assertEquals(Errors.INVALID_CONFIG, invalidConfigValueResult.response().error()); + assertEquals("The configuration value cannot be added because it exceeds the maximum value size of " + Short.MAX_VALUE + " bytes.", + invalidConfigValueResult.response().message()); } @Test