From df73133f3b147e2f4f11907d951560f2e9400863 Mon Sep 17 00:00:00 2001 From: Ken Huang Date: Mon, 9 Jun 2025 03:13:03 +0800 Subject: [PATCH] MINOR: Follow up KAFKA-19080 MetadataLogConfig (#19842) See Discussion: https://github.com/apache/kafka/pull/19371#discussion_r2109549343 Do the following changes: - Update the internal config name with metadata prefix - add the warning message for setting `INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG` Reviewers: Chia-Ping Tsai --- .../scala/kafka/raft/KafkaMetadataLog.scala | 5 ++++ .../kafka/raft/KafkaMetadataLogTest.scala | 6 ++--- .../unit/kafka/server/KafkaConfigTest.scala | 6 ++--- .../apache/kafka/raft/MetadataLogConfig.java | 24 +++++++++---------- 4 files changed, 23 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index dc5f3ca11d5..38f3d06168a 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -637,6 +637,11 @@ object KafkaMetadataLog extends Logging { nodeId ) + if (defaultLogConfig.segmentSize() < config.logSegmentBytes()) { + metadataLog.error(s"Overriding ${MetadataLogConfig.INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG} is only supported for testing. Setting " + + s"this value too low may lead to an inability to write batches of metadata records.") + } + // When recovering, truncate fully if the latest snapshot is after the log end offset. This can happen to a follower // when the follower crashes after downloading a snapshot from the leader but before it could truncate the log fully. metadataLog.truncateToLatestSnapshot() diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala index 4b40118e22a..bd737ccd566 100644 --- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala +++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala @@ -1164,9 +1164,9 @@ object KafkaMetadataLogTest { MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG, logSegmentMillis, MetadataLogConfig.METADATA_MAX_RETENTION_BYTES_CONFIG, retentionMaxBytes, MetadataLogConfig.METADATA_MAX_RETENTION_MILLIS_CONFIG, retentionMillis, - MetadataLogConfig.INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG, internalMaxBatchSizeInBytes, - MetadataLogConfig.INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG, internalMaxFetchSizeInBytes, - MetadataLogConfig.INTERNAL_DELETE_DELAY_MILLIS_CONFIG, internalDeleteDelayMillis, + MetadataLogConfig.INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_CONFIG, internalMaxBatchSizeInBytes, + MetadataLogConfig.INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_CONFIG, internalMaxFetchSizeInBytes, + MetadataLogConfig.INTERNAL_METADATA_DELETE_DELAY_MILLIS_CONFIG, internalDeleteDelayMillis, ) new MetadataLogConfig(new AbstractConfig(MetadataLogConfig.CONFIG_DEF, config, false)) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 4f269d5ed7c..d16201462dd 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -794,9 +794,9 @@ class KafkaConfigTest { case MetadataLogConfig.METADATA_MAX_RETENTION_BYTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number") case MetadataLogConfig.METADATA_MAX_RETENTION_MILLIS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number") case MetadataLogConfig.INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG => // no op - case MetadataLogConfig.INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG => // no op - case MetadataLogConfig.INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG => // no op - case MetadataLogConfig.INTERNAL_DELETE_DELAY_MILLIS_CONFIG => // no op + case MetadataLogConfig.INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_CONFIG => // no op + case MetadataLogConfig.INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_CONFIG => // no op + case MetadataLogConfig.INTERNAL_METADATA_DELETE_DELAY_MILLIS_CONFIG => // no op case KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG => // ignore string case MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number") diff --git a/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java b/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java index 529a9ded415..93821df8db3 100644 --- a/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java +++ b/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java @@ -78,14 +78,14 @@ public class MetadataLogConfig { "controller should write no-op records to the metadata partition. If the value is 0, no-op records " + "are not appended to the metadata partition. The default value is " + METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT; - public static final String INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG = "internal.max.batch.size.in.bytes"; - public static final String INTERNAL_MAX_BATCH_SIZE_IN_BYTES_DOC = "The largest record batch size allowed in the metadata log, only for testing."; + public static final String INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_CONFIG = "internal.metadata.max.batch.size.in.bytes"; + public static final String INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_DOC = "The largest record batch size allowed in the metadata log, only for testing."; - public static final String INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG = "internal.max.fetch.size.in.bytes"; - public static final String INTERNAL_MAX_FETCH_SIZE_IN_BYTES_DOC = "The maximum number of bytes to read when fetching from the metadata log, only for testing."; + public static final String INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_CONFIG = "internal.metadata.max.fetch.size.in.bytes"; + public static final String INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_DOC = "The maximum number of bytes to read when fetching from the metadata log, only for testing."; - public static final String INTERNAL_DELETE_DELAY_MILLIS_CONFIG = "internal.delete.delay.millis"; - public static final String INTERNAL_DELETE_DELAY_MILLIS_DOC = "The amount of time to wait before deleting a file from the filesystem, only for testing."; + public static final String INTERNAL_METADATA_DELETE_DELAY_MILLIS_CONFIG = "internal.metadata.delete.delay.millis"; + public static final String INTERNAL_METADATA_DELETE_DELAY_MILLIS_DOC = "The amount of time to wait before deleting a file from the filesystem, only for testing."; public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC) @@ -97,9 +97,9 @@ public class MetadataLogConfig { .define(METADATA_MAX_RETENTION_MILLIS_CONFIG, LONG, METADATA_MAX_RETENTION_MILLIS_DEFAULT, null, HIGH, METADATA_MAX_RETENTION_MILLIS_DOC) .define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, METADATA_MAX_IDLE_INTERVAL_MS_DOC) .defineInternal(INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG, INT, null, null, LOW, INTERNAL_METADATA_LOG_SEGMENT_BYTES_DOC) - .defineInternal(INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG, INT, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, null, LOW, INTERNAL_MAX_BATCH_SIZE_IN_BYTES_DOC) - .defineInternal(INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG, INT, KafkaRaftClient.MAX_FETCH_SIZE_BYTES, null, LOW, INTERNAL_MAX_FETCH_SIZE_IN_BYTES_DOC) - .defineInternal(INTERNAL_DELETE_DELAY_MILLIS_CONFIG, LONG, ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT, null, LOW, INTERNAL_DELETE_DELAY_MILLIS_DOC); + .defineInternal(INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_CONFIG, INT, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, null, LOW, INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_DOC) + .defineInternal(INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_CONFIG, INT, KafkaRaftClient.MAX_FETCH_SIZE_BYTES, null, LOW, INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_DOC) + .defineInternal(INTERNAL_METADATA_DELETE_DELAY_MILLIS_CONFIG, LONG, ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT, null, LOW, INTERNAL_METADATA_DELETE_DELAY_MILLIS_DOC); private final int logSegmentBytes; private final Integer internalSegmentBytes; @@ -116,9 +116,9 @@ public class MetadataLogConfig { this.logSegmentMillis = config.getLong(METADATA_LOG_SEGMENT_MILLIS_CONFIG); this.retentionMaxBytes = config.getLong(METADATA_MAX_RETENTION_BYTES_CONFIG); this.retentionMillis = config.getLong(METADATA_MAX_RETENTION_MILLIS_CONFIG); - this.internalMaxBatchSizeInBytes = config.getInt(INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG); - this.internalMaxFetchSizeInBytes = config.getInt(INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG); - this.internalDeleteDelayMillis = config.getLong(INTERNAL_DELETE_DELAY_MILLIS_CONFIG); + this.internalMaxBatchSizeInBytes = config.getInt(INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_CONFIG); + this.internalMaxFetchSizeInBytes = config.getInt(INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_CONFIG); + this.internalDeleteDelayMillis = config.getLong(INTERNAL_METADATA_DELETE_DELAY_MILLIS_CONFIG); } public int logSegmentBytes() {