KAFKA-19136 Move metadata-related configs from KRaftConfigs to MetadataLogConfig (#19465)

Separates metadata-related configurations from the `KRaftConfigs` into
the `MetadataLogConfig` class.

Previously, metadata-related configs were placed in `KRaftConfigs`,
which mixed server-related configs (like process.roles) with
metadata-specific ones (like metadata.log.*), leading to confusion and
tight coupling.

In this PR:
- Extract metadata-related config definitions and variables from
`KRaftConfig` into `MetadataLogConfig`.
- Move `node.id` out of `MetadataLogConfig` into `KafkaMetadataLog’s
constructor` to avoid redundant config references.
- Leave server-related configurations in `KRaftConfig`, consistent with
its role.

This separation makes `KafkaConfig` and `KRaftConfig` cleaner, and
aligns with the goal of having a dedicated MetadataLogConfig class for
managing metadata-specific configurations.

Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
 <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Logan Zhu 2025-04-17 22:17:11 +08:00 committed by GitHub
parent db62c7cdff
commit 50fb993ce0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 232 additions and 184 deletions

View File

@ -2416,6 +2416,7 @@ project(':tools') {
implementation project(':group-coordinator')
implementation project(':coordinator-common')
implementation project(':share-coordinator')
implementation project(':raft')
implementation libs.argparse4j
implementation libs.jacksonDatabind
implementation libs.jacksonDataformatCsv

View File

@ -295,6 +295,7 @@
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metadata.properties" />
<allow pkg="org.apache.kafka.network" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="kafka.admin" />
<allow pkg="kafka.server" />

View File

@ -29,7 +29,7 @@ import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, MetadataLogConfig, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.snapshot.FileRawSnapshotReader
@ -58,10 +58,11 @@ final class KafkaMetadataLog private (
// polling thread when snapshots are created. This object is also used to store any opened snapshot reader.
snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
topicPartition: TopicPartition,
config: MetadataLogConfig
config: MetadataLogConfig,
nodeId: Int
) extends ReplicatedLog with Logging {
this.logIdent = s"[MetadataLog partition=$topicPartition, nodeId=${config.nodeId}] "
this.logIdent = s"[MetadataLog partition=$topicPartition, nodeId=$nodeId] "
override def read(startOffset: Long, readIsolation: Isolation): LogFetchInfo = {
val isolation = readIsolation match {
@ -581,7 +582,8 @@ object KafkaMetadataLog extends Logging {
dataDir: File,
time: Time,
scheduler: Scheduler,
config: MetadataLogConfig
config: MetadataLogConfig,
nodeId: Int
): KafkaMetadataLog = {
val props = new Properties()
props.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, config.maxBatchSizeInBytes.toString)
@ -597,7 +599,7 @@ object KafkaMetadataLog extends Logging {
if (config.logSegmentBytes < config.logSegmentMinBytes) {
throw new InvalidConfigurationException(
s"Cannot set ${KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_CONFIG} below ${config.logSegmentMinBytes}: ${config.logSegmentBytes}"
s"Cannot set ${MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG} below ${config.logSegmentMinBytes}: ${config.logSegmentBytes}"
)
} else if (defaultLogConfig.retentionMs >= 0) {
throw new InvalidConfigurationException(
@ -631,12 +633,13 @@ object KafkaMetadataLog extends Logging {
scheduler,
recoverSnapshots(log),
topicPartition,
config
config,
nodeId
)
// Print a warning if users have overridden the internal config
if (config.logSegmentMinBytes != KafkaRaftClient.MAX_BATCH_SIZE_BYTES) {
metadataLog.error(s"Overriding ${KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG} is only supported for testing. Setting " +
metadataLog.error(s"Overriding ${MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG} is only supported for testing. Setting " +
s"this value too low may lead to an inability to write batches of metadata records.")
}

View File

@ -45,7 +45,6 @@ import org.apache.kafka.raft.{Endpoints, ExternalKRaftMetrics, FileQuorumStateSt
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.Feature
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.util.{FileLock, KafkaScheduler}
import org.apache.kafka.server.fault.FaultHandler
import org.apache.kafka.server.util.timer.SystemTimer
@ -231,15 +230,8 @@ class KafkaRaftManager[T](
dataDir,
time,
scheduler,
config = new MetadataLogConfig(config.metadataLogSegmentBytes,
config.metadataLogSegmentMinBytes,
config.metadataLogSegmentMillis,
config.metadataRetentionBytes,
config.metadataRetentionMillis,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
config.metadataNodeIDConfig)
config = new MetadataLogConfig(config),
config.nodeId
)
}

View File

@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
@ -239,18 +239,12 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
}
def metadataLogDir: String = {
Option(getString(KRaftConfigs.METADATA_LOG_DIR_CONFIG)) match {
Option(getString(MetadataLogConfig.METADATA_LOG_DIR_CONFIG)) match {
case Some(dir) => dir
case None => logDirs.head
}
}
def metadataLogSegmentBytes = getInt(KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_CONFIG)
def metadataLogSegmentMillis = getLong(KRaftConfigs.METADATA_LOG_SEGMENT_MILLIS_CONFIG)
def metadataRetentionBytes = getLong(KRaftConfigs.METADATA_MAX_RETENTION_BYTES_CONFIG)
def metadataRetentionMillis = getLong(KRaftConfigs.METADATA_MAX_RETENTION_MILLIS_CONFIG)
def metadataNodeIDConfig = getInt(KRaftConfigs.NODE_ID_CONFIG)
def metadataLogSegmentMinBytes = getInt(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG)
val serverMaxStartupTimeMs = getLong(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG)
def messageMaxBytes = getInt(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG)
@ -264,10 +258,10 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
}
/************* Metadata Configuration ***********/
val metadataSnapshotMaxNewRecordBytes = getLong(KRaftConfigs.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG)
val metadataSnapshotMaxIntervalMs = getLong(KRaftConfigs.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG)
val metadataSnapshotMaxNewRecordBytes = getLong(MetadataLogConfig.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG)
val metadataSnapshotMaxIntervalMs = getLong(MetadataLogConfig.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG)
val metadataMaxIdleIntervalNs: Option[Long] = {
val value = TimeUnit.NANOSECONDS.convert(getInt(KRaftConfigs.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG).toLong, TimeUnit.MILLISECONDS)
val value = TimeUnit.NANOSECONDS.convert(getInt(MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG).toLong, TimeUnit.MILLISECONDS)
if (value > 0) Some(value) else None
}

View File

@ -32,6 +32,7 @@ import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.MetadataLogConfig
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@ -74,7 +75,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,share"))
cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true"))
}
cfgs.foreach(_.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath))
cfgs.foreach(_.setProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath))
insertControllerListenersIfNeeded(cfgs)
cfgs.map(KafkaConfig.fromProps)
}

View File

@ -56,7 +56,8 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.raft.MetadataLogConfig
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs}
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.ShutdownableThread
@ -1099,7 +1100,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
def testServersCanStartWithInvalidStaticConfigsAndValidDynamicConfigs(groupProtocol: String): Unit = {
// modify snapshot interval config to explicitly take snapshot on a broker with valid dynamic configs
val props = defaultStaticConfig(numServers)
props.put(KRaftConfigs.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, "10000")
props.put(MetadataLogConfig.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, "10000")
val kafkaConfig = KafkaConfig.fromProps(props)
val newBroker = createBroker(kafkaConfig).asInstanceOf[BrokerServer]

View File

@ -38,7 +38,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
import org.apache.kafka.metadata.storage.Formatter
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.queue.KafkaEventQueue
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.server.{ClientMetricsManager, ServerSocketFactory}
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVersion, TransactionVersion}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs}
@ -261,7 +261,7 @@ abstract class QuorumTestHarness extends Logging {
}
val nodeId = Integer.parseInt(props.getProperty(KRaftConfigs.NODE_ID_CONFIG))
val metadataDir = TestUtils.tempDir()
props.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, metadataDir.getAbsolutePath)
props.setProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, metadataDir.getAbsolutePath)
val proto = controllerListenerSecurityProtocol.toString
val securityProtocolMaps = extraControllerSecurityProtocols().map(sc => sc + ":" + sc).mkString(",")
val listeners = extraControllerSecurityProtocols().map(sc => sc + "://localhost:0").mkString(",")

View File

@ -21,7 +21,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
import org.apache.kafka.common.utils.BufferSupplier
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.server.config.KRaftConfigs
import org.apache.kafka.raft.MetadataLogConfig
import org.apache.kafka.snapshot.RecordsSnapshotReader
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotEquals
@ -48,8 +48,8 @@ class RaftClusterSnapshotTest {
.setNumControllerNodes(numberOfControllers)
.build()
)
.setConfigProp(KRaftConfigs.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, "10")
.setConfigProp(KRaftConfigs.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, "0")
.setConfigProp(MetadataLogConfig.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, "10")
.setConfigProp(MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, "0")
.build()
) { cluster =>
cluster.format()

View File

@ -76,35 +76,17 @@ final class KafkaMetadataLogTest {
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093")
props.put(KRaftConfigs.NODE_ID_CONFIG, Int.box(2))
props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
props.put(KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_CONFIG, Int.box(10240))
props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MILLIS_CONFIG, Int.box(10 * 1024))
props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, Int.box(10240))
props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG, Int.box(10 * 1024))
assertThrows(classOf[InvalidConfigurationException], () => {
val kafkaConfig = KafkaConfig.fromProps(props)
val metadataConfig = new MetadataLogConfig(
kafkaConfig.metadataLogSegmentBytes,
kafkaConfig.metadataLogSegmentMinBytes,
kafkaConfig.metadataLogSegmentMillis,
kafkaConfig.metadataRetentionBytes,
kafkaConfig.metadataRetentionMillis,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
kafkaConfig.metadataNodeIDConfig)
val metadataConfig = new MetadataLogConfig(kafkaConfig)
buildMetadataLog(tempDir, mockTime, metadataConfig)
})
props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, Int.box(10240))
props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, Int.box(10240))
val kafkaConfig = KafkaConfig.fromProps(props)
val metadataConfig = new MetadataLogConfig(
kafkaConfig.metadataLogSegmentBytes,
kafkaConfig.metadataLogSegmentMinBytes,
kafkaConfig.metadataLogSegmentMillis,
kafkaConfig.metadataRetentionBytes,
kafkaConfig.metadataRetentionMillis,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
kafkaConfig.metadataNodeIDConfig)
val metadataConfig = new MetadataLogConfig(kafkaConfig)
buildMetadataLog(tempDir, mockTime, metadataConfig)
}
@ -713,8 +695,7 @@ final class KafkaMetadataLogTest {
DefaultMetadataLogConfig.retentionMillis,
maxBatchSizeInBytes,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
DefaultMetadataLogConfig.deleteDelayMillis,
DefaultMetadataLogConfig.nodeId
DefaultMetadataLogConfig.deleteDelayMillis
)
val log = buildMetadataLog(tempDir, mockTime, config)
@ -934,8 +915,7 @@ final class KafkaMetadataLogTest {
60 * 1000,
512,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
1
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT
)
val log = buildMetadataLog(tempDir, mockTime, config)
@ -972,8 +952,7 @@ final class KafkaMetadataLogTest {
60 * 1000,
100,
DefaultMetadataLogConfig.maxBatchSizeInBytes,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
DefaultMetadataLogConfig.nodeId
DefaultMetadataLogConfig.maxFetchSizeInBytes
)
val log = buildMetadataLog(tempDir, mockTime, config)
@ -1007,8 +986,7 @@ final class KafkaMetadataLogTest {
60 * 1000,
100,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
DefaultMetadataLogConfig.deleteDelayMillis,
DefaultMetadataLogConfig.nodeId
DefaultMetadataLogConfig.deleteDelayMillis
)
val log = buildMetadataLog(tempDir, mockTime, config)
@ -1052,8 +1030,7 @@ final class KafkaMetadataLogTest {
60 * 1000,
200,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
DefaultMetadataLogConfig.deleteDelayMillis,
DefaultMetadataLogConfig.nodeId
DefaultMetadataLogConfig.deleteDelayMillis
)
val log = buildMetadataLog(tempDir, mockTime, config)
@ -1112,8 +1089,7 @@ object KafkaMetadataLogTest {
60 * 1000,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
1
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT
)
def buildMetadataLogAndDir(
@ -1133,7 +1109,8 @@ object KafkaMetadataLogTest {
logDir,
time,
time.scheduler,
metadataLogConfig
metadataLogConfig,
1
)
(logDir.toPath, metadataLog, metadataLogConfig)

View File

@ -33,7 +33,6 @@ import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.config.KRaftConfigs
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManager, RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
@ -57,6 +56,7 @@ import org.mockito.Mockito.{doAnswer, doThrow, spy}
import net.jqwik.api.AfterFailureMode
import net.jqwik.api.ForAll
import net.jqwik.api.Property
import org.apache.kafka.server.config.KRaftConfigs
import java.io._
import java.nio.ByteBuffer

View File

@ -31,8 +31,7 @@ import org.apache.kafka.common.Uuid
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.Endpoints
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.raft.{Endpoints, MetadataLogConfig, QuorumConfig}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.fault.FaultHandler
@ -58,7 +57,7 @@ class RaftManagerTest {
props.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, value.toString)
}
metadataDir.foreach { value =>
props.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, value.toString)
props.setProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, value.toString)
}
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, processRoles.mkString(","))
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)

View File

@ -31,7 +31,7 @@ import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.DynamicThreadPool
import org.apache.kafka.server.authorizer._
@ -673,11 +673,11 @@ class DynamicBrokerConfigTest {
@Test
def testNonInternalValuesDoesNotExposeInternalConfigs(): Unit = {
val props = TestUtils.createBrokerConfig(0, port = 8181)
props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, "1024")
props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, "1024")
val config = new KafkaConfig(props)
assertFalse(config.nonInternalValues.containsKey(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG))
assertFalse(config.nonInternalValues.containsKey(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG))
config.updateCurrentConfig(new KafkaConfig(props))
assertFalse(config.nonInternalValues.containsKey(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG))
assertFalse(config.nonInternalValues.containsKey(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG))
}
@Test

View File

@ -36,7 +36,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.EndPoint
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
@ -44,7 +44,6 @@ import org.apache.kafka.storage.internals.log.CleanerConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable
import org.apache.kafka.common.test.{TestUtils => JTestUtils}
import scala.jdk.CollectionConverters._
@ -790,13 +789,13 @@ class KafkaConfigTest {
case KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KRaftConfigs.NODE_ID_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KRaftConfigs.METADATA_LOG_DIR_CONFIG => // ignore string
case KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KRaftConfigs.METADATA_LOG_SEGMENT_MILLIS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KRaftConfigs.METADATA_MAX_RETENTION_BYTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KRaftConfigs.METADATA_MAX_RETENTION_MILLIS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case MetadataLogConfig.METADATA_LOG_DIR_CONFIG => // ignore string
case MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case MetadataLogConfig.METADATA_MAX_RETENTION_BYTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case MetadataLogConfig.METADATA_MAX_RETENTION_MILLIS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG => // ignore string
case KRaftConfigs.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG => //ignore string
case ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG => //ignore string
@ -1494,7 +1493,7 @@ class KafkaConfigTest {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
props.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, metadataDir)
props.setProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, metadataDir)
props.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, dataDir)
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
@ -1674,12 +1673,12 @@ class KafkaConfigTest {
val validValue = 100
val props = new Properties()
props.putAll(kraftProps())
props.setProperty(KRaftConfigs.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, validValue.toString)
props.setProperty(MetadataLogConfig.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, validValue.toString)
val config = KafkaConfig.fromProps(props)
assertEquals(validValue, config.metadataSnapshotMaxIntervalMs)
props.setProperty(KRaftConfigs.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, "-1")
props.setProperty(MetadataLogConfig.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, "-1")
val errorMessage = assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage
assertEquals(

View File

@ -23,7 +23,7 @@ import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
import org.apache.kafka.server.common.MetadataVersion
@ -159,7 +159,7 @@ class KafkaRaftServerTest {
configProperties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"${nodeId + 1}@localhost:9092")
configProperties.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
configProperties.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG, invalidDir.getAbsolutePath)
configProperties.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, invalidDir.getAbsolutePath)
configProperties.put(ServerLogConfigs.LOG_DIR_CONFIG, validDir.getAbsolutePath)
configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
val config = KafkaConfig.fromProps(configProperties)
@ -189,7 +189,7 @@ class KafkaRaftServerTest {
configProperties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
configProperties.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"${nodeId + 1}@localhost:9092")
configProperties.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG, validDir.getAbsolutePath)
configProperties.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, validDir.getAbsolutePath)
configProperties.put(ServerLogConfigs.LOG_DIR_CONFIG, invalidDir.getAbsolutePath)
configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
val config = KafkaConfig.fromProps(configProperties)
@ -225,7 +225,7 @@ class KafkaRaftServerTest {
configProperties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
configProperties.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"${nodeId + 1}@localhost:9092")
configProperties.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG, metadataDir.getAbsolutePath)
configProperties.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, metadataDir.getAbsolutePath)
configProperties.put(ServerLogConfigs.LOG_DIR_CONFIG, dataDir.getAbsolutePath)
configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
val config = KafkaConfig.fromProps(configProperties)

View File

@ -552,9 +552,9 @@ class DumpLogSegmentsTest {
60 * 1000,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
1
)
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT
),
1
)
val lastContainedLogTimestamp = 10000

View File

@ -32,7 +32,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble, PropertiesUtils}
import org.apache.kafka.metadata.storage.FormatterException
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
import org.junit.jupiter.api.{Test, Timeout}
@ -67,7 +67,7 @@ class StorageToolTest {
@Test
def testConfigToLogDirectoriesWithMetaLogDir(): Unit = {
val properties = newSelfManagedProperties()
properties.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, "/tmp/baz")
properties.setProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, "/tmp/baz")
val config = new KafkaConfig(properties)
assertEquals(Seq("/tmp/bar", "/tmp/baz", "/tmp/foo"),
StorageTool.configToLogDirectories(config))

View File

@ -16,25 +16,159 @@
*/
package org.apache.kafka.raft;
/**
* Configuration for the metadata log
* @param logSegmentBytes The maximum size of a single metadata log file
* @param logSegmentMinBytes The minimum size of a single metadata log file
* @param logSegmentMillis The maximum time before a new metadata log file is rolled out
* @param retentionMaxBytes The size of the metadata log and snapshots before deleting old snapshots and log files
* @param retentionMillis The time to keep a metadata log file or snapshot before deleting it
* @param maxBatchSizeInBytes The largest record batch size allowed in the metadata log
* @param maxFetchSizeInBytes The maximum number of bytes to read when fetching from the metadata log
* @param deleteDelayMillis The amount of time to wait before deleting a file from the filesystem
* @param nodeId The node id
*/
public record MetadataLogConfig(int logSegmentBytes,
int logSegmentMinBytes,
long logSegmentMillis,
long retentionMaxBytes,
long retentionMillis,
int maxBatchSizeInBytes,
int maxFetchSizeInBytes,
long deleteDelayMillis,
int nodeId) {
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.server.config.ServerLogConfigs;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
public class MetadataLogConfig {
public static final String METADATA_LOG_DIR_CONFIG = "metadata.log.dir";
public static final String METADATA_LOG_DIR_DOC = "This configuration determines where we put the metadata log. " +
"If it is not set, the metadata log is placed in the first log directory from log.dirs.";
public static final String METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG = "metadata.log.max.snapshot.interval.ms";
public static final long METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT = TimeUnit.HOURS.toMillis(1);
public static final String METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG = "metadata.log.max.record.bytes.between.snapshots";
public static final int METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES = 20 * 1024 * 1024;
public static final String METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC = "This is the maximum number of bytes in the log between the latest " +
"snapshot and the high-watermark needed before generating a new snapshot. The default value is " +
METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES + ". To generate snapshots based on the time elapsed, see the <code>" +
METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG + "</code> configuration. The Kafka node will generate a snapshot when " +
"either the maximum time interval is reached or the maximum bytes limit is reached.";
public static final String METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC = "This is the maximum number of milliseconds to wait to generate a snapshot " +
"if there are committed records in the log that are not included in the latest snapshot. A value of zero disables " +
"time based snapshot generation. The default value is " + METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT + ". To generate " +
"snapshots based on the number of metadata bytes, see the <code>" + METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG + "</code> " +
"configuration. The Kafka node will generate a snapshot when either the maximum time interval is reached or the " +
"maximum bytes limit is reached.";
public static final String METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG = "metadata.log.segment.min.bytes";
public static final String METADATA_LOG_SEGMENT_MIN_BYTES_DOC = "Override the minimum size for a single metadata log file. This should be used for testing only.";
public static final int METADATA_LOG_SEGMENT_MIN_BYTES_DEFAULT = 8 * 1024 * 1024;
public static final String METADATA_LOG_SEGMENT_BYTES_CONFIG = "metadata.log.segment.bytes";
public static final String METADATA_LOG_SEGMENT_BYTES_DOC = "The maximum size of a single metadata log file.";
public static final int METADATA_LOG_SEGMENT_BYTES_DEFAULT = 1024 * 1024 * 1024;
public static final String METADATA_LOG_SEGMENT_MILLIS_CONFIG = "metadata.log.segment.ms";
public static final String METADATA_LOG_SEGMENT_MILLIS_DOC = "The maximum time before a new metadata log file is rolled out (in milliseconds).";
public static final long METADATA_LOG_SEGMENT_MILLIS_DEFAULT = 24 * 7 * 60 * 60 * 1000L;
public static final String METADATA_MAX_RETENTION_BYTES_CONFIG = "metadata.max.retention.bytes";
public static final int METADATA_MAX_RETENTION_BYTES_DEFAULT = 100 * 1024 * 1024;
public static final String METADATA_MAX_RETENTION_BYTES_DOC = "The maximum combined size of the metadata log and snapshots before deleting old " +
"snapshots and log files. Since at least one snapshot must exist before any logs can be deleted, this is a soft limit.";
public static final String METADATA_MAX_RETENTION_MILLIS_CONFIG = "metadata.max.retention.ms";
public static final String METADATA_MAX_RETENTION_MILLIS_DOC = "The number of milliseconds to keep a metadata log file or snapshot before " +
"deleting it. Since at least one snapshot must exist before any logs can be deleted, this is a soft limit.";
public static final long METADATA_MAX_RETENTION_MILLIS_DEFAULT = 24 * 7 * 60 * 60 * 1000L;
public static final String METADATA_MAX_IDLE_INTERVAL_MS_CONFIG = "metadata.max.idle.interval.ms";
public static final int METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT = 500;
public static final String METADATA_MAX_IDLE_INTERVAL_MS_DOC = "This configuration controls how often the active " +
"controller should write no-op records to the metadata partition. If the value is 0, no-op records " +
"are not appended to the metadata partition. The default value is " + METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT;
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC)
.define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC)
.define(METADATA_LOG_DIR_CONFIG, STRING, null, null, HIGH, METADATA_LOG_DIR_DOC)
.define(METADATA_LOG_SEGMENT_BYTES_CONFIG, INT, METADATA_LOG_SEGMENT_BYTES_DEFAULT, atLeast(Records.LOG_OVERHEAD), HIGH, METADATA_LOG_SEGMENT_BYTES_DOC)
.defineInternal(METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, INT, METADATA_LOG_SEGMENT_MIN_BYTES_DEFAULT, atLeast(Records.LOG_OVERHEAD), HIGH, METADATA_LOG_SEGMENT_MIN_BYTES_DOC)
.define(METADATA_LOG_SEGMENT_MILLIS_CONFIG, LONG, METADATA_LOG_SEGMENT_MILLIS_DEFAULT, null, HIGH, METADATA_LOG_SEGMENT_MILLIS_DOC)
.define(METADATA_MAX_RETENTION_BYTES_CONFIG, LONG, METADATA_MAX_RETENTION_BYTES_DEFAULT, null, HIGH, METADATA_MAX_RETENTION_BYTES_DOC)
.define(METADATA_MAX_RETENTION_MILLIS_CONFIG, LONG, METADATA_MAX_RETENTION_MILLIS_DEFAULT, null, HIGH, METADATA_MAX_RETENTION_MILLIS_DOC)
.define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, METADATA_MAX_IDLE_INTERVAL_MS_DOC);
private final int logSegmentBytes;
private final int logSegmentMinBytes;
private final long logSegmentMillis;
private final long retentionMaxBytes;
private final long retentionMillis;
private final int maxBatchSizeInBytes;
private final int maxFetchSizeInBytes;
private final long deleteDelayMillis;
/**
* Configuration for the metadata log
* @param logSegmentBytes The maximum size of a single metadata log file
* @param logSegmentMinBytes The minimum size of a single metadata log file
* @param logSegmentMillis The maximum time before a new metadata log file is rolled out
* @param retentionMaxBytes The size of the metadata log and snapshots before deleting old snapshots and log files
* @param retentionMillis The time to keep a metadata log file or snapshot before deleting it
* @param maxBatchSizeInBytes The largest record batch size allowed in the metadata log
* @param maxFetchSizeInBytes The maximum number of bytes to read when fetching from the metadata log
* @param deleteDelayMillis The amount of time to wait before deleting a file from the filesystem
*/
public MetadataLogConfig(int logSegmentBytes,
int logSegmentMinBytes,
long logSegmentMillis,
long retentionMaxBytes,
long retentionMillis,
int maxBatchSizeInBytes,
int maxFetchSizeInBytes,
long deleteDelayMillis) {
this.logSegmentBytes = logSegmentBytes;
this.logSegmentMinBytes = logSegmentMinBytes;
this.logSegmentMillis = logSegmentMillis;
this.retentionMaxBytes = retentionMaxBytes;
this.retentionMillis = retentionMillis;
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
this.maxFetchSizeInBytes = maxFetchSizeInBytes;
this.deleteDelayMillis = deleteDelayMillis;
}
public MetadataLogConfig(AbstractConfig config) {
this.logSegmentBytes = config.getInt(METADATA_LOG_SEGMENT_BYTES_CONFIG);
this.logSegmentMinBytes = config.getInt(METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG);
this.logSegmentMillis = config.getLong(METADATA_LOG_SEGMENT_MILLIS_CONFIG);
this.retentionMaxBytes = config.getLong(METADATA_MAX_RETENTION_BYTES_CONFIG);
this.retentionMillis = config.getLong(METADATA_MAX_RETENTION_MILLIS_CONFIG);
this.maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES;
this.maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES;
this.deleteDelayMillis = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT;
}
public int logSegmentBytes() {
return logSegmentBytes;
}
public int logSegmentMinBytes() {
return logSegmentMinBytes;
}
public long logSegmentMillis() {
return logSegmentMillis;
}
public long retentionMaxBytes() {
return retentionMaxBytes;
}
public long retentionMillis() {
return retentionMillis;
}
public int maxBatchSizeInBytes() {
return maxBatchSizeInBytes;
}
public int maxFetchSizeInBytes() {
return maxFetchSizeInBytes;
}
public long deleteDelayMillis() {
return deleteDelayMillis;
}
}

View File

@ -27,6 +27,7 @@ import org.apache.kafka.coordinator.transaction.AddPartitionsToTxnConfig;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.raft.MetadataLogConfig;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.metrics.MetricConfigs;
@ -47,6 +48,7 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
RemoteLogManagerConfig.configDef(),
ServerConfigs.CONFIG_DEF,
KRaftConfigs.CONFIG_DEF,
MetadataLogConfig.CONFIG_DEF,
SocketServerConfigs.CONFIG_DEF,
ReplicationConfigs.CONFIG_DEF,
GroupCoordinatorConfig.CONFIG_DEF,

View File

@ -18,13 +18,8 @@ package org.apache.kafka.server.config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.storage.internals.log.LogConfig;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
@ -54,26 +49,6 @@ public class KRaftConfigs {
public static final String NODE_ID_DOC = "The node ID associated with the roles this process is playing when <code>process.roles</code> is non-empty. " +
"This is required configuration when running in KRaft mode.";
public static final String METADATA_LOG_DIR_CONFIG = "metadata.log.dir";
public static final String METADATA_LOG_DIR_DOC = "This configuration determines where we put the metadata log. " +
"If it is not set, the metadata log is placed in the first log directory from log.dirs.";
public static final String METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG = "metadata.log.max.snapshot.interval.ms";
public static final long METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT = TimeUnit.HOURS.toMillis(1);
public static final String METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG = "metadata.log.max.record.bytes.between.snapshots";
public static final int METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES = 20 * 1024 * 1024;
public static final String METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC = "This is the maximum number of bytes in the log between the latest " +
"snapshot and the high-watermark needed before generating a new snapshot. The default value is " +
METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES + ". To generate snapshots based on the time elapsed, see the <code>" +
METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG + "</code> configuration. The Kafka node will generate a snapshot when " +
"either the maximum time interval is reached or the maximum bytes limit is reached.";
public static final String METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC = "This is the maximum number of milliseconds to wait to generate a snapshot " +
"if there are committed records in the log that are not included in the latest snapshot. A value of zero disables " +
"time based snapshot generation. The default value is " + METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT + ". To generate " +
"snapshots based on the number of metadata bytes, see the <code>" + METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG + "</code> " +
"configuration. The Kafka node will generate a snapshot when either the maximum time interval is reached or the " +
"maximum bytes limit is reached.";
public static final String CONTROLLER_LISTENER_NAMES_CONFIG = "controller.listener.names";
public static final String CONTROLLER_LISTENER_NAMES_DOC = "A comma-separated list of the names of the listeners used by the controller. This is required " +
"when communicating with the controller quorum, the broker will always use the first listener in this list.";
@ -81,30 +56,6 @@ public class KRaftConfigs {
public static final String SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG = "sasl.mechanism.controller.protocol";
public static final String SASL_MECHANISM_CONTROLLER_PROTOCOL_DOC = "SASL mechanism used for communication with controllers. Default is GSSAPI.";
public static final String METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG = "metadata.log.segment.min.bytes";
public static final String METADATA_LOG_SEGMENT_MIN_BYTES_DOC = "Override the minimum size for a single metadata log file. This should be used for testing only.";
public static final String METADATA_LOG_SEGMENT_BYTES_CONFIG = "metadata.log.segment.bytes";
public static final String METADATA_LOG_SEGMENT_BYTES_DOC = "The maximum size of a single metadata log file.";
public static final String METADATA_LOG_SEGMENT_MILLIS_CONFIG = "metadata.log.segment.ms";
public static final String METADATA_LOG_SEGMENT_MILLIS_DOC = "The maximum time before a new metadata log file is rolled out (in milliseconds).";
public static final String METADATA_MAX_RETENTION_BYTES_CONFIG = "metadata.max.retention.bytes";
public static final int METADATA_MAX_RETENTION_BYTES_DEFAULT = 100 * 1024 * 1024;
public static final String METADATA_MAX_RETENTION_BYTES_DOC = "The maximum combined size of the metadata log and snapshots before deleting old " +
"snapshots and log files. Since at least one snapshot must exist before any logs can be deleted, this is a soft limit.";
public static final String METADATA_MAX_RETENTION_MILLIS_CONFIG = "metadata.max.retention.ms";
public static final String METADATA_MAX_RETENTION_MILLIS_DOC = "The number of milliseconds to keep a metadata log file or snapshot before " +
"deleting it. Since at least one snapshot must exist before any logs can be deleted, this is a soft limit.";
public static final String METADATA_MAX_IDLE_INTERVAL_MS_CONFIG = "metadata.max.idle.interval.ms";
public static final int METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT = 500;
public static final String METADATA_MAX_IDLE_INTERVAL_MS_DOC = "This configuration controls how often the active " +
"controller should write no-op records to the metadata partition. If the value is 0, no-op records " +
"are not appended to the metadata partition. The default value is " + METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT;
public static final String SERVER_MAX_STARTUP_TIME_MS_CONFIG = "server.max.startup.time.ms";
public static final long SERVER_MAX_STARTUP_TIME_MS_DEFAULT = Long.MAX_VALUE;
public static final String SERVER_MAX_STARTUP_TIME_MS_DOC = "The maximum number of milliseconds we will wait for the server to come up. " +
@ -119,8 +70,6 @@ public class KRaftConfigs {
public static final String CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DOC = "We will log an error message about controller events that take longer than this threshold.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC)
.define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC)
.define(PROCESS_ROLES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.in("broker", "controller"), HIGH, PROCESS_ROLES_DOC)
.define(NODE_ID_CONFIG, INT, ConfigDef.NO_DEFAULT_VALUE, atLeast(0), HIGH, NODE_ID_DOC)
.define(INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG, INT, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DEFAULT, null, MEDIUM, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DOC)
@ -128,13 +77,6 @@ public class KRaftConfigs {
.define(BROKER_SESSION_TIMEOUT_MS_CONFIG, INT, BROKER_SESSION_TIMEOUT_MS_DEFAULT, null, MEDIUM, BROKER_SESSION_TIMEOUT_MS_DOC)
.define(CONTROLLER_LISTENER_NAMES_CONFIG, STRING, null, null, HIGH, CONTROLLER_LISTENER_NAMES_DOC)
.define(SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH, SASL_MECHANISM_CONTROLLER_PROTOCOL_DOC)
.define(METADATA_LOG_DIR_CONFIG, STRING, null, null, HIGH, METADATA_LOG_DIR_DOC)
.define(METADATA_LOG_SEGMENT_BYTES_CONFIG, INT, LogConfig.DEFAULT_SEGMENT_BYTES, atLeast(Records.LOG_OVERHEAD), HIGH, METADATA_LOG_SEGMENT_BYTES_DOC)
.defineInternal(METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, INT, 8 * 1024 * 1024, atLeast(Records.LOG_OVERHEAD), HIGH, METADATA_LOG_SEGMENT_MIN_BYTES_DOC)
.define(METADATA_LOG_SEGMENT_MILLIS_CONFIG, LONG, LogConfig.DEFAULT_SEGMENT_MS, null, HIGH, METADATA_LOG_SEGMENT_MILLIS_DOC)
.define(METADATA_MAX_RETENTION_BYTES_CONFIG, LONG, METADATA_MAX_RETENTION_BYTES_DEFAULT, null, HIGH, METADATA_MAX_RETENTION_BYTES_DOC)
.define(METADATA_MAX_RETENTION_MILLIS_CONFIG, LONG, LogConfig.DEFAULT_RETENTION_MS, null, HIGH, METADATA_MAX_RETENTION_MILLIS_DOC)
.define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, METADATA_MAX_IDLE_INTERVAL_MS_DOC)
.defineInternal(CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS, LONG, CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS_DEFAULT, atLeast(100), MEDIUM, CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS_DOC)
.defineInternal(CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS, LONG, CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DEFAULT, atLeast(0), MEDIUM, CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DOC)
.defineInternal(SERVER_MAX_STARTUP_TIME_MS_CONFIG, LONG, SERVER_MAX_STARTUP_TIME_MS_DEFAULT, atLeast(0), MEDIUM, SERVER_MAX_STARTUP_TIME_MS_DOC);

View File

@ -40,6 +40,7 @@ import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.metadata.storage.Formatter;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.raft.DynamicVoters;
import org.apache.kafka.raft.MetadataLogConfig;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.KRaftVersion;
@ -140,11 +141,11 @@ public class KafkaClusterTestKit implements AutoCloseable {
Integer.toString(node.id()));
// In combined mode, always prefer the metadata log directory of the controller node.
if (controllerNode != null) {
props.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG,
props.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG,
controllerNode.metadataDirectory());
setSecurityProtocolProps(props, controllerSecurityProtocol);
} else {
props.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG,
props.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG,
node.metadataDirectory());
}
if (brokerNode != null) {

View File

@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.raft.MetadataLogConfig;
import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.util.CommandLineUtils;
@ -359,8 +360,8 @@ public class MetadataQuorumCommand {
}
static String getMetadataDirectory(Properties props) throws TerseException {
if (props.containsKey(KRaftConfigs.METADATA_LOG_DIR_CONFIG)) {
return props.getProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG);
if (props.containsKey(MetadataLogConfig.METADATA_LOG_DIR_CONFIG)) {
return props.getProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG);
}
if (props.containsKey(ServerLogConfigs.LOG_DIRS_CONFIG)) {
String[] logDirs = props.getProperty(ServerLogConfigs.LOG_DIRS_CONFIG).trim().split(",");
@ -368,7 +369,7 @@ public class MetadataQuorumCommand {
return logDirs[0];
}
}
throw new TerseException("Neither " + KRaftConfigs.METADATA_LOG_DIR_CONFIG + " nor " +
throw new TerseException("Neither " + MetadataLogConfig.METADATA_LOG_DIR_CONFIG + " nor " +
ServerLogConfigs.LOG_DIRS_CONFIG + " were found. Is this a valid controller " +
"configuration file?");
}