KAFKA-16368: Add a new constraint for segment.bytes to min 1MB for KIP-1030 (#18140)

Reviewers: Divij Vaidya <diviv@amazon.com>
This commit is contained in:
Jason Taylor 2025-01-16 15:07:00 +00:00 committed by GitHub
parent 724cf84de9
commit 54fe0f0135
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 14 additions and 8 deletions

View File

@ -56,6 +56,7 @@ import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import scala.Option;
import scala.jdk.javaapi.OptionConverters;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@ -238,7 +239,6 @@ public class DeleteTopicTest {
@ClusterTest(serverProperties = {
@ClusterConfigProperty(key = "log.cleaner.enable", value = "true"),
@ClusterConfigProperty(key = "log.cleanup.policy", value = "compact"),
@ClusterConfigProperty(key = "log.segment.bytes", value = "100"),
@ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size", value = "1048577")
})
public void testDeleteTopicWithCleaner(ClusterInstance cluster) throws Exception {
@ -251,6 +251,8 @@ public class DeleteTopicTest {
"Replicas for topic test not created.");
UnifiedLog log = server.logManager().getLog(topicPartition, false).get();
writeDups(100, 3, log);
// force roll the segment so that cleaner can work on it
server.logManager().getLog(topicPartition, false).get().roll(Option.empty());
// wait for cleaner to clean
server.logManager().cleaner().awaitCleaned(topicPartition, 0, 60000);
admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();

View File

@ -128,7 +128,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
props.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "requested")
props.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, "PLAIN")
props.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(","))
props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "2000") // low value to test log rolling on config update
props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576") // low value to test log rolling on config update
props.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "2") // greater than one to test reducing threads
props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 1680000000.toString)
props.put(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, 168.toString)
@ -587,7 +587,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
val props = new Properties
val logIndexSizeMaxBytes = "100000"
val logRetentionMs = TimeUnit.DAYS.toMillis(1)
props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "4000")
props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576")
props.put(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, TimeUnit.HOURS.toMillis(2).toString)
props.put(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG, TimeUnit.HOURS.toMillis(1).toString)
props.put(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG, logIndexSizeMaxBytes)
@ -609,11 +609,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.toString)
props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, "1000")
props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, "1000")
reconfigureServers(props, perBrokerConfig = false, (ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "4000"))
reconfigureServers(props, perBrokerConfig = false, (ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576"))
// Verify that all broker defaults have been updated
servers.foreach { server =>
props.forEach { (k, v) =>
TestUtils.waitUntilTrue(() => server.config.originals.get(k) != null, "Configs not present")
assertEquals(server.config.originals.get(k).toString, v, s"Not reconfigured $k")
}
}
@ -624,7 +625,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
"Config not updated in LogManager")
val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found"))
TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing topic config using defaults not updated")
TestUtils.waitUntilTrue(() => log.config.segmentSize == 1048576, "Existing topic config using defaults not updated")
val KafkaConfigToLogConfigName: Map[String, String] =
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) }
props.asScala.foreach { case (k, v) =>

View File

@ -1118,7 +1118,7 @@ class KafkaConfigTest {
case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG =>
assertDynamic(kafkaConfigProp, "5", () => config.zstdCompressionLevel)
case TopicConfig.SEGMENT_BYTES_CONFIG =>
assertDynamic(kafkaConfigProp, 10000, () => config.logSegmentBytes)
assertDynamic(kafkaConfigProp, 1048576, () => config.logSegmentBytes)
case TopicConfig.SEGMENT_MS_CONFIG =>
assertDynamic(kafkaConfigProp, 10001L, () => config.logRollTimeMillis)
case TopicConfig.DELETE_RETENTION_MS_CONFIG =>

View File

@ -52,7 +52,6 @@ class LogOffsetTest extends BaseRequestTest {
props.put("num.partitions", "20")
props.put("log.retention.hours", "10")
props.put("log.retention.check.interval.ms", (5 * 1000 * 60).toString)
props.put("log.segment.bytes", "140")
}
@ParameterizedTest

View File

@ -111,6 +111,10 @@
The <code>remote.log.manager.thread.pool.size</code> configuration default value was changed to 2 from 10.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
</li>
<li>
The minimum <code>segment.bytes/log.segment.bytes</code> has changed from 14 bytes to 1MB.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
</li>
</ul>
</li>
<li><b>MirrorMaker</b>

View File

@ -159,7 +159,7 @@ public class LogConfig extends AbstractConfig {
.define(ServerLogConfigs.NUM_PARTITIONS_CONFIG, INT, ServerLogConfigs.NUM_PARTITIONS_DEFAULT, atLeast(1), MEDIUM, ServerLogConfigs.NUM_PARTITIONS_DOC)
.define(ServerLogConfigs.LOG_DIR_CONFIG, STRING, ServerLogConfigs.LOG_DIR_DEFAULT, HIGH, ServerLogConfigs.LOG_DIR_DOC)
.define(ServerLogConfigs.LOG_DIRS_CONFIG, STRING, null, HIGH, ServerLogConfigs.LOG_DIRS_DOC)
.define(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(LegacyRecord.RECORD_OVERHEAD_V0), HIGH, ServerLogConfigs.LOG_SEGMENT_BYTES_DOC)
.define(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(1024 * 1024), HIGH, ServerLogConfigs.LOG_SEGMENT_BYTES_DOC)
.define(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, LONG, null, HIGH, ServerLogConfigs.LOG_ROLL_TIME_MILLIS_DOC)
.define(ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG, INT, (int) TimeUnit.MILLISECONDS.toHours(DEFAULT_SEGMENT_MS), atLeast(1), HIGH, ServerLogConfigs.LOG_ROLL_TIME_HOURS_DOC)