KAFKA-19080 The constraint on segment.ms is not enforced at topic level (#19371)
CI / build (push) Waiting to run Details

The main issue was that we forgot to set
`TopicConfig.SEGMENT_BYTES_CONFIG` to at least `1024 * 1024`, which
caused problems in tests with small segment sizes.

To address this, we introduced a new internal config:
`LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG`, allowing us to set smaller
segment bytes specifically for testing purposes.

We also updated the logic so that if a user configures the topic-level
segment bytes without explicitly setting the internal config, the
internal value will no longer be returned to the user.

In addition, we removed
`MetadataLogConfig#METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG` and added
three new internal configurations:
- `INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG`
- `INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG`
- `INTERNAL_DELETE_DELAY_MILLIS_CONFIG`

Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
This commit is contained in:
Ken Huang 2025-05-25 20:57:22 +08:00 committed by GitHub
parent 69a457d8a5
commit bcda92b5b9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
36 changed files with 263 additions and 240 deletions

View File

@ -430,6 +430,7 @@
<subpackage name="integration">
<allow pkg="org.apache.kafka.common.test"/>
<allow pkg="org.apache.kafka.metadata"/>
<allow pkg="org.apache.kafka.storage"/>
<allow pkg="kafka.admin" />
<allow pkg="kafka.api" />
<allow pkg="kafka.cluster" />

View File

@ -529,7 +529,7 @@ object ConfigCommand extends Logging {
private val nl: String = System.lineSeparator()
val addConfig: OptionSpec[String] = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
"For entity-type '" + TopicType + "': " + LogConfig.configNames.asScala.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + TopicType + "': " + LogConfig.nonInternalConfigNames.asScala.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + BrokerType + "': " + DynamicConfig.Broker.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + UserType + "': " + QuotaConfig.scramMechanismsPlusUserAndClientQuotaConfigs().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ClientType + "': " + QuotaConfig.userAndClientQuotaConfigs().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +

View File

@ -29,7 +29,7 @@ import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, MetadataLogConfig, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, MetadataLogConfig, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.storage.log.FetchIsolation
@ -73,7 +73,7 @@ final class KafkaMetadataLog private (
case _ => throw new IllegalArgumentException(s"Unhandled read isolation $readIsolation")
}
val fetchInfo = log.read(startOffset, config.maxFetchSizeInBytes, isolation, true)
val fetchInfo = log.read(startOffset, config.internalMaxFetchSizeInBytes, isolation, true)
new LogFetchInfo(
fetchInfo.records,
@ -557,7 +557,7 @@ final class KafkaMetadataLog private (
scheduler.scheduleOnce(
"delete-snapshot-files",
() => KafkaMetadataLog.deleteSnapshotFiles(log.dir.toPath, expiredSnapshots),
config.deleteDelayMillis
config.internalDeleteDelayMillis
)
}
}
@ -588,9 +588,11 @@ object KafkaMetadataLog extends Logging {
nodeId: Int
): KafkaMetadataLog = {
val props = new Properties()
props.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, config.maxBatchSizeInBytes.toString)
props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
props.setProperty(TopicConfig.SEGMENT_MS_CONFIG, config.logSegmentMillis.toString)
props.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, config.internalMaxBatchSizeInBytes.toString)
if (config.internalSegmentBytes() != null)
props.setProperty(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, config.internalSegmentBytes().toString)
else
props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
props.setProperty(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT.toString)
// Disable time and byte retention when deleting segments
@ -599,11 +601,7 @@ object KafkaMetadataLog extends Logging {
LogConfig.validate(props)
val defaultLogConfig = new LogConfig(props)
if (config.logSegmentBytes < config.logSegmentMinBytes) {
throw new InvalidConfigurationException(
s"Cannot set ${MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG} below ${config.logSegmentMinBytes}: ${config.logSegmentBytes}"
)
} else if (defaultLogConfig.retentionMs >= 0) {
if (defaultLogConfig.retentionMs >= 0) {
throw new InvalidConfigurationException(
s"Cannot set ${TopicConfig.RETENTION_MS_CONFIG} above -1: ${defaultLogConfig.retentionMs}."
)
@ -639,12 +637,6 @@ object KafkaMetadataLog extends Logging {
nodeId
)
// Print a warning if users have overridden the internal config
if (config.logSegmentMinBytes != KafkaRaftClient.MAX_BATCH_SIZE_BYTES) {
metadataLog.error(s"Overriding ${MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG} is only supported for testing. Setting " +
s"this value too low may lead to an inability to write batches of metadata records.")
}
// When recovering, truncate fully if the latest snapshot is after the log end offset. This can happen to a follower
// when the follower crashes after downloading a snapshot from the leader but before it could truncate the log fully.
metadataLog.truncateToLatestSnapshot()

View File

@ -1567,7 +1567,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testDeleteRecordsAfterCorruptRecords(groupProtocol: String): Unit = {
val config = new Properties()
config.put(TopicConfig.SEGMENT_BYTES_CONFIG, "200")
config.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "200")
createTopic(topic, numPartitions = 1, replicationFactor = 1, config)
client = createAdminClient

View File

@ -566,7 +566,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
client.createAcls(java.util.List.of(denyAcl), new CreateAclsOptions()).all().get()
val topics = Seq(topic1, topic2)
val configsOverride = java.util.Map.of(TopicConfig.SEGMENT_BYTES_CONFIG, "100000")
val configsOverride = java.util.Map.of(TopicConfig.SEGMENT_BYTES_CONFIG, "3000000")
val newTopics = java.util.List.of(
new NewTopic(topic1, 2, 3.toShort).configs(configsOverride),
new NewTopic(topic2, Optional.empty[Integer], Optional.empty[java.lang.Short]).configs(configsOverride))
@ -580,7 +580,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
val topicConfigs = result.config(topic1).get().entries.asScala
assertTrue(topicConfigs.nonEmpty)
val segmentBytesConfig = topicConfigs.find(_.name == TopicConfig.SEGMENT_BYTES_CONFIG).get
assertEquals(100000, segmentBytesConfig.value.toLong)
assertEquals(3000000, segmentBytesConfig.value.toLong)
assertEquals(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG, segmentBytesConfig.source)
val compressionConfig = topicConfigs.find(_.name == TopicConfig.COMPRESSION_TYPE_CONFIG).get
assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT, compressionConfig.value)

View File

@ -654,7 +654,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
"Config not updated in LogManager")
val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found"))
TestUtils.waitUntilTrue(() => log.config.segmentSize == 1048576, "Existing topic config using defaults not updated")
TestUtils.waitUntilTrue(() => log.config.segmentSize() == 1048576, "Existing topic config using defaults not updated")
val KafkaConfigToLogConfigName: Map[String, String] =
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) }
props.asScala.foreach { case (k, v) =>

View File

@ -20,7 +20,7 @@ import kafka.server.{KafkaConfig, KafkaRaftServer}
import kafka.utils.TestUtils
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.errors.{InvalidConfigurationException, RecordTooLargeException}
import org.apache.kafka.common.errors.RecordTooLargeException
import org.apache.kafka.common.protocol
import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable}
import org.apache.kafka.common.record.ArbitraryMemoryRecords
@ -43,6 +43,7 @@ import org.junit.jupiter.params.provider.ArgumentsSource
import net.jqwik.api.AfterFailureMode
import net.jqwik.api.ForAll
import net.jqwik.api.Property
import org.apache.kafka.common.config.{AbstractConfig, ConfigException}
import org.apache.kafka.server.common.OffsetAndEpoch
import java.io.File
@ -78,13 +79,13 @@ final class KafkaMetadataLogTest {
props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, Int.box(10240))
props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG, Int.box(10 * 1024))
assertThrows(classOf[InvalidConfigurationException], () => {
assertThrows(classOf[ConfigException], () => {
val kafkaConfig = KafkaConfig.fromProps(props)
val metadataConfig = new MetadataLogConfig(kafkaConfig)
buildMetadataLog(tempDir, mockTime, metadataConfig)
})
props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, Int.box(10240))
props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, Int.box(10 * 1024 * 1024))
val kafkaConfig = KafkaConfig.fromProps(props)
val metadataConfig = new MetadataLogConfig(kafkaConfig)
buildMetadataLog(tempDir, mockTime, metadataConfig)
@ -478,7 +479,7 @@ final class KafkaMetadataLogTest {
assertEquals(log.earliestSnapshotId(), log.latestSnapshotId())
log.close()
mockTime.sleep(config.deleteDelayMillis)
mockTime.sleep(config.internalDeleteDelayMillis)
// Assert that the log dir doesn't contain any older snapshots
Files
.walk(logDir, 1)
@ -649,7 +650,7 @@ final class KafkaMetadataLogTest {
assertEquals(greaterSnapshotId, secondLog.latestSnapshotId().get)
assertEquals(3 * numberOfRecords, secondLog.startOffset)
assertEquals(epoch, secondLog.lastFetchedEpoch)
mockTime.sleep(config.deleteDelayMillis)
mockTime.sleep(config.internalDeleteDelayMillis)
// Assert that the log dir doesn't contain any older snapshots
Files
@ -687,15 +688,12 @@ final class KafkaMetadataLogTest {
val leaderEpoch = 5
val maxBatchSizeInBytes = 16384
val recordSize = 64
val config = new MetadataLogConfig(
val config = createMetadataLogConfig(
DefaultMetadataLogConfig.logSegmentBytes,
DefaultMetadataLogConfig.logSegmentMinBytes,
DefaultMetadataLogConfig.logSegmentMillis,
DefaultMetadataLogConfig.retentionMaxBytes,
DefaultMetadataLogConfig.retentionMillis,
maxBatchSizeInBytes,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
DefaultMetadataLogConfig.deleteDelayMillis
maxBatchSizeInBytes
)
val log = buildMetadataLog(tempDir, mockTime, config)
@ -907,15 +905,13 @@ final class KafkaMetadataLogTest {
@Test
def testAdvanceLogStartOffsetAfterCleaning(): Unit = {
val config = new MetadataLogConfig(
512,
val config = createMetadataLogConfig(
512,
10 * 1000,
256,
60 * 1000,
512,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT
DefaultMetadataLogConfig.internalMaxFetchSizeInBytes,
)
val log = buildMetadataLog(tempDir, mockTime, config)
@ -944,15 +940,12 @@ final class KafkaMetadataLogTest {
@Test
def testDeleteSnapshots(): Unit = {
// Generate some logs and a few snapshots, set retention low and verify that cleaning occurs
val config = new MetadataLogConfig(
1024,
val config = createMetadataLogConfig(
1024,
10 * 1000,
1024,
60 * 1000,
100,
DefaultMetadataLogConfig.maxBatchSizeInBytes,
DefaultMetadataLogConfig.maxFetchSizeInBytes
)
val log = buildMetadataLog(tempDir, mockTime, config)
@ -978,15 +971,12 @@ final class KafkaMetadataLogTest {
@Test
def testSoftRetentionLimit(): Unit = {
// Set retention equal to the segment size and generate slightly more than one segment of logs
val config = new MetadataLogConfig(
10240,
val config = createMetadataLogConfig(
10240,
10 * 1000,
10240,
60 * 1000,
100,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
DefaultMetadataLogConfig.deleteDelayMillis
)
val log = buildMetadataLog(tempDir, mockTime, config)
@ -1022,15 +1012,12 @@ final class KafkaMetadataLogTest {
@Test
def testSegmentsLessThanLatestSnapshot(): Unit = {
val config = new MetadataLogConfig(
10240,
val config = createMetadataLogConfig(
10240,
10 * 1000,
10240,
60 * 1000,
200,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
DefaultMetadataLogConfig.deleteDelayMillis
)
val log = buildMetadataLog(tempDir, mockTime, config)
@ -1081,15 +1068,11 @@ object KafkaMetadataLogTest {
override def read(input: protocol.Readable, size: Int): Array[Byte] = input.readArray(size)
}
val DefaultMetadataLogConfig = new MetadataLogConfig(
100 * 1024,
val DefaultMetadataLogConfig = createMetadataLogConfig(
100 * 1024,
10 * 1000,
100 * 1024,
60 * 1000,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT
)
def buildMetadataLogAndDir(
@ -1166,4 +1149,25 @@ object KafkaMetadataLogTest {
}
dir
}
private def createMetadataLogConfig(
internalLogSegmentBytes: Int,
logSegmentMillis: Long,
retentionMaxBytes: Long,
retentionMillis: Long,
internalMaxBatchSizeInBytes: Int = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
internalMaxFetchSizeInBytes: Int = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
internalDeleteDelayMillis: Long = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT
): MetadataLogConfig = {
val config: util.Map[String, Any] = util.Map.of(
MetadataLogConfig.INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG, internalLogSegmentBytes,
MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG, logSegmentMillis,
MetadataLogConfig.METADATA_MAX_RETENTION_BYTES_CONFIG, retentionMaxBytes,
MetadataLogConfig.METADATA_MAX_RETENTION_MILLIS_CONFIG, retentionMillis,
MetadataLogConfig.INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG, internalMaxBatchSizeInBytes,
MetadataLogConfig.INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG, internalMaxFetchSizeInBytes,
MetadataLogConfig.INTERNAL_DELETE_DELAY_MILLIS_CONFIG, internalDeleteDelayMillis,
)
new MetadataLogConfig(new AbstractConfig(MetadataLogConfig.CONFIG_DEF, config, false))
}
}

View File

@ -100,7 +100,7 @@ class AbstractPartitionTest {
def createLogProperties(overrides: Map[String, String]): Properties = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 512: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 512: java.lang.Integer)
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000: java.lang.Integer)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, 999: java.lang.Integer)
overrides.foreach { case (k, v) => logProps.put(k, v) }

View File

@ -362,7 +362,7 @@ class PartitionLockTest extends Logging {
private def createLogProperties(overrides: Map[String, String]): Properties = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 512: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 512: java.lang.Integer)
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000: java.lang.Integer)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, 999: java.lang.Integer)
overrides.foreach { case (k, v) => logProps.put(k, v) }

View File

@ -71,7 +71,7 @@ abstract class AbstractLogCleanerIntegrationTest {
maxCompactionLagMs: Long = defaultMaxCompactionLagMs): Properties = {
val props = new Properties()
props.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageSize: java.lang.Integer)
props.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentSize: java.lang.Integer)
props.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentSize: java.lang.Integer)
props.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 100*1024: java.lang.Integer)
props.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, deleteDelay: java.lang.Integer)
props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)

View File

@ -53,7 +53,7 @@ class LogCleanerManagerTest extends Logging {
val topicPartition = new TopicPartition("log", 0)
val topicPartition2 = new TopicPartition("log2", 0)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
val logConfig: LogConfig = new LogConfig(logProps)
@ -370,7 +370,7 @@ class LogCleanerManagerTest extends Logging {
// change cleanup policy from delete to compact
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, log.config.segmentSize: Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, log.config.segmentSize(): Integer)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, log.config.retentionMs: java.lang.Long)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0: Integer)
@ -548,7 +548,7 @@ class LogCleanerManagerTest extends Logging {
@Test
def testCleanableOffsetsForNone(): Unit = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -570,7 +570,7 @@ class LogCleanerManagerTest extends Logging {
@Test
def testCleanableOffsetsActiveSegment(): Unit = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -592,7 +592,7 @@ class LogCleanerManagerTest extends Logging {
def testCleanableOffsetsForTime(): Unit = {
val compactionLag = 60 * 60 * 1000
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -625,7 +625,7 @@ class LogCleanerManagerTest extends Logging {
def testCleanableOffsetsForShortTime(): Unit = {
val compactionLag = 60 * 60 * 1000
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -667,7 +667,7 @@ class LogCleanerManagerTest extends Logging {
def testUndecidedTransactionalDataNotCleanable(): Unit = {
val compactionLag = 60 * 60 * 1000
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -711,7 +711,7 @@ class LogCleanerManagerTest extends Logging {
@Test
def testDoneCleaning(): Unit = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
while (log.numberOfSegments < 8)
log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), 0)
@ -830,7 +830,7 @@ class LogCleanerManagerTest extends Logging {
private def createLowRetentionLogConfig(segmentSize: Int, cleanupPolicy: String): LogConfig = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentSize: Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentSize: Integer)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, 1: Integer)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicy)
logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0.05: java.lang.Double) // small for easier and clearer tests

View File

@ -56,7 +56,7 @@ class LogCleanerTest extends Logging {
val tmpdir = TestUtils.tempDir()
val dir = TestUtils.randomPartitionLogDir(tmpdir)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
val logConfig = new LogConfig(logProps)
@ -148,7 +148,7 @@ class LogCleanerTest extends Logging {
def testCleanSegments(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -181,7 +181,7 @@ class LogCleanerTest extends Logging {
// Construct a log instance. The replaceSegments() method of the log instance is overridden so that
// it waits for another thread to execute deleteOldSegments()
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024 : java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024 : java.lang.Integer)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE)
val config = LogConfig.fromProps(logConfig.originals, logProps)
val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
@ -271,7 +271,7 @@ class LogCleanerTest extends Logging {
val originalMaxFileSize = 1024
val cleaner = makeCleaner(2)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, originalMaxFileSize: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, originalMaxFileSize: java.lang.Integer)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact": java.lang.String)
logProps.put(TopicConfig.PREALLOCATE_CONFIG, "true": java.lang.String)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -295,7 +295,7 @@ class LogCleanerTest extends Logging {
def testDuplicateCheckAfterCleaning(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 2048: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 2048: java.lang.Integer)
var log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
@ -464,7 +464,7 @@ class LogCleanerTest extends Logging {
def testBasicTransactionAwareCleaning(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 2048: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 2048: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
@ -497,7 +497,7 @@ class LogCleanerTest extends Logging {
def testCleanWithTransactionsSpanningSegments(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
@ -543,7 +543,7 @@ class LogCleanerTest extends Logging {
def testCommitMarkerRemoval(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
@ -591,7 +591,7 @@ class LogCleanerTest extends Logging {
val cleaner = makeCleaner(capacity = Int.MaxValue, maxMessageSize = 100)
val logProps = new Properties()
logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 100: java.lang.Integer)
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1000: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1000: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
@ -619,7 +619,7 @@ class LogCleanerTest extends Logging {
def testCommitMarkerRetentionWithEmptyBatch(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
@ -682,7 +682,7 @@ class LogCleanerTest extends Logging {
def testCleanEmptyControlBatch(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
@ -714,7 +714,7 @@ class LogCleanerTest extends Logging {
def testCommittedTransactionSpanningSegments(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 128: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 128: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
val producerId = 1L
@ -736,7 +736,7 @@ class LogCleanerTest extends Logging {
def testAbortedTransactionSpanningSegments(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 128: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 128: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
val producerId = 1L
@ -765,7 +765,7 @@ class LogCleanerTest extends Logging {
def testAbortMarkerRemoval(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
@ -799,7 +799,7 @@ class LogCleanerTest extends Logging {
val producerId = 1L
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 2048: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 2048: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val appendFirstTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch, 0, AppendOrigin.REPLICATION)
@ -832,7 +832,7 @@ class LogCleanerTest extends Logging {
def testAbortMarkerRetentionWithEmptyBatch(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
@ -896,7 +896,7 @@ class LogCleanerTest extends Logging {
// Create cleaner with very small default max message size
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, largeMessageSize * 16: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, largeMessageSize * 16: java.lang.Integer)
logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, largeMessageSize * 2: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -968,7 +968,7 @@ class LogCleanerTest extends Logging {
def createLogWithMessagesLargerThanMaxSize(largeMessageSize: Int): (UnifiedLog, FakeOffsetMap) = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, largeMessageSize * 16: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, largeMessageSize * 16: java.lang.Integer)
logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, largeMessageSize * 2: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -994,7 +994,7 @@ class LogCleanerTest extends Logging {
def testCleaningWithDeletes(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -1021,7 +1021,7 @@ class LogCleanerTest extends Logging {
// because loadFactor is 0.75, this means we can fit 3 messages in the map
val cleaner = makeCleaner(4)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -1050,7 +1050,7 @@ class LogCleanerTest extends Logging {
def testLogCleanerRetainsProducerLastSequence(): Unit = {
val cleaner = makeCleaner(10)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
log.appendAsLeader(record(0, 0), 0) // offset 0
@ -1073,7 +1073,7 @@ class LogCleanerTest extends Logging {
def testLogCleanerRetainsLastSequenceEvenIfTransactionAborted(): Unit = {
val cleaner = makeCleaner(10)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
@ -1107,7 +1107,7 @@ class LogCleanerTest extends Logging {
def testCleaningWithKeysConflictingWithTxnMarkerKeys(): Unit = {
val cleaner = makeCleaner(10)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val leaderEpoch = 5
val producerEpoch = 0.toShort
@ -1151,7 +1151,7 @@ class LogCleanerTest extends Logging {
// because loadFactor is 0.75, this means we can fit 1 message in the map
val cleaner = makeCleaner(2)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -1182,7 +1182,7 @@ class LogCleanerTest extends Logging {
def testCleaningWithUncleanableSection(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -1225,7 +1225,7 @@ class LogCleanerTest extends Logging {
def testLogToClean(): Unit = {
// create a log with small segment size
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 100: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 100: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// create 6 segments with only one message in each segment
@ -1243,7 +1243,7 @@ class LogCleanerTest extends Logging {
def testLogToCleanWithUncleanableSection(): Unit = {
// create a log with small segment size
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 100: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 100: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// create 6 segments with only one message in each segment
@ -1276,7 +1276,7 @@ class LogCleanerTest extends Logging {
// create a log with compaction turned off so we can append unkeyed messages
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -1334,7 +1334,7 @@ class LogCleanerTest extends Logging {
def testCleanSegmentsWithAbort(): Unit = {
val cleaner = makeCleaner(Int.MaxValue, abortCheckDone)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -1356,7 +1356,7 @@ class LogCleanerTest extends Logging {
def testCleanSegmentsRetainingLastEmptyBatch(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -1388,7 +1388,7 @@ class LogCleanerTest extends Logging {
def testSegmentGrouping(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 300: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 300: java.lang.Integer)
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -1489,7 +1489,7 @@ class LogCleanerTest extends Logging {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 400: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 400: java.lang.Integer)
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@ -1541,7 +1541,7 @@ class LogCleanerTest extends Logging {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 400: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 400: java.lang.Integer)
//mimic the effect of loading an empty index file
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 400: java.lang.Integer)
@ -1666,7 +1666,7 @@ class LogCleanerTest extends Logging {
def testRecoveryAfterCrash(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 300: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 300: java.lang.Integer)
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1: java.lang.Integer)
logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, 10: java.lang.Integer)
@ -1797,7 +1797,7 @@ class LogCleanerTest extends Logging {
def testBuildOffsetMapFakeLarge(): Unit = {
val map = new FakeOffsetMap(1000)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 120: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 120: java.lang.Integer)
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 120: java.lang.Integer)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
val logConfig = new LogConfig(logProps)
@ -1945,7 +1945,7 @@ class LogCleanerTest extends Logging {
@Test
def testCleaningBeyondMissingOffsets(): Unit = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024*1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024*1024: java.lang.Integer)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
val logConfig = new LogConfig(logProps)
val cleaner = makeCleaner(Int.MaxValue)

View File

@ -20,7 +20,6 @@ package kafka.log
import java.util.{Optional, Properties}
import java.util.concurrent.{Callable, Executors}
import kafka.utils.TestUtils
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.SimpleRecord
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
@ -60,7 +59,7 @@ class LogConcurrencyTest {
@Test
def testUncommittedDataNotConsumedFrequentSegmentRolls(): Unit = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 237: Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 237: Integer)
val logConfig = new LogConfig(logProps)
testUncommittedDataNotConsumed(createLog(logConfig))
}

View File

@ -34,29 +34,8 @@ import org.apache.kafka.storage.internals.log.{LogConfig, ThrottledReplicaListVa
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
class LogConfigTest {
/**
* This test verifies that KafkaConfig object initialization does not depend on
* LogConfig initialization. Bad things happen due to static initialization
* order dependencies. For example, LogConfig.configDef ends up adding null
* values in serverDefaultConfigNames. This test ensures that the mapping of
* keys from LogConfig to KafkaConfig are not missing values.
*/
@Test
def ensureNoStaticInitializationOrderDependency(): Unit = {
// Access any KafkaConfig val to load KafkaConfig object before LogConfig.
assertNotNull(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG)
assertTrue(LogConfig.configNames.asScala
.filter(config => !LogConfig.CONFIGS_WITH_NO_SERVER_DEFAULTS.contains(config))
.forall { config =>
val serverConfigOpt = LogConfig.serverConfigName(config)
serverConfigOpt.isPresent && (serverConfigOpt.get != null)
})
}
@Test
def testKafkaConfigToProps(): Unit = {
val millisInHour = 60L * 60L * 1000L
@ -94,6 +73,7 @@ class LogConfigTest {
case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-0.1")
case TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0")
case TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0")
case LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG => // no op
case _ => assertPropertyInvalid(name, "not_a_number", "-1")
})

View File

@ -21,7 +21,6 @@ import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, MemoryRecords, RecordBatch, SimpleRecord, TimestampType}
import org.apache.kafka.common.utils.{Time, Utils}
@ -245,7 +244,7 @@ class LogLoaderTest {
@Test
def testProducerSnapshotsRecoveryAfterUncleanShutdown(): Unit = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, "640")
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "640")
val logConfig = new LogConfig(logProps)
var log = createLog(logDir, logConfig)
assertEquals(OptionalLong.empty(), log.oldestProducerSnapshotOffset)

View File

@ -60,7 +60,7 @@ class LogManagerTest {
val maxRollInterval = 100
val maxLogAgeMs: Int = 10 * 60 * 1000
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 4096: java.lang.Integer)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, maxLogAgeMs: java.lang.Integer)
val logConfig = new LogConfig(logProps)
@ -391,7 +391,7 @@ class LogManagerTest {
logManager.shutdown()
val segmentBytes = 10 * setSize
val properties = new Properties()
properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentBytes.toString)
properties.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentBytes.toString)
properties.put(TopicConfig.RETENTION_BYTES_CONFIG, (5L * 10L * setSize + 10L).toString)
val configRepository = MockConfigRepository.forTopic(name, properties)

View File

@ -75,7 +75,7 @@ object LogTestUtils {
remoteLogDeleteOnDisable: Boolean = DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG): LogConfig = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_MS_CONFIG, segmentMs: java.lang.Long)
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentBytes: Integer)
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentBytes: Integer)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs: java.lang.Long)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs: java.lang.Long)
logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes: java.lang.Long)

View File

@ -2734,7 +2734,7 @@ class UnifiedLogTest {
@Test
def testLeaderEpochCacheCreatedAfterMessageFormatUpgrade(): Unit = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1000")
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "1000")
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1")
logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "65536")
val logConfig = new LogConfig(logProps)

View File

@ -25,7 +25,7 @@ import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AlterReplicaLogDirsRequest, AlterReplicaLogDirsResponse}
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.storage.internals.log.LogFileUtils
import org.apache.kafka.storage.internals.log.{LogConfig, LogFileUtils}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@ -144,7 +144,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
// We don't want files with `.deleted` suffix are removed too fast,
// so we can validate there will be orphan files and orphan files will be removed eventually.
topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "10000")
topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
topicProperties.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "1024")
createTopic(topic, partitionNum, 1, topicProperties)
assertEquals(logDir1, brokers.head.logManager.getLog(tp).get.dir.getParent)

View File

@ -31,7 +31,7 @@ import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.DynamicThreadPool
import org.apache.kafka.server.authorizer._
@ -670,16 +670,6 @@ class DynamicBrokerConfigTest {
assertTrue(m.currentReporters.isEmpty)
}
@Test
def testNonInternalValuesDoesNotExposeInternalConfigs(): Unit = {
val props = TestUtils.createBrokerConfig(0, port = 8181)
props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, "1024")
val config = new KafkaConfig(props)
assertFalse(config.nonInternalValues.containsKey(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG))
config.updateCurrentConfig(new KafkaConfig(props))
assertFalse(config.nonInternalValues.containsKey(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG))
}
@Test
def testDynamicLogLocalRetentionMsConfig(): Unit = {
val props = TestUtils.createBrokerConfig(0, port = 8181)

View File

@ -95,17 +95,17 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
@Test
def testDynamicTopicConfigChange(): Unit = {
val tp = new TopicPartition("test", 0)
val oldSegmentSize = 1000
val oldSegmentSize = 2 * 1024 * 1024
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, oldSegmentSize.toString)
createTopic(tp.topic, 1, 1, logProps)
TestUtils.retry(10000) {
val logOpt = this.brokers.head.logManager.getLog(tp)
assertTrue(logOpt.isDefined)
assertEquals(oldSegmentSize, logOpt.get.config.segmentSize)
assertEquals(oldSegmentSize, logOpt.get.config.segmentSize())
}
val newSegmentSize = 2000
val newSegmentSize = 2 * 1024 * 1024
val admin = createAdminClient()
try {
val resource = new ConfigResource(ConfigResource.Type.TOPIC, tp.topic())
@ -117,7 +117,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
val log = brokers.head.logManager.getLog(tp).get
TestUtils.retry(10000) {
assertEquals(newSegmentSize, log.config.segmentSize)
assertEquals(newSegmentSize, log.config.segmentSize())
}
(1 to 50).foreach(i => TestUtils.produceMessage(brokers, tp.topic, i.toString))

View File

@ -793,6 +793,10 @@ class KafkaConfigTest {
case MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case MetadataLogConfig.METADATA_MAX_RETENTION_BYTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case MetadataLogConfig.METADATA_MAX_RETENTION_MILLIS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case MetadataLogConfig.INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG => // no op
case MetadataLogConfig.INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG => // no op
case MetadataLogConfig.INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG => // no op
case MetadataLogConfig.INTERNAL_DELETE_DELAY_MILLIS_CONFIG => // no op
case KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG => // ignore string
case MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
@ -1135,6 +1139,8 @@ class KafkaConfigTest {
// topic only config
case QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG =>
// topic only config
case "internal.segment.bytes" =>
// topic internal config
case prop =>
fail(prop + " must be explicitly checked for dynamic updatability. Note that LogConfig(s) require that KafkaConfig value lookups are dynamic and not static values.")
}

View File

@ -33,7 +33,7 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.{Assignment,
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.config.{AbstractConfig, TopicConfig}
import org.apache.kafka.common.message.{KRaftVersionRecord, LeaderChangeMessage, SnapshotFooterRecord, SnapshotHeaderRecord, VotersRecord}
import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord}
import org.apache.kafka.common.protocol.{ApiMessage, ByteBufferAccessor, MessageUtil, ObjectSerializationCache}
@ -44,9 +44,8 @@ import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, ShareSnap
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.{KafkaRaftClient, MetadataLogConfig, VoterSetTest}
import org.apache.kafka.raft.{MetadataLogConfig, VoterSetTest}
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, OffsetAndEpoch}
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentMetadataUpdate, RemoteLogSegmentState, RemotePartitionDeleteMetadata, RemotePartitionDeleteState}
import org.apache.kafka.server.storage.log.FetchIsolation
@ -584,15 +583,11 @@ class DumpLogSegmentsTest {
logDir,
time,
time.scheduler,
new MetadataLogConfig(
100 * 1024,
createMetadataLogConfig(
100 * 1024,
10 * 1000,
100 * 1024,
60 * 1000,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT
60 * 1000
),
1
)
@ -1195,4 +1190,19 @@ class DumpLogSegmentsTest {
))
)
}
private def createMetadataLogConfig(
internalLogSegmentBytes: Int,
logSegmentMillis: Long,
retentionMaxBytes: Long,
retentionMillis: Long
): MetadataLogConfig = {
val config: util.Map[String, Any] = util.Map.of(
MetadataLogConfig.INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG, internalLogSegmentBytes,
MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG, logSegmentMillis,
MetadataLogConfig.METADATA_MAX_RETENTION_BYTES_CONFIG, retentionMaxBytes,
MetadataLogConfig.METADATA_MAX_RETENTION_MILLIS_CONFIG, retentionMillis,
)
new MetadataLogConfig(new AbstractConfig(MetadataLogConfig.CONFIG_DEF, config, false))
}
}

View File

@ -166,6 +166,10 @@ public class KafkaConfigSchema {
ConfigDef configDef = configDefs.getOrDefault(ConfigResource.Type.TOPIC, EMPTY_CONFIG_DEF);
HashMap<String, ConfigEntry> effectiveConfigs = new HashMap<>();
for (ConfigDef.ConfigKey configKey : configDef.configKeys().values()) {
// This config is internal; if the user hasn't set it explicitly, it should not be returned.
if (configKey.internalConfig && !dynamicTopicConfigs.containsKey(configKey.name)) {
continue;
}
ConfigEntry entry = resolveEffectiveTopicConfig(configKey, staticNodeConfig,
dynamicClusterConfigs, dynamicNodeConfigs, dynamicTopicConfigs);
effectiveConfigs.put(entry.name(), entry);

View File

@ -54,7 +54,8 @@ public class KafkaConfigSchemaTest {
define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc doc").
define("def", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "def doc").
define("ghi", ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.HIGH, "ghi doc").
define("xyz", ConfigDef.Type.PASSWORD, "thedefault", ConfigDef.Importance.HIGH, "xyz doc"));
define("xyz", ConfigDef.Type.PASSWORD, "thedefault", ConfigDef.Importance.HIGH, "xyz doc").
defineInternal("internal", ConfigDef.Type.STRING, "internalValue", null, ConfigDef.Importance.HIGH, "internal doc"));
}
public static final Map<String, List<ConfigSynonym>> SYNONYMS = new HashMap<>();
@ -167,4 +168,30 @@ public class KafkaConfigSchemaTest {
dynamicNodeConfigs,
dynamicTopicConfigs));
}
@Test
public void testResolveEffectiveDynamicInternalTopicConfig() {
Map<String, String> dynamicTopicConfigs = Map.of(
"ghi", "true",
"internal", "internal,change"
);
Map<String, ConfigEntry> expected = Map.of(
"abc", new ConfigEntry("abc", null,
ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, List.of(),
ConfigEntry.ConfigType.LIST, "abc doc"),
"def", new ConfigEntry("def", null,
ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, List.of(),
ConfigEntry.ConfigType.LONG, "def doc"),
"ghi", new ConfigEntry("ghi", "true",
ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, List.of(),
ConfigEntry.ConfigType.BOOLEAN, "ghi doc"),
"xyz", new ConfigEntry("xyz", "thedefault",
ConfigEntry.ConfigSource.DEFAULT_CONFIG, true, false, List.of(),
ConfigEntry.ConfigType.PASSWORD, "xyz doc"),
"internal", new ConfigEntry("internal", "internal,change",
ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, List.of(),
ConfigEntry.ConfigType.STRING, "internal doc")
);
assertEquals(expected, SCHEMA.resolveEffectiveTopicConfigs(Map.of(), Map.of(), Map.of(), dynamicTopicConfigs));
}
}

View File

@ -18,7 +18,6 @@ package org.apache.kafka.raft;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.server.config.ServerLogConfigs;
import java.util.concurrent.TimeUnit;
@ -52,14 +51,13 @@ public class MetadataLogConfig {
"configuration. The Kafka node will generate a snapshot when either the maximum time interval is reached or the " +
"maximum bytes limit is reached.";
public static final String METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG = "metadata.log.segment.min.bytes";
public static final String METADATA_LOG_SEGMENT_MIN_BYTES_DOC = "Override the minimum size for a single metadata log file. This should be used for testing only.";
public static final int METADATA_LOG_SEGMENT_MIN_BYTES_DEFAULT = 8 * 1024 * 1024;
public static final String METADATA_LOG_SEGMENT_BYTES_CONFIG = "metadata.log.segment.bytes";
public static final String METADATA_LOG_SEGMENT_BYTES_DOC = "The maximum size of a single metadata log file.";
public static final int METADATA_LOG_SEGMENT_BYTES_DEFAULT = 1024 * 1024 * 1024;
public static final String INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG = "internal.metadata.log.segment.bytes";
public static final String INTERNAL_METADATA_LOG_SEGMENT_BYTES_DOC = "The maximum size of a single metadata log file, only for testing.";
public static final String METADATA_LOG_SEGMENT_MILLIS_CONFIG = "metadata.log.segment.ms";
public static final String METADATA_LOG_SEGMENT_MILLIS_DOC = "The maximum time before a new metadata log file is rolled out (in milliseconds).";
public static final long METADATA_LOG_SEGMENT_MILLIS_DEFAULT = 24 * 7 * 60 * 60 * 1000L;
@ -80,72 +78,55 @@ public class MetadataLogConfig {
"controller should write no-op records to the metadata partition. If the value is 0, no-op records " +
"are not appended to the metadata partition. The default value is " + METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT;
public static final String INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG = "internal.max.batch.size.in.bytes";
public static final String INTERNAL_MAX_BATCH_SIZE_IN_BYTES_DOC = "The largest record batch size allowed in the metadata log, only for testing.";
public static final String INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG = "internal.max.fetch.size.in.bytes";
public static final String INTERNAL_MAX_FETCH_SIZE_IN_BYTES_DOC = "The maximum number of bytes to read when fetching from the metadata log, only for testing.";
public static final String INTERNAL_DELETE_DELAY_MILLIS_CONFIG = "internal.delete.delay.millis";
public static final String INTERNAL_DELETE_DELAY_MILLIS_DOC = "The amount of time to wait before deleting a file from the filesystem, only for testing.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC)
.define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC)
.define(METADATA_LOG_DIR_CONFIG, STRING, null, null, HIGH, METADATA_LOG_DIR_DOC)
.define(METADATA_LOG_SEGMENT_BYTES_CONFIG, INT, METADATA_LOG_SEGMENT_BYTES_DEFAULT, atLeast(Records.LOG_OVERHEAD), HIGH, METADATA_LOG_SEGMENT_BYTES_DOC)
.defineInternal(METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, INT, METADATA_LOG_SEGMENT_MIN_BYTES_DEFAULT, atLeast(Records.LOG_OVERHEAD), HIGH, METADATA_LOG_SEGMENT_MIN_BYTES_DOC)
.define(METADATA_LOG_SEGMENT_BYTES_CONFIG, INT, METADATA_LOG_SEGMENT_BYTES_DEFAULT, atLeast(8 * 1024 * 1024), HIGH, METADATA_LOG_SEGMENT_BYTES_DOC)
.define(METADATA_LOG_SEGMENT_MILLIS_CONFIG, LONG, METADATA_LOG_SEGMENT_MILLIS_DEFAULT, null, HIGH, METADATA_LOG_SEGMENT_MILLIS_DOC)
.define(METADATA_MAX_RETENTION_BYTES_CONFIG, LONG, METADATA_MAX_RETENTION_BYTES_DEFAULT, null, HIGH, METADATA_MAX_RETENTION_BYTES_DOC)
.define(METADATA_MAX_RETENTION_MILLIS_CONFIG, LONG, METADATA_MAX_RETENTION_MILLIS_DEFAULT, null, HIGH, METADATA_MAX_RETENTION_MILLIS_DOC)
.define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, METADATA_MAX_IDLE_INTERVAL_MS_DOC);
.define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, METADATA_MAX_IDLE_INTERVAL_MS_DOC)
.defineInternal(INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG, INT, null, null, LOW, INTERNAL_METADATA_LOG_SEGMENT_BYTES_DOC)
.defineInternal(INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG, INT, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, null, LOW, INTERNAL_MAX_BATCH_SIZE_IN_BYTES_DOC)
.defineInternal(INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG, INT, KafkaRaftClient.MAX_FETCH_SIZE_BYTES, null, LOW, INTERNAL_MAX_FETCH_SIZE_IN_BYTES_DOC)
.defineInternal(INTERNAL_DELETE_DELAY_MILLIS_CONFIG, LONG, ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT, null, LOW, INTERNAL_DELETE_DELAY_MILLIS_DOC);
private final int logSegmentBytes;
private final int logSegmentMinBytes;
private final Integer internalSegmentBytes;
private final long logSegmentMillis;
private final long retentionMaxBytes;
private final long retentionMillis;
private final int maxBatchSizeInBytes;
private final int maxFetchSizeInBytes;
private final long deleteDelayMillis;
/**
* Configuration for the metadata log
* @param logSegmentBytes The maximum size of a single metadata log file
* @param logSegmentMinBytes The minimum size of a single metadata log file
* @param logSegmentMillis The maximum time before a new metadata log file is rolled out
* @param retentionMaxBytes The size of the metadata log and snapshots before deleting old snapshots and log files
* @param retentionMillis The time to keep a metadata log file or snapshot before deleting it
* @param maxBatchSizeInBytes The largest record batch size allowed in the metadata log
* @param maxFetchSizeInBytes The maximum number of bytes to read when fetching from the metadata log
* @param deleteDelayMillis The amount of time to wait before deleting a file from the filesystem
*/
public MetadataLogConfig(int logSegmentBytes,
int logSegmentMinBytes,
long logSegmentMillis,
long retentionMaxBytes,
long retentionMillis,
int maxBatchSizeInBytes,
int maxFetchSizeInBytes,
long deleteDelayMillis) {
this.logSegmentBytes = logSegmentBytes;
this.logSegmentMinBytes = logSegmentMinBytes;
this.logSegmentMillis = logSegmentMillis;
this.retentionMaxBytes = retentionMaxBytes;
this.retentionMillis = retentionMillis;
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
this.maxFetchSizeInBytes = maxFetchSizeInBytes;
this.deleteDelayMillis = deleteDelayMillis;
}
private final int internalMaxBatchSizeInBytes;
private final int internalMaxFetchSizeInBytes;
private final long internalDeleteDelayMillis;
public MetadataLogConfig(AbstractConfig config) {
this.logSegmentBytes = config.getInt(METADATA_LOG_SEGMENT_BYTES_CONFIG);
this.logSegmentMinBytes = config.getInt(METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG);
this.internalSegmentBytes = config.getInt(INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG);
this.logSegmentMillis = config.getLong(METADATA_LOG_SEGMENT_MILLIS_CONFIG);
this.retentionMaxBytes = config.getLong(METADATA_MAX_RETENTION_BYTES_CONFIG);
this.retentionMillis = config.getLong(METADATA_MAX_RETENTION_MILLIS_CONFIG);
this.maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES;
this.maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES;
this.deleteDelayMillis = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT;
this.internalMaxBatchSizeInBytes = config.getInt(INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG);
this.internalMaxFetchSizeInBytes = config.getInt(INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG);
this.internalDeleteDelayMillis = config.getLong(INTERNAL_DELETE_DELAY_MILLIS_CONFIG);
}
public int logSegmentBytes() {
return logSegmentBytes;
}
public int logSegmentMinBytes() {
return logSegmentMinBytes;
public Integer internalSegmentBytes() {
return internalSegmentBytes;
}
public long logSegmentMillis() {
@ -160,15 +141,15 @@ public class MetadataLogConfig {
return retentionMillis;
}
public int maxBatchSizeInBytes() {
return maxBatchSizeInBytes;
public int internalMaxBatchSizeInBytes() {
return internalMaxBatchSizeInBytes;
}
public int maxFetchSizeInBytes() {
return maxFetchSizeInBytes;
public int internalMaxFetchSizeInBytes() {
return internalMaxFetchSizeInBytes;
}
public long deleteDelayMillis() {
return deleteDelayMillis;
public long internalDeleteDelayMillis() {
return internalDeleteDelayMillis;
}
}

View File

@ -47,7 +47,6 @@ public final class ServerTopicConfigSynonyms {
* both the first and the second synonyms are configured, we will use only the value of
* the first synonym and ignore the second.
*/
// Topic configs with no mapping to a server config can be found in `LogConfig.CONFIGS_WITH_NO_SERVER_DEFAULTS`
public static final Map<String, List<ConfigSynonym>> ALL_TOPIC_CONFIG_SYNONYMS = Utils.mkMap(
sameNameWithLogPrefix(TopicConfig.SEGMENT_BYTES_CONFIG),
listWithLogPrefix(TopicConfig.SEGMENT_MS_CONFIG,

View File

@ -171,7 +171,7 @@ public class Cleaner {
List<List<LogSegment>> groupedSegments = groupSegmentsBySize(
log.logSegments(0, endOffset),
log.config().segmentSize,
log.config().segmentSize(),
log.config().maxIndexSize,
cleanable.firstUncleanableOffset()
);

View File

@ -26,7 +26,6 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ConfigUtils;
import org.apache.kafka.common.utils.Utils;
@ -140,14 +139,8 @@ public class LogConfig extends AbstractConfig {
public static final long DEFAULT_LOCAL_RETENTION_BYTES = -2; // It indicates the value to be derived from RetentionBytes
public static final long DEFAULT_LOCAL_RETENTION_MS = -2; // It indicates the value to be derived from RetentionMs
// Visible for testing
public static final Set<String> CONFIGS_WITH_NO_SERVER_DEFAULTS = Set.of(
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG,
QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG
);
public static final String INTERNAL_SEGMENT_BYTES_CONFIG = "internal.segment.bytes";
public static final String INTERNAL_SEGMENT_BYTES_DOC = "The maximum size of a single log file. This should be used for testing only.";
public static final ConfigDef SERVER_CONFIG_DEF = new ConfigDef()
.define(ServerLogConfigs.NUM_PARTITIONS_CONFIG, INT, ServerLogConfigs.NUM_PARTITIONS_DEFAULT, atLeast(1), MEDIUM, ServerLogConfigs.NUM_PARTITIONS_DOC)
@ -191,7 +184,7 @@ public class LogConfig extends AbstractConfig {
private static final LogConfigDef CONFIG = new LogConfigDef();
static {
CONFIG.
define(TopicConfig.SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(LegacyRecord.RECORD_OVERHEAD_V0), MEDIUM,
define(TopicConfig.SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(1024 * 1024), MEDIUM,
TopicConfig.SEGMENT_BYTES_DOC)
.define(TopicConfig.SEGMENT_MS_CONFIG, LONG, DEFAULT_SEGMENT_MS, atLeast(1), MEDIUM, TopicConfig.SEGMENT_MS_DOC)
.define(TopicConfig.SEGMENT_JITTER_MS_CONFIG, LONG, DEFAULT_SEGMENT_JITTER_MS, atLeast(0), MEDIUM,
@ -253,7 +246,8 @@ public class LogConfig extends AbstractConfig {
.define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM,
TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
.define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC)
.define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC);
.define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC)
.defineInternal(INTERNAL_SEGMENT_BYTES_CONFIG, INT, null, null, MEDIUM, INTERNAL_SEGMENT_BYTES_DOC);
}
public final Set<String> overriddenConfigs;
@ -262,7 +256,8 @@ public class LogConfig extends AbstractConfig {
* Important note: Any configuration parameter that is passed along from KafkaConfig to LogConfig
* should also be in `KafkaConfig#extractLogConfigMap`.
*/
public final int segmentSize;
private final int segmentSize;
private final Integer internalSegmentSize;
public final long segmentMs;
public final long segmentJitterMs;
public final int maxIndexSize;
@ -306,6 +301,7 @@ public class LogConfig extends AbstractConfig {
this.overriddenConfigs = Collections.unmodifiableSet(overriddenConfigs);
this.segmentSize = getInt(TopicConfig.SEGMENT_BYTES_CONFIG);
this.internalSegmentSize = getInt(INTERNAL_SEGMENT_BYTES_CONFIG);
this.segmentMs = getLong(TopicConfig.SEGMENT_MS_CONFIG);
this.segmentJitterMs = getLong(TopicConfig.SEGMENT_JITTER_MS_CONFIG);
this.maxIndexSize = getInt(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG);
@ -367,6 +363,11 @@ public class LogConfig extends AbstractConfig {
}
}
public int segmentSize() {
if (internalSegmentSize == null) return segmentSize;
return internalSegmentSize;
}
// Exposed as a method so it can be mocked
public int maxMessageSize() {
return maxMessageSize;
@ -388,7 +389,7 @@ public class LogConfig extends AbstractConfig {
public int initFileSize() {
if (preallocate)
return segmentSize;
return segmentSize();
else
return 0;
}
@ -446,8 +447,12 @@ public class LogConfig extends AbstractConfig {
return CONFIG.names().stream().sorted().toList();
}
public static Optional<String> serverConfigName(String configName) {
return CONFIG.serverConfigName(configName);
public static List<String> nonInternalConfigNames() {
return CONFIG.configKeys().entrySet()
.stream()
.filter(entry -> !entry.getValue().internalConfig)
.map(Map.Entry::getKey)
.sorted().toList();
}
public static Map<String, ConfigKey> configKeys() {
@ -630,7 +635,7 @@ public class LogConfig extends AbstractConfig {
@Override
public String toString() {
return "LogConfig{" +
"segmentSize=" + segmentSize +
"segmentSize=" + segmentSize() +
", segmentMs=" + segmentMs +
", segmentJitterMs=" + segmentJitterMs +
", maxIndexSize=" + maxIndexSize +

View File

@ -1181,9 +1181,9 @@ public class UnifiedLog implements AutoCloseable {
});
// check messages size does not exceed config.segmentSize
if (validRecords.sizeInBytes() > config().segmentSize) {
if (validRecords.sizeInBytes() > config().segmentSize()) {
throw new RecordBatchTooLargeException("Message batch size is " + validRecords.sizeInBytes() + " bytes in append " +
"to partition " + topicPartition() + ", which exceeds the maximum configured segment size of " + config().segmentSize + ".");
"to partition " + topicPartition() + ", which exceeds the maximum configured segment size of " + config().segmentSize() + ".");
}
// maybe roll the log if this segment is full
@ -2038,12 +2038,12 @@ public class UnifiedLog implements AutoCloseable {
long maxTimestampInMessages = appendInfo.maxTimestamp();
long maxOffsetInMessages = appendInfo.lastOffset();
if (segment.shouldRoll(new RollParams(config().maxSegmentMs(), config().segmentSize, appendInfo.maxTimestamp(), appendInfo.lastOffset(), messagesSize, now))) {
if (segment.shouldRoll(new RollParams(config().maxSegmentMs(), config().segmentSize(), appendInfo.maxTimestamp(), appendInfo.lastOffset(), messagesSize, now))) {
logger.debug("Rolling new log segment (log_size = {}/{}}, " +
"offset_index_size = {}/{}, " +
"time_index_size = {}/{}, " +
"inactive_time_ms = {}/{}).",
segment.size(), config().segmentSize,
segment.size(), config().segmentSize(),
segment.offsetIndex().entries(), segment.offsetIndex().maxEntries(),
segment.timeIndex().entries(), segment.timeIndex().maxEntries(),
segment.timeWaitedForRoll(now, maxTimestampInMessages), config().segmentMs - segment.rollJitterMs());

View File

@ -198,7 +198,7 @@ class LocalLogTest {
assertEquals(oldConfig, log.config());
Properties props = new Properties();
props.put(TopicConfig.SEGMENT_BYTES_CONFIG, oldConfig.segmentSize + 1);
props.put(TopicConfig.SEGMENT_BYTES_CONFIG, oldConfig.segmentSize() + 1);
LogConfig newConfig = new LogConfig(props);
log.updateConfig(newConfig);
assertEquals(newConfig, log.config());

View File

@ -28,6 +28,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@ -114,7 +115,7 @@ public class PurgeRepartitionTopicIntegrationTest {
.get();
return config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value().equals(TopicConfig.CLEANUP_POLICY_DELETE)
&& config.get(TopicConfig.SEGMENT_MS_CONFIG).value().equals(PURGE_INTERVAL_MS.toString())
&& config.get(TopicConfig.SEGMENT_BYTES_CONFIG).value().equals(PURGE_SEGMENT_BYTES.toString());
&& config.get(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG).value().equals(PURGE_SEGMENT_BYTES.toString());
} catch (final Exception e) {
return false;
}
@ -171,7 +172,7 @@ public class PurgeRepartitionTopicIntegrationTest {
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(APPLICATION_ID).getPath());
streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), PURGE_INTERVAL_MS);
streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), PURGE_SEGMENT_BYTES);
streamsConfiguration.put(StreamsConfig.topicPrefix(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG), PURGE_SEGMENT_BYTES);
streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), PURGE_SEGMENT_BYTES / 2); // we cannot allow batch size larger than segment size
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -248,7 +248,7 @@ public class StreamsConfigTest {
props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 99_999L);
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 7L);
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:host");
props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 1024 * 1024);
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
@ -263,7 +263,7 @@ public class StreamsConfigTest {
);
assertEquals(7L, returnedProps.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
assertEquals("dummy:host", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG));
assertEquals(100, returnedProps.get(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)));
assertEquals(1024 * 1024, returnedProps.get(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)));
}
@Test

View File

@ -769,7 +769,7 @@ public abstract class TopicCommand {
.ofType(String.class);
nl = System.lineSeparator();
String logConfigNames = LogConfig.configNames().stream().map(config -> "\t" + config).collect(Collectors.joining(nl));
String logConfigNames = LogConfig.nonInternalConfigNames().stream().map(config -> "\t" + config).collect(Collectors.joining(nl));
configOpt = parser.accepts("config", "A topic configuration override for the topic being created." +
" The following is a list of valid configurations: " + nl + logConfigNames + nl +
"See the Kafka documentation for full details on the topic configs." +

View File

@ -40,6 +40,7 @@ import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.test.TestUtils;
import java.time.Duration;
@ -97,7 +98,7 @@ public class GetOffsetShellTest {
Map<String, String> rlsConfigs = new HashMap<>();
rlsConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");
rlsConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
rlsConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "100");
rlsConfigs.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "100");
setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs);
sendProducerRecords(this::getRemoteLogStorageEnabledTopicName);
}

View File

@ -20,6 +20,7 @@ package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsOptions;
@ -56,6 +57,7 @@ import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.metadata.LeaderAndIsr;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
@ -82,6 +84,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@ -1400,6 +1403,27 @@ public class TopicCommandTest {
}
}
@ClusterTest
public void testCreateWithInternalConfig(ClusterInstance cluster) throws InterruptedException, ExecutionException {
String internalConfigTopicName = TestUtils.randomString(10);
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = cluster.admin()) {
CreateTopicsResult internalResult = adminClient.createTopics(List.of(new NewTopic(internalConfigTopicName, defaultNumPartitions, defaultReplicationFactor).configs(
Map.of(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "1000")
)));
ConfigEntry internalConfigEntry = internalResult.config(internalConfigTopicName).get().get(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG);
assertNotNull(internalConfigEntry, "Internal config entry should not be null");
assertEquals("1000", internalConfigEntry.value());
CreateTopicsResult nonInternalResult = adminClient.createTopics(List.of(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
ConfigEntry nonInternalConfigEntry = nonInternalResult.config(testTopicName).get().get(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG);
assertNull(nonInternalConfigEntry, "Non-internal config entry should be null");
}
}
private void checkReplicaDistribution(Map<Integer, List<Integer>> assignment,
Map<Integer, String> brokerRackMapping,
Integer numBrokers,