KAFKA-15599 Move MetadataLogConfig to raft module (#19246)

Rewrite the class in Java and move it to the raft module.

Reviewers: PoAn Yang <payang@apache.org>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Mickael Maison 2025-03-21 06:44:20 +01:00 committed by GitHub
parent 1c582a4a35
commit 121ec2a662
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 154 additions and 115 deletions

View File

@ -488,6 +488,7 @@
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.common.serialization" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.fault"/>
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.test"/>

View File

@ -1,48 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.raft
import org.apache.kafka.server.config.ServerLogConfigs
import kafka.server.KafkaConfig
final case class MetadataLogConfig(
logSegmentBytes: Int,
logSegmentMinBytes: Int,
logSegmentMillis: Long,
retentionMaxBytes: Long,
retentionMillis: Long,
maxBatchSizeInBytes: Int,
maxFetchSizeInBytes: Int,
fileDeleteDelayMs: Long,
nodeId: Int
)
object MetadataLogConfig {
def apply(config: KafkaConfig, maxBatchSizeInBytes: Int, maxFetchSizeInBytes: Int): MetadataLogConfig = {
new MetadataLogConfig(
config.metadataLogSegmentBytes,
config.metadataLogSegmentMinBytes,
config.metadataLogSegmentMillis,
config.metadataRetentionBytes,
config.metadataRetentionMillis,
maxBatchSizeInBytes,
maxFetchSizeInBytes,
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
config.metadataNodeIDConfig
)
}
}

View File

@ -28,7 +28,7 @@ import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, MetadataLogConfig, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.Scheduler
@ -554,7 +554,7 @@ final class KafkaMetadataLog private (
scheduler.scheduleOnce(
"delete-snapshot-files",
() => KafkaMetadataLog.deleteSnapshotFiles(log.dir.toPath, expiredSnapshots, this),
config.fileDeleteDelayMs
config.deleteDelayMillis
)
}
}

View File

@ -41,10 +41,11 @@ import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.raft.{ExternalKRaftMetrics, Endpoints, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog, TimingWheelExpirationService}
import org.apache.kafka.raft.{Endpoints, ExternalKRaftMetrics, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, MetadataLogConfig, QuorumConfig, RaftClient, ReplicatedLog, TimingWheelExpirationService}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.Feature
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.util.{FileLock, KafkaScheduler}
import org.apache.kafka.server.fault.FaultHandler
import org.apache.kafka.server.util.timer.SystemTimer
@ -230,7 +231,15 @@ class KafkaRaftManager[T](
dataDir,
time,
scheduler,
config = MetadataLogConfig(config, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
config = new MetadataLogConfig(config.metadataLogSegmentBytes,
config.metadataLogSegmentMinBytes,
config.metadataLogSegmentMillis,
config.metadataRetentionBytes,
config.metadataRetentionMillis,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
config.metadataNodeIDConfig)
)
}

View File

@ -27,7 +27,7 @@ import org.apache.kafka.common.record.ArbitraryMemoryRecords
import org.apache.kafka.common.record.InvalidMemoryRecordsProvider
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.raft._
import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, MetadataLogConfig, OffsetAndEpoch, QuorumConfig, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
import org.apache.kafka.raft.internals.BatchBuilder
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
@ -80,13 +80,31 @@ final class KafkaMetadataLogTest {
props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MILLIS_CONFIG, Int.box(10 * 1024))
assertThrows(classOf[InvalidConfigurationException], () => {
val kafkaConfig = KafkaConfig.fromProps(props)
val metadataConfig = MetadataLogConfig(kafkaConfig, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
val metadataConfig = new MetadataLogConfig(
kafkaConfig.metadataLogSegmentBytes,
kafkaConfig.metadataLogSegmentMinBytes,
kafkaConfig.metadataLogSegmentMillis,
kafkaConfig.metadataRetentionBytes,
kafkaConfig.metadataRetentionMillis,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
kafkaConfig.metadataNodeIDConfig)
buildMetadataLog(tempDir, mockTime, metadataConfig)
})
props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, Int.box(10240))
val kafkaConfig = KafkaConfig.fromProps(props)
val metadataConfig = MetadataLogConfig(kafkaConfig, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
val metadataConfig = new MetadataLogConfig(
kafkaConfig.metadataLogSegmentBytes,
kafkaConfig.metadataLogSegmentMinBytes,
kafkaConfig.metadataLogSegmentMillis,
kafkaConfig.metadataRetentionBytes,
kafkaConfig.metadataRetentionMillis,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
kafkaConfig.metadataNodeIDConfig)
buildMetadataLog(tempDir, mockTime, metadataConfig)
}
@ -129,8 +147,8 @@ final class KafkaMetadataLogTest {
def testEmptyAppendNotAllowed(): Unit = {
val log = buildMetadataLog(tempDir, mockTime)
assertThrows(classOf[IllegalArgumentException], () => log.appendAsFollower(MemoryRecords.EMPTY, 1));
assertThrows(classOf[IllegalArgumentException], () => log.appendAsLeader(MemoryRecords.EMPTY, 1));
assertThrows(classOf[IllegalArgumentException], () => log.appendAsFollower(MemoryRecords.EMPTY, 1))
assertThrows(classOf[IllegalArgumentException], () => log.appendAsLeader(MemoryRecords.EMPTY, 1))
}
@ParameterizedTest
@ -140,7 +158,7 @@ final class KafkaMetadataLogTest {
val previousEndOffset = log.endOffset().offset()
val action: Executable = () => log.appendAsFollower(records, Int.MaxValue)
if (expectedException.isPresent()) {
if (expectedException.isPresent) {
assertThrows(expectedException.get, action)
} else {
assertThrows(classOf[CorruptRecordException], action)
@ -478,7 +496,7 @@ final class KafkaMetadataLogTest {
assertEquals(log.earliestSnapshotId(), log.latestSnapshotId())
log.close()
mockTime.sleep(config.fileDeleteDelayMs)
mockTime.sleep(config.deleteDelayMillis)
// Assert that the log dir doesn't contain any older snapshots
Files
.walk(logDir, 1)
@ -649,7 +667,7 @@ final class KafkaMetadataLogTest {
assertEquals(greaterSnapshotId, secondLog.latestSnapshotId().get)
assertEquals(3 * numberOfRecords, secondLog.startOffset)
assertEquals(epoch, secondLog.lastFetchedEpoch)
mockTime.sleep(config.fileDeleteDelayMs)
mockTime.sleep(config.deleteDelayMillis)
// Assert that the log dir doesn't contain any older snapshots
Files
@ -687,7 +705,18 @@ final class KafkaMetadataLogTest {
val leaderEpoch = 5
val maxBatchSizeInBytes = 16384
val recordSize = 64
val log = buildMetadataLog(tempDir, mockTime, DefaultMetadataLogConfig.copy(maxBatchSizeInBytes = maxBatchSizeInBytes))
val config = new MetadataLogConfig(
DefaultMetadataLogConfig.logSegmentBytes,
DefaultMetadataLogConfig.logSegmentMinBytes,
DefaultMetadataLogConfig.logSegmentMillis,
DefaultMetadataLogConfig.retentionMaxBytes,
DefaultMetadataLogConfig.retentionMillis,
maxBatchSizeInBytes,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
DefaultMetadataLogConfig.deleteDelayMillis,
DefaultMetadataLogConfig.nodeId
)
val log = buildMetadataLog(tempDir, mockTime, config)
val oversizeBatch = buildFullBatch(leaderEpoch, recordSize, maxBatchSizeInBytes + recordSize)
assertThrows(classOf[RecordTooLargeException], () => {
@ -897,18 +926,17 @@ final class KafkaMetadataLogTest {
@Test
def testAdvanceLogStartOffsetAfterCleaning(): Unit = {
val config = MetadataLogConfig(
logSegmentBytes = 512,
logSegmentMinBytes = 512,
logSegmentMillis = 10 * 1000,
retentionMaxBytes = 256,
retentionMillis = 60 * 1000,
maxBatchSizeInBytes = 512,
maxFetchSizeInBytes = DefaultMetadataLogConfig.maxFetchSizeInBytes,
fileDeleteDelayMs = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
nodeId = 1
val config = new MetadataLogConfig(
512,
512,
10 * 1000,
256,
60 * 1000,
512,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
1
)
config.copy()
val log = buildMetadataLog(tempDir, mockTime, config)
// Generate some segments
@ -936,13 +964,16 @@ final class KafkaMetadataLogTest {
@Test
def testDeleteSnapshots(): Unit = {
// Generate some logs and a few snapshots, set retention low and verify that cleaning occurs
val config = DefaultMetadataLogConfig.copy(
logSegmentBytes = 1024,
logSegmentMinBytes = 1024,
logSegmentMillis = 10 * 1000,
retentionMaxBytes = 1024,
retentionMillis = 60 * 1000,
maxBatchSizeInBytes = 100
val config = new MetadataLogConfig(
1024,
1024,
10 * 1000,
1024,
60 * 1000,
100,
DefaultMetadataLogConfig.maxBatchSizeInBytes,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
DefaultMetadataLogConfig.nodeId
)
val log = buildMetadataLog(tempDir, mockTime, config)
@ -968,13 +999,16 @@ final class KafkaMetadataLogTest {
@Test
def testSoftRetentionLimit(): Unit = {
// Set retention equal to the segment size and generate slightly more than one segment of logs
val config = DefaultMetadataLogConfig.copy(
logSegmentBytes = 10240,
logSegmentMinBytes = 10240,
logSegmentMillis = 10 * 1000,
retentionMaxBytes = 10240,
retentionMillis = 60 * 1000,
maxBatchSizeInBytes = 100
val config = new MetadataLogConfig(
10240,
10240,
10 * 1000,
10240,
60 * 1000,
100,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
DefaultMetadataLogConfig.deleteDelayMillis,
DefaultMetadataLogConfig.nodeId
)
val log = buildMetadataLog(tempDir, mockTime, config)
@ -1010,13 +1044,16 @@ final class KafkaMetadataLogTest {
@Test
def testSegmentsLessThanLatestSnapshot(): Unit = {
val config = DefaultMetadataLogConfig.copy(
logSegmentBytes = 10240,
logSegmentMinBytes = 10240,
logSegmentMillis = 10 * 1000,
retentionMaxBytes = 10240,
retentionMillis = 60 * 1000,
maxBatchSizeInBytes = 200
val config = new MetadataLogConfig(
10240,
10240,
10 * 1000,
10240,
60 * 1000,
200,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
DefaultMetadataLogConfig.deleteDelayMillis,
DefaultMetadataLogConfig.nodeId
)
val log = buildMetadataLog(tempDir, mockTime, config)
@ -1067,16 +1104,16 @@ object KafkaMetadataLogTest {
override def read(input: protocol.Readable, size: Int): Array[Byte] = input.readArray(size)
}
val DefaultMetadataLogConfig = MetadataLogConfig(
logSegmentBytes = 100 * 1024,
logSegmentMinBytes = 100 * 1024,
logSegmentMillis = 10 * 1000,
retentionMaxBytes = 100 * 1024,
retentionMillis = 60 * 1000,
maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
fileDeleteDelayMs = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
nodeId = 1
val DefaultMetadataLogConfig = new MetadataLogConfig(
100 * 1024,
100 * 1024,
10 * 1000,
100 * 1024,
60 * 1000,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
1
)
def buildMetadataLogAndDir(

View File

@ -25,7 +25,7 @@ import java.util.Optional
import java.util.Properties
import java.util.stream.IntStream
import kafka.log.LogTestUtils
import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
import kafka.raft.KafkaMetadataLog
import kafka.server.KafkaRaftServer
import kafka.tools.DumpLogSegments.{OffsetsMessageParser, ShareGroupStateMessageParser, TimeIndexDumpErrors, TransactionLogMessageParser}
import kafka.utils.TestUtils
@ -43,7 +43,7 @@ import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, ShareSnap
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch, VoterSetTest}
import org.apache.kafka.raft.{KafkaRaftClient, MetadataLogConfig, OffsetAndEpoch, VoterSetTest}
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion}
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde
@ -544,16 +544,16 @@ class DumpLogSegmentsTest {
logDir,
time,
time.scheduler,
MetadataLogConfig(
logSegmentBytes = 100 * 1024,
logSegmentMinBytes = 100 * 1024,
logSegmentMillis = 10 * 1000,
retentionMaxBytes = 100 * 1024,
retentionMillis = 60 * 1000,
maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
fileDeleteDelayMs = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
nodeId = 1
new MetadataLogConfig(
100 * 1024,
100 * 1024,
10 * 1000,
100 * 1024,
60 * 1000,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
1
)
)

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft;
/**
* Configuration for the metadata log
* @param logSegmentBytes The maximum size of a single metadata log file
* @param logSegmentMinBytes The minimum size of a single metadata log file
* @param logSegmentMillis The maximum time before a new metadata log file is rolled out
* @param retentionMaxBytes The size of the metadata log and snapshots before deleting old snapshots and log files
* @param retentionMillis The time to keep a metadata log file or snapshot before deleting it
* @param maxBatchSizeInBytes The largest record batch size allowed in the metadata log
* @param maxFetchSizeInBytes The maximum number of bytes to read when fetching from the metadata log
* @param deleteDelayMillis The amount of time to wait before deleting a file from the filesystem
* @param nodeId The node id
*/
public record MetadataLogConfig(int logSegmentBytes,
int logSegmentMinBytes,
long logSegmentMillis,
long retentionMaxBytes,
long retentionMillis,
int maxBatchSizeInBytes,
int maxFetchSizeInBytes,
long deleteDelayMillis,
int nodeId) {
}