diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 979b5379cc2..b130609bd31 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -430,6 +430,7 @@ + diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index b23b770558f..8fb37665c3a 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -529,7 +529,7 @@ object ConfigCommand extends Logging { private val nl: String = System.lineSeparator() val addConfig: OptionSpec[String] = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " + - "For entity-type '" + TopicType + "': " + LogConfig.configNames.asScala.map("\t" + _).mkString(nl, nl, nl) + + "For entity-type '" + TopicType + "': " + LogConfig.nonInternalConfigNames.asScala.map("\t" + _).mkString(nl, nl, nl) + "For entity-type '" + BrokerType + "': " + DynamicConfig.Broker.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) + "For entity-type '" + UserType + "': " + QuotaConfig.scramMechanismsPlusUserAndClientQuotaConfigs().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) + "For entity-type '" + ClientType + "': " + QuotaConfig.userAndClientQuotaConfigs().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) + diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index f7b776f1336..dc5f3ca11d5 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -29,7 +29,7 @@ import org.apache.kafka.common.record.{MemoryRecords, Records} import org.apache.kafka.common.utils.LogContext import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} -import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, MetadataLogConfig, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch} +import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, MetadataLogConfig, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch} import org.apache.kafka.server.common.OffsetAndEpoch import org.apache.kafka.server.config.ServerLogConfigs import org.apache.kafka.server.storage.log.FetchIsolation @@ -73,7 +73,7 @@ final class KafkaMetadataLog private ( case _ => throw new IllegalArgumentException(s"Unhandled read isolation $readIsolation") } - val fetchInfo = log.read(startOffset, config.maxFetchSizeInBytes, isolation, true) + val fetchInfo = log.read(startOffset, config.internalMaxFetchSizeInBytes, isolation, true) new LogFetchInfo( fetchInfo.records, @@ -557,7 +557,7 @@ final class KafkaMetadataLog private ( scheduler.scheduleOnce( "delete-snapshot-files", () => KafkaMetadataLog.deleteSnapshotFiles(log.dir.toPath, expiredSnapshots), - config.deleteDelayMillis + config.internalDeleteDelayMillis ) } } @@ -588,9 +588,11 @@ object KafkaMetadataLog extends Logging { nodeId: Int ): KafkaMetadataLog = { val props = new Properties() - props.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, config.maxBatchSizeInBytes.toString) - props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString) - props.setProperty(TopicConfig.SEGMENT_MS_CONFIG, config.logSegmentMillis.toString) + props.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, config.internalMaxBatchSizeInBytes.toString) + if (config.internalSegmentBytes() != null) + props.setProperty(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, config.internalSegmentBytes().toString) + else + props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString) props.setProperty(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT.toString) // Disable time and byte retention when deleting segments @@ -599,11 +601,7 @@ object KafkaMetadataLog extends Logging { LogConfig.validate(props) val defaultLogConfig = new LogConfig(props) - if (config.logSegmentBytes < config.logSegmentMinBytes) { - throw new InvalidConfigurationException( - s"Cannot set ${MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG} below ${config.logSegmentMinBytes}: ${config.logSegmentBytes}" - ) - } else if (defaultLogConfig.retentionMs >= 0) { + if (defaultLogConfig.retentionMs >= 0) { throw new InvalidConfigurationException( s"Cannot set ${TopicConfig.RETENTION_MS_CONFIG} above -1: ${defaultLogConfig.retentionMs}." ) @@ -639,12 +637,6 @@ object KafkaMetadataLog extends Logging { nodeId ) - // Print a warning if users have overridden the internal config - if (config.logSegmentMinBytes != KafkaRaftClient.MAX_BATCH_SIZE_BYTES) { - metadataLog.error(s"Overriding ${MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG} is only supported for testing. Setting " + - s"this value too low may lead to an inability to write batches of metadata records.") - } - // When recovering, truncate fully if the latest snapshot is after the log end offset. This can happen to a follower // when the follower crashes after downloading a snapshot from the leader but before it could truncate the log fully. metadataLog.truncateToLatestSnapshot() diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 11a334bf6e7..dc24bd84c59 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -1567,7 +1567,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @MethodSource(Array("getTestGroupProtocolParametersAll")) def testDeleteRecordsAfterCorruptRecords(groupProtocol: String): Unit = { val config = new Properties() - config.put(TopicConfig.SEGMENT_BYTES_CONFIG, "200") + config.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "200") createTopic(topic, numPartitions = 1, replicationFactor = 1, config) client = createAdminClient diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala index 7a385fa6646..fe5f1643d39 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala @@ -566,7 +566,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu client.createAcls(java.util.List.of(denyAcl), new CreateAclsOptions()).all().get() val topics = Seq(topic1, topic2) - val configsOverride = java.util.Map.of(TopicConfig.SEGMENT_BYTES_CONFIG, "100000") + val configsOverride = java.util.Map.of(TopicConfig.SEGMENT_BYTES_CONFIG, "3000000") val newTopics = java.util.List.of( new NewTopic(topic1, 2, 3.toShort).configs(configsOverride), new NewTopic(topic2, Optional.empty[Integer], Optional.empty[java.lang.Short]).configs(configsOverride)) @@ -580,7 +580,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu val topicConfigs = result.config(topic1).get().entries.asScala assertTrue(topicConfigs.nonEmpty) val segmentBytesConfig = topicConfigs.find(_.name == TopicConfig.SEGMENT_BYTES_CONFIG).get - assertEquals(100000, segmentBytesConfig.value.toLong) + assertEquals(3000000, segmentBytesConfig.value.toLong) assertEquals(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG, segmentBytesConfig.source) val compressionConfig = topicConfigs.find(_.name == TopicConfig.COMPRESSION_TYPE_CONFIG).get assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT, compressionConfig.value) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index ff4e54f7eea..5bbe7e54d55 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -654,7 +654,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup "Config not updated in LogManager") val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found")) - TestUtils.waitUntilTrue(() => log.config.segmentSize == 1048576, "Existing topic config using defaults not updated") + TestUtils.waitUntilTrue(() => log.config.segmentSize() == 1048576, "Existing topic config using defaults not updated") val KafkaConfigToLogConfigName: Map[String, String] = ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) } props.asScala.foreach { case (k, v) => diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala index 37d6ef0d4c3..4b40118e22a 100644 --- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala +++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala @@ -20,7 +20,7 @@ import kafka.server.{KafkaConfig, KafkaRaftServer} import kafka.utils.TestUtils import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.errors.CorruptRecordException -import org.apache.kafka.common.errors.{InvalidConfigurationException, RecordTooLargeException} +import org.apache.kafka.common.errors.RecordTooLargeException import org.apache.kafka.common.protocol import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable} import org.apache.kafka.common.record.ArbitraryMemoryRecords @@ -43,6 +43,7 @@ import org.junit.jupiter.params.provider.ArgumentsSource import net.jqwik.api.AfterFailureMode import net.jqwik.api.ForAll import net.jqwik.api.Property +import org.apache.kafka.common.config.{AbstractConfig, ConfigException} import org.apache.kafka.server.common.OffsetAndEpoch import java.io.File @@ -78,13 +79,13 @@ final class KafkaMetadataLogTest { props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL") props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, Int.box(10240)) props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG, Int.box(10 * 1024)) - assertThrows(classOf[InvalidConfigurationException], () => { + assertThrows(classOf[ConfigException], () => { val kafkaConfig = KafkaConfig.fromProps(props) val metadataConfig = new MetadataLogConfig(kafkaConfig) buildMetadataLog(tempDir, mockTime, metadataConfig) }) - props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, Int.box(10240)) + props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, Int.box(10 * 1024 * 1024)) val kafkaConfig = KafkaConfig.fromProps(props) val metadataConfig = new MetadataLogConfig(kafkaConfig) buildMetadataLog(tempDir, mockTime, metadataConfig) @@ -478,7 +479,7 @@ final class KafkaMetadataLogTest { assertEquals(log.earliestSnapshotId(), log.latestSnapshotId()) log.close() - mockTime.sleep(config.deleteDelayMillis) + mockTime.sleep(config.internalDeleteDelayMillis) // Assert that the log dir doesn't contain any older snapshots Files .walk(logDir, 1) @@ -649,7 +650,7 @@ final class KafkaMetadataLogTest { assertEquals(greaterSnapshotId, secondLog.latestSnapshotId().get) assertEquals(3 * numberOfRecords, secondLog.startOffset) assertEquals(epoch, secondLog.lastFetchedEpoch) - mockTime.sleep(config.deleteDelayMillis) + mockTime.sleep(config.internalDeleteDelayMillis) // Assert that the log dir doesn't contain any older snapshots Files @@ -687,15 +688,12 @@ final class KafkaMetadataLogTest { val leaderEpoch = 5 val maxBatchSizeInBytes = 16384 val recordSize = 64 - val config = new MetadataLogConfig( + val config = createMetadataLogConfig( DefaultMetadataLogConfig.logSegmentBytes, - DefaultMetadataLogConfig.logSegmentMinBytes, DefaultMetadataLogConfig.logSegmentMillis, DefaultMetadataLogConfig.retentionMaxBytes, DefaultMetadataLogConfig.retentionMillis, - maxBatchSizeInBytes, - DefaultMetadataLogConfig.maxFetchSizeInBytes, - DefaultMetadataLogConfig.deleteDelayMillis + maxBatchSizeInBytes ) val log = buildMetadataLog(tempDir, mockTime, config) @@ -907,15 +905,13 @@ final class KafkaMetadataLogTest { @Test def testAdvanceLogStartOffsetAfterCleaning(): Unit = { - val config = new MetadataLogConfig( - 512, + val config = createMetadataLogConfig( 512, 10 * 1000, 256, 60 * 1000, 512, - DefaultMetadataLogConfig.maxFetchSizeInBytes, - ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT + DefaultMetadataLogConfig.internalMaxFetchSizeInBytes, ) val log = buildMetadataLog(tempDir, mockTime, config) @@ -944,15 +940,12 @@ final class KafkaMetadataLogTest { @Test def testDeleteSnapshots(): Unit = { // Generate some logs and a few snapshots, set retention low and verify that cleaning occurs - val config = new MetadataLogConfig( - 1024, + val config = createMetadataLogConfig( 1024, 10 * 1000, 1024, 60 * 1000, 100, - DefaultMetadataLogConfig.maxBatchSizeInBytes, - DefaultMetadataLogConfig.maxFetchSizeInBytes ) val log = buildMetadataLog(tempDir, mockTime, config) @@ -978,15 +971,12 @@ final class KafkaMetadataLogTest { @Test def testSoftRetentionLimit(): Unit = { // Set retention equal to the segment size and generate slightly more than one segment of logs - val config = new MetadataLogConfig( - 10240, + val config = createMetadataLogConfig( 10240, 10 * 1000, 10240, 60 * 1000, 100, - DefaultMetadataLogConfig.maxFetchSizeInBytes, - DefaultMetadataLogConfig.deleteDelayMillis ) val log = buildMetadataLog(tempDir, mockTime, config) @@ -1022,15 +1012,12 @@ final class KafkaMetadataLogTest { @Test def testSegmentsLessThanLatestSnapshot(): Unit = { - val config = new MetadataLogConfig( - 10240, + val config = createMetadataLogConfig( 10240, 10 * 1000, 10240, 60 * 1000, 200, - DefaultMetadataLogConfig.maxFetchSizeInBytes, - DefaultMetadataLogConfig.deleteDelayMillis ) val log = buildMetadataLog(tempDir, mockTime, config) @@ -1081,15 +1068,11 @@ object KafkaMetadataLogTest { override def read(input: protocol.Readable, size: Int): Array[Byte] = input.readArray(size) } - val DefaultMetadataLogConfig = new MetadataLogConfig( - 100 * 1024, + val DefaultMetadataLogConfig = createMetadataLogConfig( 100 * 1024, 10 * 1000, 100 * 1024, 60 * 1000, - KafkaRaftClient.MAX_BATCH_SIZE_BYTES, - KafkaRaftClient.MAX_FETCH_SIZE_BYTES, - ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT ) def buildMetadataLogAndDir( @@ -1166,4 +1149,25 @@ object KafkaMetadataLogTest { } dir } + + private def createMetadataLogConfig( + internalLogSegmentBytes: Int, + logSegmentMillis: Long, + retentionMaxBytes: Long, + retentionMillis: Long, + internalMaxBatchSizeInBytes: Int = KafkaRaftClient.MAX_BATCH_SIZE_BYTES, + internalMaxFetchSizeInBytes: Int = KafkaRaftClient.MAX_FETCH_SIZE_BYTES, + internalDeleteDelayMillis: Long = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT + ): MetadataLogConfig = { + val config: util.Map[String, Any] = util.Map.of( + MetadataLogConfig.INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG, internalLogSegmentBytes, + MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG, logSegmentMillis, + MetadataLogConfig.METADATA_MAX_RETENTION_BYTES_CONFIG, retentionMaxBytes, + MetadataLogConfig.METADATA_MAX_RETENTION_MILLIS_CONFIG, retentionMillis, + MetadataLogConfig.INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG, internalMaxBatchSizeInBytes, + MetadataLogConfig.INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG, internalMaxFetchSizeInBytes, + MetadataLogConfig.INTERNAL_DELETE_DELAY_MILLIS_CONFIG, internalDeleteDelayMillis, + ) + new MetadataLogConfig(new AbstractConfig(MetadataLogConfig.CONFIG_DEF, config, false)) + } } diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala index d9eaa7b2aac..fff1930a718 100644 --- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala @@ -100,7 +100,7 @@ class AbstractPartitionTest { def createLogProperties(overrides: Map[String, String]): Properties = { val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 512: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 512: java.lang.Integer) logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000: java.lang.Integer) logProps.put(TopicConfig.RETENTION_MS_CONFIG, 999: java.lang.Integer) overrides.foreach { case (k, v) => logProps.put(k, v) } diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 4a3051ddc95..77b098cf682 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -362,7 +362,7 @@ class PartitionLockTest extends Logging { private def createLogProperties(overrides: Map[String, String]): Properties = { val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 512: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 512: java.lang.Integer) logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000: java.lang.Integer) logProps.put(TopicConfig.RETENTION_MS_CONFIG, 999: java.lang.Integer) overrides.foreach { case (k, v) => logProps.put(k, v) } diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala index acf21e69ec3..b1e161b9753 100644 --- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala @@ -71,7 +71,7 @@ abstract class AbstractLogCleanerIntegrationTest { maxCompactionLagMs: Long = defaultMaxCompactionLagMs): Properties = { val props = new Properties() props.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageSize: java.lang.Integer) - props.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentSize: java.lang.Integer) + props.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentSize: java.lang.Integer) props.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 100*1024: java.lang.Integer) props.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, deleteDelay: java.lang.Integer) props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index f93d703f077..8445baa7719 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -53,7 +53,7 @@ class LogCleanerManagerTest extends Logging { val topicPartition = new TopicPartition("log", 0) val topicPartition2 = new TopicPartition("log2", 0) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024: java.lang.Integer) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) val logConfig: LogConfig = new LogConfig(logProps) @@ -370,7 +370,7 @@ class LogCleanerManagerTest extends Logging { // change cleanup policy from delete to compact val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, log.config.segmentSize: Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, log.config.segmentSize(): Integer) logProps.put(TopicConfig.RETENTION_MS_CONFIG, log.config.retentionMs: java.lang.Long) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0: Integer) @@ -548,7 +548,7 @@ class LogCleanerManagerTest extends Logging { @Test def testCleanableOffsetsForNone(): Unit = { val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -570,7 +570,7 @@ class LogCleanerManagerTest extends Logging { @Test def testCleanableOffsetsActiveSegment(): Unit = { val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -592,7 +592,7 @@ class LogCleanerManagerTest extends Logging { def testCleanableOffsetsForTime(): Unit = { val compactionLag = 60 * 60 * 1000 val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -625,7 +625,7 @@ class LogCleanerManagerTest extends Logging { def testCleanableOffsetsForShortTime(): Unit = { val compactionLag = 60 * 60 * 1000 val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -667,7 +667,7 @@ class LogCleanerManagerTest extends Logging { def testUndecidedTransactionalDataNotCleanable(): Unit = { val compactionLag = 60 * 60 * 1000 val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -711,7 +711,7 @@ class LogCleanerManagerTest extends Logging { @Test def testDoneCleaning(): Unit = { val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) while (log.numberOfSegments < 8) log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), 0) @@ -830,7 +830,7 @@ class LogCleanerManagerTest extends Logging { private def createLowRetentionLogConfig(segmentSize: Int, cleanupPolicy: String): LogConfig = { val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentSize: Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentSize: Integer) logProps.put(TopicConfig.RETENTION_MS_CONFIG, 1: Integer) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicy) logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0.05: java.lang.Double) // small for easier and clearer tests diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 09a9d1c40f1..1bebfaa49e1 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -56,7 +56,7 @@ class LogCleanerTest extends Logging { val tmpdir = TestUtils.tempDir() val dir = TestUtils.randomPartitionLogDir(tmpdir) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024: java.lang.Integer) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) val logConfig = new LogConfig(logProps) @@ -148,7 +148,7 @@ class LogCleanerTest extends Logging { def testCleanSegments(): Unit = { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -181,7 +181,7 @@ class LogCleanerTest extends Logging { // Construct a log instance. The replaceSegments() method of the log instance is overridden so that // it waits for another thread to execute deleteOldSegments() val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024 : java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024 : java.lang.Integer) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE) val config = LogConfig.fromProps(logConfig.originals, logProps) val topicPartition = UnifiedLog.parseTopicPartitionName(dir) @@ -271,7 +271,7 @@ class LogCleanerTest extends Logging { val originalMaxFileSize = 1024 val cleaner = makeCleaner(2) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, originalMaxFileSize: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, originalMaxFileSize: java.lang.Integer) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact": java.lang.String) logProps.put(TopicConfig.PREALLOCATE_CONFIG, "true": java.lang.String) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -295,7 +295,7 @@ class LogCleanerTest extends Logging { def testDuplicateCheckAfterCleaning(): Unit = { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 2048: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 2048: java.lang.Integer) var log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) val producerEpoch = 0.toShort @@ -464,7 +464,7 @@ class LogCleanerTest extends Logging { def testBasicTransactionAwareCleaning(): Unit = { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 2048: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 2048: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) val producerEpoch = 0.toShort @@ -497,7 +497,7 @@ class LogCleanerTest extends Logging { def testCleanWithTransactionsSpanningSegments(): Unit = { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) val producerEpoch = 0.toShort @@ -543,7 +543,7 @@ class LogCleanerTest extends Logging { def testCommitMarkerRemoval(): Unit = { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 256: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) val producerEpoch = 0.toShort @@ -591,7 +591,7 @@ class LogCleanerTest extends Logging { val cleaner = makeCleaner(capacity = Int.MaxValue, maxMessageSize = 100) val logProps = new Properties() logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 100: java.lang.Integer) - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1000: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1000: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) val producerEpoch = 0.toShort @@ -619,7 +619,7 @@ class LogCleanerTest extends Logging { def testCommitMarkerRetentionWithEmptyBatch(): Unit = { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 256: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) val producerEpoch = 0.toShort @@ -682,7 +682,7 @@ class LogCleanerTest extends Logging { def testCleanEmptyControlBatch(): Unit = { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 256: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) val producerEpoch = 0.toShort @@ -714,7 +714,7 @@ class LogCleanerTest extends Logging { def testCommittedTransactionSpanningSegments(): Unit = { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 128: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 128: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) val producerEpoch = 0.toShort val producerId = 1L @@ -736,7 +736,7 @@ class LogCleanerTest extends Logging { def testAbortedTransactionSpanningSegments(): Unit = { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 128: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 128: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) val producerEpoch = 0.toShort val producerId = 1L @@ -765,7 +765,7 @@ class LogCleanerTest extends Logging { def testAbortMarkerRemoval(): Unit = { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 256: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) val producerEpoch = 0.toShort @@ -799,7 +799,7 @@ class LogCleanerTest extends Logging { val producerId = 1L val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 2048: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 2048: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) val appendFirstTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch, 0, AppendOrigin.REPLICATION) @@ -832,7 +832,7 @@ class LogCleanerTest extends Logging { def testAbortMarkerRetentionWithEmptyBatch(): Unit = { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 256: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) val producerEpoch = 0.toShort @@ -896,7 +896,7 @@ class LogCleanerTest extends Logging { // Create cleaner with very small default max message size val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, largeMessageSize * 16: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, largeMessageSize * 16: java.lang.Integer) logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, largeMessageSize * 2: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -968,7 +968,7 @@ class LogCleanerTest extends Logging { def createLogWithMessagesLargerThanMaxSize(largeMessageSize: Int): (UnifiedLog, FakeOffsetMap) = { val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, largeMessageSize * 16: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, largeMessageSize * 16: java.lang.Integer) logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, largeMessageSize * 2: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -994,7 +994,7 @@ class LogCleanerTest extends Logging { def testCleaningWithDeletes(): Unit = { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -1021,7 +1021,7 @@ class LogCleanerTest extends Logging { // because loadFactor is 0.75, this means we can fit 3 messages in the map val cleaner = makeCleaner(4) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -1050,7 +1050,7 @@ class LogCleanerTest extends Logging { def testLogCleanerRetainsProducerLastSequence(): Unit = { val cleaner = makeCleaner(10) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) log.appendAsLeader(record(0, 0), 0) // offset 0 @@ -1073,7 +1073,7 @@ class LogCleanerTest extends Logging { def testLogCleanerRetainsLastSequenceEvenIfTransactionAborted(): Unit = { val cleaner = makeCleaner(10) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) val producerEpoch = 0.toShort @@ -1107,7 +1107,7 @@ class LogCleanerTest extends Logging { def testCleaningWithKeysConflictingWithTxnMarkerKeys(): Unit = { val cleaner = makeCleaner(10) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) val leaderEpoch = 5 val producerEpoch = 0.toShort @@ -1151,7 +1151,7 @@ class LogCleanerTest extends Logging { // because loadFactor is 0.75, this means we can fit 1 message in the map val cleaner = makeCleaner(2) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -1182,7 +1182,7 @@ class LogCleanerTest extends Logging { def testCleaningWithUncleanableSection(): Unit = { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -1225,7 +1225,7 @@ class LogCleanerTest extends Logging { def testLogToClean(): Unit = { // create a log with small segment size val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 100: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 100: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) // create 6 segments with only one message in each segment @@ -1243,7 +1243,7 @@ class LogCleanerTest extends Logging { def testLogToCleanWithUncleanableSection(): Unit = { // create a log with small segment size val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 100: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 100: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) // create 6 segments with only one message in each segment @@ -1276,7 +1276,7 @@ class LogCleanerTest extends Logging { // create a log with compaction turned off so we can append unkeyed messages val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -1334,7 +1334,7 @@ class LogCleanerTest extends Logging { def testCleanSegmentsWithAbort(): Unit = { val cleaner = makeCleaner(Int.MaxValue, abortCheckDone) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -1356,7 +1356,7 @@ class LogCleanerTest extends Logging { def testCleanSegmentsRetainingLastEmptyBatch(): Unit = { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -1388,7 +1388,7 @@ class LogCleanerTest extends Logging { def testSegmentGrouping(): Unit = { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 300: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 300: java.lang.Integer) logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -1489,7 +1489,7 @@ class LogCleanerTest extends Logging { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 400: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 400: java.lang.Integer) logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) @@ -1541,7 +1541,7 @@ class LogCleanerTest extends Logging { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 400: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 400: java.lang.Integer) //mimic the effect of loading an empty index file logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 400: java.lang.Integer) @@ -1666,7 +1666,7 @@ class LogCleanerTest extends Logging { def testRecoveryAfterCrash(): Unit = { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 300: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 300: java.lang.Integer) logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1: java.lang.Integer) logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, 10: java.lang.Integer) @@ -1797,7 +1797,7 @@ class LogCleanerTest extends Logging { def testBuildOffsetMapFakeLarge(): Unit = { val map = new FakeOffsetMap(1000) val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 120: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 120: java.lang.Integer) logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 120: java.lang.Integer) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) val logConfig = new LogConfig(logProps) @@ -1945,7 +1945,7 @@ class LogCleanerTest extends Logging { @Test def testCleaningBeyondMissingOffsets(): Unit = { val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024*1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024*1024: java.lang.Integer) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) val logConfig = new LogConfig(logProps) val cleaner = makeCleaner(Int.MaxValue) diff --git a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala index 0da8366f443..854be398086 100644 --- a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala @@ -20,7 +20,6 @@ package kafka.log import java.util.{Optional, Properties} import java.util.concurrent.{Callable, Executors} import kafka.utils.TestUtils -import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record.SimpleRecord import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.coordinator.transaction.TransactionLogConfig @@ -60,7 +59,7 @@ class LogConcurrencyTest { @Test def testUncommittedDataNotConsumedFrequentSegmentRolls(): Unit = { val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 237: Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 237: Integer) val logConfig = new LogConfig(logProps) testUncommittedDataNotConsumed(createLog(logConfig)) } diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index fdb827de733..296736fc678 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -34,29 +34,8 @@ import org.apache.kafka.storage.internals.log.{LogConfig, ThrottledReplicaListVa import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource -import scala.jdk.CollectionConverters._ - class LogConfigTest { - /** - * This test verifies that KafkaConfig object initialization does not depend on - * LogConfig initialization. Bad things happen due to static initialization - * order dependencies. For example, LogConfig.configDef ends up adding null - * values in serverDefaultConfigNames. This test ensures that the mapping of - * keys from LogConfig to KafkaConfig are not missing values. - */ - @Test - def ensureNoStaticInitializationOrderDependency(): Unit = { - // Access any KafkaConfig val to load KafkaConfig object before LogConfig. - assertNotNull(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG) - assertTrue(LogConfig.configNames.asScala - .filter(config => !LogConfig.CONFIGS_WITH_NO_SERVER_DEFAULTS.contains(config)) - .forall { config => - val serverConfigOpt = LogConfig.serverConfigName(config) - serverConfigOpt.isPresent && (serverConfigOpt.get != null) - }) - } - @Test def testKafkaConfigToProps(): Unit = { val millisInHour = 60L * 60L * 1000L @@ -94,6 +73,7 @@ class LogConfigTest { case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-0.1") case TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0") case TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0") + case LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG => // no op case _ => assertPropertyInvalid(name, "not_a_number", "-1") }) diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index 8e417a695ee..0c465cf2138 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -21,7 +21,6 @@ import kafka.server.KafkaConfig import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.compress.Compression -import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, MemoryRecords, RecordBatch, SimpleRecord, TimestampType} import org.apache.kafka.common.utils.{Time, Utils} @@ -245,7 +244,7 @@ class LogLoaderTest { @Test def testProducerSnapshotsRecoveryAfterUncleanShutdown(): Unit = { val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, "640") + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "640") val logConfig = new LogConfig(logProps) var log = createLog(logDir, logConfig) assertEquals(OptionalLong.empty(), log.oldestProducerSnapshotOffset) diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 67880e0ced5..04f4acca5de 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -60,7 +60,7 @@ class LogManagerTest { val maxRollInterval = 100 val maxLogAgeMs: Int = 10 * 60 * 1000 val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 4096: java.lang.Integer) logProps.put(TopicConfig.RETENTION_MS_CONFIG, maxLogAgeMs: java.lang.Integer) val logConfig = new LogConfig(logProps) @@ -391,7 +391,7 @@ class LogManagerTest { logManager.shutdown() val segmentBytes = 10 * setSize val properties = new Properties() - properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentBytes.toString) + properties.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentBytes.toString) properties.put(TopicConfig.RETENTION_BYTES_CONFIG, (5L * 10L * setSize + 10L).toString) val configRepository = MockConfigRepository.forTopic(name, properties) diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala index 04c91741037..0ff68988d76 100644 --- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala +++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala @@ -75,7 +75,7 @@ object LogTestUtils { remoteLogDeleteOnDisable: Boolean = DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG): LogConfig = { val logProps = new Properties() logProps.put(TopicConfig.SEGMENT_MS_CONFIG, segmentMs: java.lang.Long) - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentBytes: Integer) + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentBytes: Integer) logProps.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs: java.lang.Long) logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs: java.lang.Long) logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes: java.lang.Long) diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 3f73f8f731a..42c813074aa 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -2734,7 +2734,7 @@ class UnifiedLogTest { @Test def testLeaderEpochCacheCreatedAfterMessageFormatUpgrade(): Unit = { val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1000") + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "1000") logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1") logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "65536") val logConfig = new LogConfig(logProps) diff --git a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala index b45f3d62f47..16cce3ed81a 100644 --- a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala @@ -25,7 +25,7 @@ import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{AlterReplicaLogDirsRequest, AlterReplicaLogDirsResponse} import org.apache.kafka.server.config.ServerLogConfigs -import org.apache.kafka.storage.internals.log.LogFileUtils +import org.apache.kafka.storage.internals.log.{LogConfig, LogFileUtils} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -144,7 +144,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { // We don't want files with `.deleted` suffix are removed too fast, // so we can validate there will be orphan files and orphan files will be removed eventually. topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "10000") - topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024") + topicProperties.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "1024") createTopic(topic, partitionNum, 1, topicProperties) assertEquals(logDir1, brokers.head.logManager.getLog(tp).get.dir.getParent) diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 5f87b20d1f5..c20cd042fc5 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.internals.Plugin import org.apache.kafka.common.metrics.{JmxReporter, Metrics} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig} +import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.server.DynamicThreadPool import org.apache.kafka.server.authorizer._ @@ -670,16 +670,6 @@ class DynamicBrokerConfigTest { assertTrue(m.currentReporters.isEmpty) } - @Test - def testNonInternalValuesDoesNotExposeInternalConfigs(): Unit = { - val props = TestUtils.createBrokerConfig(0, port = 8181) - props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, "1024") - val config = new KafkaConfig(props) - assertFalse(config.nonInternalValues.containsKey(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG)) - config.updateCurrentConfig(new KafkaConfig(props)) - assertFalse(config.nonInternalValues.containsKey(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG)) - } - @Test def testDynamicLogLocalRetentionMsConfig(): Unit = { val props = TestUtils.createBrokerConfig(0, port = 8181) diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 61105b176dc..519a7d951a3 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -95,17 +95,17 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { @Test def testDynamicTopicConfigChange(): Unit = { val tp = new TopicPartition("test", 0) - val oldSegmentSize = 1000 + val oldSegmentSize = 2 * 1024 * 1024 val logProps = new Properties() logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, oldSegmentSize.toString) createTopic(tp.topic, 1, 1, logProps) TestUtils.retry(10000) { val logOpt = this.brokers.head.logManager.getLog(tp) assertTrue(logOpt.isDefined) - assertEquals(oldSegmentSize, logOpt.get.config.segmentSize) + assertEquals(oldSegmentSize, logOpt.get.config.segmentSize()) } - val newSegmentSize = 2000 + val newSegmentSize = 2 * 1024 * 1024 val admin = createAdminClient() try { val resource = new ConfigResource(ConfigResource.Type.TOPIC, tp.topic()) @@ -117,7 +117,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { } val log = brokers.head.logManager.getLog(tp).get TestUtils.retry(10000) { - assertEquals(newSegmentSize, log.config.segmentSize) + assertEquals(newSegmentSize, log.config.segmentSize()) } (1 to 50).foreach(i => TestUtils.produceMessage(brokers, tp.topic, i.toString)) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 3f6063e6e02..fb9b2939f32 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -793,6 +793,10 @@ class KafkaConfigTest { case MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number") case MetadataLogConfig.METADATA_MAX_RETENTION_BYTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number") case MetadataLogConfig.METADATA_MAX_RETENTION_MILLIS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number") + case MetadataLogConfig.INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG => // no op + case MetadataLogConfig.INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG => // no op + case MetadataLogConfig.INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG => // no op + case MetadataLogConfig.INTERNAL_DELETE_DELAY_MILLIS_CONFIG => // no op case KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG => // ignore string case MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number") @@ -1135,6 +1139,8 @@ class KafkaConfigTest { // topic only config case QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG => // topic only config + case "internal.segment.bytes" => + // topic internal config case prop => fail(prop + " must be explicitly checked for dynamic updatability. Note that LogConfig(s) require that KafkaConfig value lookups are dynamic and not static values.") } diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index 68a68183c96..67df4f1a7e6 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -33,7 +33,7 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.{Assignment, import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.compress.Compression -import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.common.config.{AbstractConfig, TopicConfig} import org.apache.kafka.common.message.{KRaftVersionRecord, LeaderChangeMessage, SnapshotFooterRecord, SnapshotHeaderRecord, VotersRecord} import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord} import org.apache.kafka.common.protocol.{ApiMessage, ByteBufferAccessor, MessageUtil, ObjectSerializationCache} @@ -44,9 +44,8 @@ import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, ShareSnap import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue} import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.metadata.MetadataRecordSerde -import org.apache.kafka.raft.{KafkaRaftClient, MetadataLogConfig, VoterSetTest} +import org.apache.kafka.raft.{MetadataLogConfig, VoterSetTest} import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, OffsetAndEpoch} -import org.apache.kafka.server.config.ServerLogConfigs import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentMetadataUpdate, RemoteLogSegmentState, RemotePartitionDeleteMetadata, RemotePartitionDeleteState} import org.apache.kafka.server.storage.log.FetchIsolation @@ -584,15 +583,11 @@ class DumpLogSegmentsTest { logDir, time, time.scheduler, - new MetadataLogConfig( - 100 * 1024, + createMetadataLogConfig( 100 * 1024, 10 * 1000, 100 * 1024, - 60 * 1000, - KafkaRaftClient.MAX_BATCH_SIZE_BYTES, - KafkaRaftClient.MAX_FETCH_SIZE_BYTES, - ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT + 60 * 1000 ), 1 ) @@ -1195,4 +1190,19 @@ class DumpLogSegmentsTest { )) ) } + + private def createMetadataLogConfig( + internalLogSegmentBytes: Int, + logSegmentMillis: Long, + retentionMaxBytes: Long, + retentionMillis: Long + ): MetadataLogConfig = { + val config: util.Map[String, Any] = util.Map.of( + MetadataLogConfig.INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG, internalLogSegmentBytes, + MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG, logSegmentMillis, + MetadataLogConfig.METADATA_MAX_RETENTION_BYTES_CONFIG, retentionMaxBytes, + MetadataLogConfig.METADATA_MAX_RETENTION_MILLIS_CONFIG, retentionMillis, + ) + new MetadataLogConfig(new AbstractConfig(MetadataLogConfig.CONFIG_DEF, config, false)) + } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java index f61c4c70e06..a8cf8a47bb1 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java @@ -166,6 +166,10 @@ public class KafkaConfigSchema { ConfigDef configDef = configDefs.getOrDefault(ConfigResource.Type.TOPIC, EMPTY_CONFIG_DEF); HashMap effectiveConfigs = new HashMap<>(); for (ConfigDef.ConfigKey configKey : configDef.configKeys().values()) { + // This config is internal; if the user hasn't set it explicitly, it should not be returned. + if (configKey.internalConfig && !dynamicTopicConfigs.containsKey(configKey.name)) { + continue; + } ConfigEntry entry = resolveEffectiveTopicConfig(configKey, staticNodeConfig, dynamicClusterConfigs, dynamicNodeConfigs, dynamicTopicConfigs); effectiveConfigs.put(entry.name(), entry); diff --git a/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java b/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java index 27683ce8933..92c6e87a018 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java @@ -54,7 +54,8 @@ public class KafkaConfigSchemaTest { define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc doc"). define("def", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "def doc"). define("ghi", ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.HIGH, "ghi doc"). - define("xyz", ConfigDef.Type.PASSWORD, "thedefault", ConfigDef.Importance.HIGH, "xyz doc")); + define("xyz", ConfigDef.Type.PASSWORD, "thedefault", ConfigDef.Importance.HIGH, "xyz doc"). + defineInternal("internal", ConfigDef.Type.STRING, "internalValue", null, ConfigDef.Importance.HIGH, "internal doc")); } public static final Map> SYNONYMS = new HashMap<>(); @@ -167,4 +168,30 @@ public class KafkaConfigSchemaTest { dynamicNodeConfigs, dynamicTopicConfigs)); } + + @Test + public void testResolveEffectiveDynamicInternalTopicConfig() { + Map dynamicTopicConfigs = Map.of( + "ghi", "true", + "internal", "internal,change" + ); + Map expected = Map.of( + "abc", new ConfigEntry("abc", null, + ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, List.of(), + ConfigEntry.ConfigType.LIST, "abc doc"), + "def", new ConfigEntry("def", null, + ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, List.of(), + ConfigEntry.ConfigType.LONG, "def doc"), + "ghi", new ConfigEntry("ghi", "true", + ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, List.of(), + ConfigEntry.ConfigType.BOOLEAN, "ghi doc"), + "xyz", new ConfigEntry("xyz", "thedefault", + ConfigEntry.ConfigSource.DEFAULT_CONFIG, true, false, List.of(), + ConfigEntry.ConfigType.PASSWORD, "xyz doc"), + "internal", new ConfigEntry("internal", "internal,change", + ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, List.of(), + ConfigEntry.ConfigType.STRING, "internal doc") + ); + assertEquals(expected, SCHEMA.resolveEffectiveTopicConfigs(Map.of(), Map.of(), Map.of(), dynamicTopicConfigs)); + } } diff --git a/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java b/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java index b0a6f9f045a..529a9ded415 100644 --- a/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java +++ b/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java @@ -18,7 +18,6 @@ package org.apache.kafka.raft; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.record.Records; import org.apache.kafka.server.config.ServerLogConfigs; import java.util.concurrent.TimeUnit; @@ -52,14 +51,13 @@ public class MetadataLogConfig { "configuration. The Kafka node will generate a snapshot when either the maximum time interval is reached or the " + "maximum bytes limit is reached."; - public static final String METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG = "metadata.log.segment.min.bytes"; - public static final String METADATA_LOG_SEGMENT_MIN_BYTES_DOC = "Override the minimum size for a single metadata log file. This should be used for testing only."; - public static final int METADATA_LOG_SEGMENT_MIN_BYTES_DEFAULT = 8 * 1024 * 1024; - public static final String METADATA_LOG_SEGMENT_BYTES_CONFIG = "metadata.log.segment.bytes"; public static final String METADATA_LOG_SEGMENT_BYTES_DOC = "The maximum size of a single metadata log file."; public static final int METADATA_LOG_SEGMENT_BYTES_DEFAULT = 1024 * 1024 * 1024; + public static final String INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG = "internal.metadata.log.segment.bytes"; + public static final String INTERNAL_METADATA_LOG_SEGMENT_BYTES_DOC = "The maximum size of a single metadata log file, only for testing."; + public static final String METADATA_LOG_SEGMENT_MILLIS_CONFIG = "metadata.log.segment.ms"; public static final String METADATA_LOG_SEGMENT_MILLIS_DOC = "The maximum time before a new metadata log file is rolled out (in milliseconds)."; public static final long METADATA_LOG_SEGMENT_MILLIS_DEFAULT = 24 * 7 * 60 * 60 * 1000L; @@ -80,72 +78,55 @@ public class MetadataLogConfig { "controller should write no-op records to the metadata partition. If the value is 0, no-op records " + "are not appended to the metadata partition. The default value is " + METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT; + public static final String INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG = "internal.max.batch.size.in.bytes"; + public static final String INTERNAL_MAX_BATCH_SIZE_IN_BYTES_DOC = "The largest record batch size allowed in the metadata log, only for testing."; + + public static final String INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG = "internal.max.fetch.size.in.bytes"; + public static final String INTERNAL_MAX_FETCH_SIZE_IN_BYTES_DOC = "The maximum number of bytes to read when fetching from the metadata log, only for testing."; + + public static final String INTERNAL_DELETE_DELAY_MILLIS_CONFIG = "internal.delete.delay.millis"; + public static final String INTERNAL_DELETE_DELAY_MILLIS_DOC = "The amount of time to wait before deleting a file from the filesystem, only for testing."; + public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC) .define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC) .define(METADATA_LOG_DIR_CONFIG, STRING, null, null, HIGH, METADATA_LOG_DIR_DOC) - .define(METADATA_LOG_SEGMENT_BYTES_CONFIG, INT, METADATA_LOG_SEGMENT_BYTES_DEFAULT, atLeast(Records.LOG_OVERHEAD), HIGH, METADATA_LOG_SEGMENT_BYTES_DOC) - .defineInternal(METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, INT, METADATA_LOG_SEGMENT_MIN_BYTES_DEFAULT, atLeast(Records.LOG_OVERHEAD), HIGH, METADATA_LOG_SEGMENT_MIN_BYTES_DOC) + .define(METADATA_LOG_SEGMENT_BYTES_CONFIG, INT, METADATA_LOG_SEGMENT_BYTES_DEFAULT, atLeast(8 * 1024 * 1024), HIGH, METADATA_LOG_SEGMENT_BYTES_DOC) .define(METADATA_LOG_SEGMENT_MILLIS_CONFIG, LONG, METADATA_LOG_SEGMENT_MILLIS_DEFAULT, null, HIGH, METADATA_LOG_SEGMENT_MILLIS_DOC) .define(METADATA_MAX_RETENTION_BYTES_CONFIG, LONG, METADATA_MAX_RETENTION_BYTES_DEFAULT, null, HIGH, METADATA_MAX_RETENTION_BYTES_DOC) .define(METADATA_MAX_RETENTION_MILLIS_CONFIG, LONG, METADATA_MAX_RETENTION_MILLIS_DEFAULT, null, HIGH, METADATA_MAX_RETENTION_MILLIS_DOC) - .define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, METADATA_MAX_IDLE_INTERVAL_MS_DOC); + .define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, METADATA_MAX_IDLE_INTERVAL_MS_DOC) + .defineInternal(INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG, INT, null, null, LOW, INTERNAL_METADATA_LOG_SEGMENT_BYTES_DOC) + .defineInternal(INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG, INT, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, null, LOW, INTERNAL_MAX_BATCH_SIZE_IN_BYTES_DOC) + .defineInternal(INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG, INT, KafkaRaftClient.MAX_FETCH_SIZE_BYTES, null, LOW, INTERNAL_MAX_FETCH_SIZE_IN_BYTES_DOC) + .defineInternal(INTERNAL_DELETE_DELAY_MILLIS_CONFIG, LONG, ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT, null, LOW, INTERNAL_DELETE_DELAY_MILLIS_DOC); private final int logSegmentBytes; - private final int logSegmentMinBytes; + private final Integer internalSegmentBytes; private final long logSegmentMillis; private final long retentionMaxBytes; private final long retentionMillis; - private final int maxBatchSizeInBytes; - private final int maxFetchSizeInBytes; - private final long deleteDelayMillis; - - /** - * Configuration for the metadata log - * @param logSegmentBytes The maximum size of a single metadata log file - * @param logSegmentMinBytes The minimum size of a single metadata log file - * @param logSegmentMillis The maximum time before a new metadata log file is rolled out - * @param retentionMaxBytes The size of the metadata log and snapshots before deleting old snapshots and log files - * @param retentionMillis The time to keep a metadata log file or snapshot before deleting it - * @param maxBatchSizeInBytes The largest record batch size allowed in the metadata log - * @param maxFetchSizeInBytes The maximum number of bytes to read when fetching from the metadata log - * @param deleteDelayMillis The amount of time to wait before deleting a file from the filesystem - */ - public MetadataLogConfig(int logSegmentBytes, - int logSegmentMinBytes, - long logSegmentMillis, - long retentionMaxBytes, - long retentionMillis, - int maxBatchSizeInBytes, - int maxFetchSizeInBytes, - long deleteDelayMillis) { - this.logSegmentBytes = logSegmentBytes; - this.logSegmentMinBytes = logSegmentMinBytes; - this.logSegmentMillis = logSegmentMillis; - this.retentionMaxBytes = retentionMaxBytes; - this.retentionMillis = retentionMillis; - this.maxBatchSizeInBytes = maxBatchSizeInBytes; - this.maxFetchSizeInBytes = maxFetchSizeInBytes; - this.deleteDelayMillis = deleteDelayMillis; - } + private final int internalMaxBatchSizeInBytes; + private final int internalMaxFetchSizeInBytes; + private final long internalDeleteDelayMillis; public MetadataLogConfig(AbstractConfig config) { this.logSegmentBytes = config.getInt(METADATA_LOG_SEGMENT_BYTES_CONFIG); - this.logSegmentMinBytes = config.getInt(METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG); + this.internalSegmentBytes = config.getInt(INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG); this.logSegmentMillis = config.getLong(METADATA_LOG_SEGMENT_MILLIS_CONFIG); this.retentionMaxBytes = config.getLong(METADATA_MAX_RETENTION_BYTES_CONFIG); this.retentionMillis = config.getLong(METADATA_MAX_RETENTION_MILLIS_CONFIG); - this.maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES; - this.maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES; - this.deleteDelayMillis = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT; + this.internalMaxBatchSizeInBytes = config.getInt(INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG); + this.internalMaxFetchSizeInBytes = config.getInt(INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG); + this.internalDeleteDelayMillis = config.getLong(INTERNAL_DELETE_DELAY_MILLIS_CONFIG); } public int logSegmentBytes() { return logSegmentBytes; } - - public int logSegmentMinBytes() { - return logSegmentMinBytes; + + public Integer internalSegmentBytes() { + return internalSegmentBytes; } public long logSegmentMillis() { @@ -160,15 +141,15 @@ public class MetadataLogConfig { return retentionMillis; } - public int maxBatchSizeInBytes() { - return maxBatchSizeInBytes; + public int internalMaxBatchSizeInBytes() { + return internalMaxBatchSizeInBytes; } - public int maxFetchSizeInBytes() { - return maxFetchSizeInBytes; + public int internalMaxFetchSizeInBytes() { + return internalMaxFetchSizeInBytes; } - public long deleteDelayMillis() { - return deleteDelayMillis; + public long internalDeleteDelayMillis() { + return internalDeleteDelayMillis; } } diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java index 5394d2d2c3a..c05f9f2816a 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java @@ -47,7 +47,6 @@ public final class ServerTopicConfigSynonyms { * both the first and the second synonyms are configured, we will use only the value of * the first synonym and ignore the second. */ - // Topic configs with no mapping to a server config can be found in `LogConfig.CONFIGS_WITH_NO_SERVER_DEFAULTS` public static final Map> ALL_TOPIC_CONFIG_SYNONYMS = Utils.mkMap( sameNameWithLogPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), listWithLogPrefix(TopicConfig.SEGMENT_MS_CONFIG, diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java index b7d4ccedb2e..54ad6a4e79a 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java @@ -171,7 +171,7 @@ public class Cleaner { List> groupedSegments = groupSegmentsBySize( log.logSegments(0, endOffset), - log.config().segmentSize, + log.config().segmentSize(), log.config().maxIndexSize, cleanable.firstUncleanableOffset() ); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index d24dbd953d3..eb7d0eaa5d4 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -26,7 +26,6 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.record.LegacyRecord; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.ConfigUtils; import org.apache.kafka.common.utils.Utils; @@ -140,14 +139,8 @@ public class LogConfig extends AbstractConfig { public static final long DEFAULT_LOCAL_RETENTION_BYTES = -2; // It indicates the value to be derived from RetentionBytes public static final long DEFAULT_LOCAL_RETENTION_MS = -2; // It indicates the value to be derived from RetentionMs - // Visible for testing - public static final Set CONFIGS_WITH_NO_SERVER_DEFAULTS = Set.of( - TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, - TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, - TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, - QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, - QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG - ); + public static final String INTERNAL_SEGMENT_BYTES_CONFIG = "internal.segment.bytes"; + public static final String INTERNAL_SEGMENT_BYTES_DOC = "The maximum size of a single log file. This should be used for testing only."; public static final ConfigDef SERVER_CONFIG_DEF = new ConfigDef() .define(ServerLogConfigs.NUM_PARTITIONS_CONFIG, INT, ServerLogConfigs.NUM_PARTITIONS_DEFAULT, atLeast(1), MEDIUM, ServerLogConfigs.NUM_PARTITIONS_DOC) @@ -191,7 +184,7 @@ public class LogConfig extends AbstractConfig { private static final LogConfigDef CONFIG = new LogConfigDef(); static { CONFIG. - define(TopicConfig.SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(LegacyRecord.RECORD_OVERHEAD_V0), MEDIUM, + define(TopicConfig.SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(1024 * 1024), MEDIUM, TopicConfig.SEGMENT_BYTES_DOC) .define(TopicConfig.SEGMENT_MS_CONFIG, LONG, DEFAULT_SEGMENT_MS, atLeast(1), MEDIUM, TopicConfig.SEGMENT_MS_DOC) .define(TopicConfig.SEGMENT_JITTER_MS_CONFIG, LONG, DEFAULT_SEGMENT_JITTER_MS, atLeast(0), MEDIUM, @@ -253,7 +246,8 @@ public class LogConfig extends AbstractConfig { .define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM, TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC) .define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC) - .define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC); + .define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC) + .defineInternal(INTERNAL_SEGMENT_BYTES_CONFIG, INT, null, null, MEDIUM, INTERNAL_SEGMENT_BYTES_DOC); } public final Set overriddenConfigs; @@ -262,7 +256,8 @@ public class LogConfig extends AbstractConfig { * Important note: Any configuration parameter that is passed along from KafkaConfig to LogConfig * should also be in `KafkaConfig#extractLogConfigMap`. */ - public final int segmentSize; + private final int segmentSize; + private final Integer internalSegmentSize; public final long segmentMs; public final long segmentJitterMs; public final int maxIndexSize; @@ -306,6 +301,7 @@ public class LogConfig extends AbstractConfig { this.overriddenConfigs = Collections.unmodifiableSet(overriddenConfigs); this.segmentSize = getInt(TopicConfig.SEGMENT_BYTES_CONFIG); + this.internalSegmentSize = getInt(INTERNAL_SEGMENT_BYTES_CONFIG); this.segmentMs = getLong(TopicConfig.SEGMENT_MS_CONFIG); this.segmentJitterMs = getLong(TopicConfig.SEGMENT_JITTER_MS_CONFIG); this.maxIndexSize = getInt(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG); @@ -367,6 +363,11 @@ public class LogConfig extends AbstractConfig { } } + public int segmentSize() { + if (internalSegmentSize == null) return segmentSize; + return internalSegmentSize; + } + // Exposed as a method so it can be mocked public int maxMessageSize() { return maxMessageSize; @@ -388,7 +389,7 @@ public class LogConfig extends AbstractConfig { public int initFileSize() { if (preallocate) - return segmentSize; + return segmentSize(); else return 0; } @@ -446,8 +447,12 @@ public class LogConfig extends AbstractConfig { return CONFIG.names().stream().sorted().toList(); } - public static Optional serverConfigName(String configName) { - return CONFIG.serverConfigName(configName); + public static List nonInternalConfigNames() { + return CONFIG.configKeys().entrySet() + .stream() + .filter(entry -> !entry.getValue().internalConfig) + .map(Map.Entry::getKey) + .sorted().toList(); } public static Map configKeys() { @@ -630,7 +635,7 @@ public class LogConfig extends AbstractConfig { @Override public String toString() { return "LogConfig{" + - "segmentSize=" + segmentSize + + "segmentSize=" + segmentSize() + ", segmentMs=" + segmentMs + ", segmentJitterMs=" + segmentJitterMs + ", maxIndexSize=" + maxIndexSize + diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java index 85e85212b11..fc06199de8f 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java @@ -1181,9 +1181,9 @@ public class UnifiedLog implements AutoCloseable { }); // check messages size does not exceed config.segmentSize - if (validRecords.sizeInBytes() > config().segmentSize) { + if (validRecords.sizeInBytes() > config().segmentSize()) { throw new RecordBatchTooLargeException("Message batch size is " + validRecords.sizeInBytes() + " bytes in append " + - "to partition " + topicPartition() + ", which exceeds the maximum configured segment size of " + config().segmentSize + "."); + "to partition " + topicPartition() + ", which exceeds the maximum configured segment size of " + config().segmentSize() + "."); } // maybe roll the log if this segment is full @@ -2038,12 +2038,12 @@ public class UnifiedLog implements AutoCloseable { long maxTimestampInMessages = appendInfo.maxTimestamp(); long maxOffsetInMessages = appendInfo.lastOffset(); - if (segment.shouldRoll(new RollParams(config().maxSegmentMs(), config().segmentSize, appendInfo.maxTimestamp(), appendInfo.lastOffset(), messagesSize, now))) { + if (segment.shouldRoll(new RollParams(config().maxSegmentMs(), config().segmentSize(), appendInfo.maxTimestamp(), appendInfo.lastOffset(), messagesSize, now))) { logger.debug("Rolling new log segment (log_size = {}/{}}, " + "offset_index_size = {}/{}, " + "time_index_size = {}/{}, " + "inactive_time_ms = {}/{}).", - segment.size(), config().segmentSize, + segment.size(), config().segmentSize(), segment.offsetIndex().entries(), segment.offsetIndex().maxEntries(), segment.timeIndex().entries(), segment.timeIndex().maxEntries(), segment.timeWaitedForRoll(now, maxTimestampInMessages), config().segmentMs - segment.rollJitterMs()); diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java index 7dfba8f7a59..86ff278012e 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java @@ -198,7 +198,7 @@ class LocalLogTest { assertEquals(oldConfig, log.config()); Properties props = new Properties(); - props.put(TopicConfig.SEGMENT_BYTES_CONFIG, oldConfig.segmentSize + 1); + props.put(TopicConfig.SEGMENT_BYTES_CONFIG, oldConfig.segmentSize() + 1); LogConfig newConfig = new LogConfig(props); log.updateConfig(newConfig); assertEquals(newConfig, log.config()); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java index 6023706847f..67203b89266 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -114,7 +115,7 @@ public class PurgeRepartitionTopicIntegrationTest { .get(); return config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value().equals(TopicConfig.CLEANUP_POLICY_DELETE) && config.get(TopicConfig.SEGMENT_MS_CONFIG).value().equals(PURGE_INTERVAL_MS.toString()) - && config.get(TopicConfig.SEGMENT_BYTES_CONFIG).value().equals(PURGE_SEGMENT_BYTES.toString()); + && config.get(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG).value().equals(PURGE_SEGMENT_BYTES.toString()); } catch (final Exception e) { return false; } @@ -171,7 +172,7 @@ public class PurgeRepartitionTopicIntegrationTest { streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(APPLICATION_ID).getPath()); streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), PURGE_INTERVAL_MS); - streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), PURGE_SEGMENT_BYTES); + streamsConfiguration.put(StreamsConfig.topicPrefix(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG), PURGE_SEGMENT_BYTES); streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), PURGE_SEGMENT_BYTES / 2); // we cannot allow batch size larger than segment size final StreamsBuilder builder = new StreamsBuilder(); diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index f081a768815..f1ee8df371d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -248,7 +248,7 @@ public class StreamsConfigTest { props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 99_999L); props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 7L); props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:host"); - props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100); + props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 1024 * 1024); final StreamsConfig streamsConfig = new StreamsConfig(props); final Map returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); @@ -263,7 +263,7 @@ public class StreamsConfigTest { ); assertEquals(7L, returnedProps.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); assertEquals("dummy:host", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG)); - assertEquals(100, returnedProps.get(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG))); + assertEquals(1024 * 1024, returnedProps.get(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG))); } @Test diff --git a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java index 9c67917d76a..4eee239069c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java @@ -769,7 +769,7 @@ public abstract class TopicCommand { .ofType(String.class); nl = System.lineSeparator(); - String logConfigNames = LogConfig.configNames().stream().map(config -> "\t" + config).collect(Collectors.joining(nl)); + String logConfigNames = LogConfig.nonInternalConfigNames().stream().map(config -> "\t" + config).collect(Collectors.joining(nl)); configOpt = parser.accepts("config", "A topic configuration override for the topic being created." + " The following is a list of valid configurations: " + nl + logConfigNames + nl + "See the Kafka documentation for full details on the topic configs." + diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java index 5bb23cabdd9..5ee3b385640 100644 --- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java @@ -40,6 +40,7 @@ import org.apache.kafka.server.config.ServerLogConfigs; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig; import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; +import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.test.TestUtils; import java.time.Duration; @@ -97,7 +98,7 @@ public class GetOffsetShellTest { Map rlsConfigs = new HashMap<>(); rlsConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"); rlsConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1"); - rlsConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "100"); + rlsConfigs.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "100"); setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs); sendProducerRecords(this::getRemoteLogStorageEnabledTopicName); } diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java index deb57ae8cbc..dcfe861b757 100644 --- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java @@ -20,6 +20,7 @@ package org.apache.kafka.tools; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientTestUtils; import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.CreatePartitionsResult; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.DeleteTopicsOptions; @@ -56,6 +57,7 @@ import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.Type; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.metadata.LeaderAndIsr; +import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Assertions; @@ -82,6 +84,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -1400,6 +1403,27 @@ public class TopicCommandTest { } } + @ClusterTest + public void testCreateWithInternalConfig(ClusterInstance cluster) throws InterruptedException, ExecutionException { + String internalConfigTopicName = TestUtils.randomString(10); + String testTopicName = TestUtils.randomString(10); + + try (Admin adminClient = cluster.admin()) { + CreateTopicsResult internalResult = adminClient.createTopics(List.of(new NewTopic(internalConfigTopicName, defaultNumPartitions, defaultReplicationFactor).configs( + Map.of(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "1000") + ))); + + ConfigEntry internalConfigEntry = internalResult.config(internalConfigTopicName).get().get(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG); + assertNotNull(internalConfigEntry, "Internal config entry should not be null"); + assertEquals("1000", internalConfigEntry.value()); + + CreateTopicsResult nonInternalResult = adminClient.createTopics(List.of(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor))); + + ConfigEntry nonInternalConfigEntry = nonInternalResult.config(testTopicName).get().get(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG); + assertNull(nonInternalConfigEntry, "Non-internal config entry should be null"); + } + } + private void checkReplicaDistribution(Map> assignment, Map brokerRackMapping, Integer numBrokers,