diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index cb14b386d12..b8a0a0ad425 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -339,6 +339,10 @@ public class ConfigurationControlManager { return DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR; } else if (configRecord.value() == null) { allConfigs.remove(configRecord.name()); + } else if (configRecord.value().length() > Short.MAX_VALUE) { + // In KRaft mode, large config values cannot be created by appending. + // If the size exceeds Short.MAX_VALUE, this error will be thrown to notify the user. + return DISALLOWED_CONFIG_VALUE_SIZE_ERROR; } else { allConfigs.put(configRecord.name(), configRecord.value()); } @@ -385,6 +389,10 @@ public class ConfigurationControlManager { new ApiError(INVALID_CONFIG, "Cluster-level " + MIN_IN_SYNC_REPLICAS_CONFIG + " cannot be removed while ELR is enabled."); + private static final ApiError DISALLOWED_CONFIG_VALUE_SIZE_ERROR = + new ApiError(INVALID_CONFIG, "The configuration value cannot be added because " + + "it exceeds the maximum value size of " + Short.MAX_VALUE + " bytes."); + boolean isDisallowedBrokerMinIsrTransition(ConfigRecord configRecord) { if (configRecord.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG) && configRecord.resourceType() == BROKER.id() && diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java index 4c1c73f64ac..62a920bdc9b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java @@ -210,6 +210,15 @@ public class ConfigurationControlManagerTest { setName("abc").setValue(null), CONFIG_RECORD.highestSupportedVersion())), ApiError.NONE), manager.incrementalAlterConfig(MYTOPIC, toMap(entry("abc", entry(DELETE, "xyz"))), true)); + + // The configuration value exceeding the maximum size is not allowed to be added. + String largeValue = new String(new char[Short.MAX_VALUE - APPEND.id() - 1]); + Map> largeValueOfOps = toMap(entry("abc", entry(APPEND, largeValue))); + + ControllerResult invalidConfigValueResult = manager.incrementalAlterConfig(MYTOPIC, largeValueOfOps, true); + assertEquals(Errors.INVALID_CONFIG, invalidConfigValueResult.response().error()); + assertEquals("The configuration value cannot be added because it exceeds the maximum value size of " + Short.MAX_VALUE + " bytes.", + invalidConfigValueResult.response().message()); } @Test