diff --git a/build.gradle b/build.gradle
index 024c428b9fc..9b1971feed9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2416,6 +2416,7 @@ project(':tools') {
implementation project(':group-coordinator')
implementation project(':coordinator-common')
implementation project(':share-coordinator')
+ implementation project(':raft')
implementation libs.argparse4j
implementation libs.jacksonDatabind
implementation libs.jacksonDataformatCsv
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 15ed859b817..ead4111389b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -295,6 +295,7 @@
+
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 2b7404ef1e7..01f69b374bc 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, MetadataLogConfig, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
-import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
+import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.snapshot.FileRawSnapshotReader
@@ -58,10 +58,11 @@ final class KafkaMetadataLog private (
// polling thread when snapshots are created. This object is also used to store any opened snapshot reader.
snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
topicPartition: TopicPartition,
- config: MetadataLogConfig
+ config: MetadataLogConfig,
+ nodeId: Int
) extends ReplicatedLog with Logging {
- this.logIdent = s"[MetadataLog partition=$topicPartition, nodeId=${config.nodeId}] "
+ this.logIdent = s"[MetadataLog partition=$topicPartition, nodeId=$nodeId] "
override def read(startOffset: Long, readIsolation: Isolation): LogFetchInfo = {
val isolation = readIsolation match {
@@ -581,7 +582,8 @@ object KafkaMetadataLog extends Logging {
dataDir: File,
time: Time,
scheduler: Scheduler,
- config: MetadataLogConfig
+ config: MetadataLogConfig,
+ nodeId: Int
): KafkaMetadataLog = {
val props = new Properties()
props.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, config.maxBatchSizeInBytes.toString)
@@ -597,7 +599,7 @@ object KafkaMetadataLog extends Logging {
if (config.logSegmentBytes < config.logSegmentMinBytes) {
throw new InvalidConfigurationException(
- s"Cannot set ${KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_CONFIG} below ${config.logSegmentMinBytes}: ${config.logSegmentBytes}"
+ s"Cannot set ${MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG} below ${config.logSegmentMinBytes}: ${config.logSegmentBytes}"
)
} else if (defaultLogConfig.retentionMs >= 0) {
throw new InvalidConfigurationException(
@@ -631,12 +633,13 @@ object KafkaMetadataLog extends Logging {
scheduler,
recoverSnapshots(log),
topicPartition,
- config
+ config,
+ nodeId
)
// Print a warning if users have overridden the internal config
if (config.logSegmentMinBytes != KafkaRaftClient.MAX_BATCH_SIZE_BYTES) {
- metadataLog.error(s"Overriding ${KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG} is only supported for testing. Setting " +
+ metadataLog.error(s"Overriding ${MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG} is only supported for testing. Setting " +
s"this value too low may lead to an inability to write batches of metadata records.")
}
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 522c37c0829..84dfa5ebee0 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -45,7 +45,6 @@ import org.apache.kafka.raft.{Endpoints, ExternalKRaftMetrics, FileQuorumStateSt
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.Feature
import org.apache.kafka.server.common.serialization.RecordSerde
-import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.util.{FileLock, KafkaScheduler}
import org.apache.kafka.server.fault.FaultHandler
import org.apache.kafka.server.util.timer.SystemTimer
@@ -231,15 +230,8 @@ class KafkaRaftManager[T](
dataDir,
time,
scheduler,
- config = new MetadataLogConfig(config.metadataLogSegmentBytes,
- config.metadataLogSegmentMinBytes,
- config.metadataLogSegmentMillis,
- config.metadataRetentionBytes,
- config.metadataRetentionMillis,
- KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
- KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
- ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
- config.metadataNodeIDConfig)
+ config = new MetadataLogConfig(config),
+ config.nodeId
)
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index f5b0b7b271c..7049d5f2474 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.raft.QuorumConfig
+import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
@@ -239,18 +239,12 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
}
def metadataLogDir: String = {
- Option(getString(KRaftConfigs.METADATA_LOG_DIR_CONFIG)) match {
+ Option(getString(MetadataLogConfig.METADATA_LOG_DIR_CONFIG)) match {
case Some(dir) => dir
case None => logDirs.head
}
}
- def metadataLogSegmentBytes = getInt(KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_CONFIG)
- def metadataLogSegmentMillis = getLong(KRaftConfigs.METADATA_LOG_SEGMENT_MILLIS_CONFIG)
- def metadataRetentionBytes = getLong(KRaftConfigs.METADATA_MAX_RETENTION_BYTES_CONFIG)
- def metadataRetentionMillis = getLong(KRaftConfigs.METADATA_MAX_RETENTION_MILLIS_CONFIG)
- def metadataNodeIDConfig = getInt(KRaftConfigs.NODE_ID_CONFIG)
- def metadataLogSegmentMinBytes = getInt(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG)
val serverMaxStartupTimeMs = getLong(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG)
def messageMaxBytes = getInt(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG)
@@ -264,10 +258,10 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
}
/************* Metadata Configuration ***********/
- val metadataSnapshotMaxNewRecordBytes = getLong(KRaftConfigs.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG)
- val metadataSnapshotMaxIntervalMs = getLong(KRaftConfigs.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG)
+ val metadataSnapshotMaxNewRecordBytes = getLong(MetadataLogConfig.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG)
+ val metadataSnapshotMaxIntervalMs = getLong(MetadataLogConfig.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG)
val metadataMaxIdleIntervalNs: Option[Long] = {
- val value = TimeUnit.NANOSECONDS.convert(getInt(KRaftConfigs.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG).toLong, TimeUnit.MILLISECONDS)
+ val value = TimeUnit.NANOSECONDS.convert(getInt(MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG).toLong, TimeUnit.MILLISECONDS)
if (value > 0) Some(value) else None
}
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index ed5611eb74b..6b23b2b9d3d 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.network.SocketServerConfigs
+import org.apache.kafka.raft.MetadataLogConfig
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@@ -74,7 +75,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,share"))
cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true"))
}
- cfgs.foreach(_.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath))
+ cfgs.foreach(_.setProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath))
insertControllerListenersIfNeeded(cfgs)
cfgs.map(KafkaConfig.fromProps)
}
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index d5fbdf78de1..3c5cd9396bd 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -56,7 +56,8 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
+import org.apache.kafka.raft.MetadataLogConfig
+import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs}
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.ShutdownableThread
@@ -1099,7 +1100,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
def testServersCanStartWithInvalidStaticConfigsAndValidDynamicConfigs(groupProtocol: String): Unit = {
// modify snapshot interval config to explicitly take snapshot on a broker with valid dynamic configs
val props = defaultStaticConfig(numServers)
- props.put(KRaftConfigs.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, "10000")
+ props.put(MetadataLogConfig.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, "10000")
val kafkaConfig = KafkaConfig.fromProps(props)
val newBroker = createBroker(kafkaConfig).asInstanceOf[BrokerServer]
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 1f4e94811c1..5e3f421caf2 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
import org.apache.kafka.metadata.storage.Formatter
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.queue.KafkaEventQueue
-import org.apache.kafka.raft.QuorumConfig
+import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.server.{ClientMetricsManager, ServerSocketFactory}
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVersion, TransactionVersion}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs}
@@ -261,7 +261,7 @@ abstract class QuorumTestHarness extends Logging {
}
val nodeId = Integer.parseInt(props.getProperty(KRaftConfigs.NODE_ID_CONFIG))
val metadataDir = TestUtils.tempDir()
- props.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, metadataDir.getAbsolutePath)
+ props.setProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, metadataDir.getAbsolutePath)
val proto = controllerListenerSecurityProtocol.toString
val securityProtocolMaps = extraControllerSecurityProtocols().map(sc => sc + ":" + sc).mkString(",")
val listeners = extraControllerSecurityProtocols().map(sc => sc + "://localhost:0").mkString(",")
diff --git a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
index 7196f6ed7ee..2288d37aaad 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
@@ -21,7 +21,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
import org.apache.kafka.common.utils.BufferSupplier
import org.apache.kafka.metadata.MetadataRecordSerde
-import org.apache.kafka.server.config.KRaftConfigs
+import org.apache.kafka.raft.MetadataLogConfig
import org.apache.kafka.snapshot.RecordsSnapshotReader
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotEquals
@@ -48,8 +48,8 @@ class RaftClusterSnapshotTest {
.setNumControllerNodes(numberOfControllers)
.build()
)
- .setConfigProp(KRaftConfigs.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, "10")
- .setConfigProp(KRaftConfigs.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, "0")
+ .setConfigProp(MetadataLogConfig.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, "10")
+ .setConfigProp(MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, "0")
.build()
) { cluster =>
cluster.format()
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 466fb954d13..1d3b3493cce 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -76,35 +76,17 @@ final class KafkaMetadataLogTest {
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093")
props.put(KRaftConfigs.NODE_ID_CONFIG, Int.box(2))
props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
- props.put(KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_CONFIG, Int.box(10240))
- props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MILLIS_CONFIG, Int.box(10 * 1024))
+ props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, Int.box(10240))
+ props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG, Int.box(10 * 1024))
assertThrows(classOf[InvalidConfigurationException], () => {
val kafkaConfig = KafkaConfig.fromProps(props)
- val metadataConfig = new MetadataLogConfig(
- kafkaConfig.metadataLogSegmentBytes,
- kafkaConfig.metadataLogSegmentMinBytes,
- kafkaConfig.metadataLogSegmentMillis,
- kafkaConfig.metadataRetentionBytes,
- kafkaConfig.metadataRetentionMillis,
- KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
- KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
- ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
- kafkaConfig.metadataNodeIDConfig)
+ val metadataConfig = new MetadataLogConfig(kafkaConfig)
buildMetadataLog(tempDir, mockTime, metadataConfig)
})
- props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, Int.box(10240))
+ props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, Int.box(10240))
val kafkaConfig = KafkaConfig.fromProps(props)
- val metadataConfig = new MetadataLogConfig(
- kafkaConfig.metadataLogSegmentBytes,
- kafkaConfig.metadataLogSegmentMinBytes,
- kafkaConfig.metadataLogSegmentMillis,
- kafkaConfig.metadataRetentionBytes,
- kafkaConfig.metadataRetentionMillis,
- KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
- KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
- ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
- kafkaConfig.metadataNodeIDConfig)
+ val metadataConfig = new MetadataLogConfig(kafkaConfig)
buildMetadataLog(tempDir, mockTime, metadataConfig)
}
@@ -713,8 +695,7 @@ final class KafkaMetadataLogTest {
DefaultMetadataLogConfig.retentionMillis,
maxBatchSizeInBytes,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
- DefaultMetadataLogConfig.deleteDelayMillis,
- DefaultMetadataLogConfig.nodeId
+ DefaultMetadataLogConfig.deleteDelayMillis
)
val log = buildMetadataLog(tempDir, mockTime, config)
@@ -934,8 +915,7 @@ final class KafkaMetadataLogTest {
60 * 1000,
512,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
- ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
- 1
+ ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT
)
val log = buildMetadataLog(tempDir, mockTime, config)
@@ -972,8 +952,7 @@ final class KafkaMetadataLogTest {
60 * 1000,
100,
DefaultMetadataLogConfig.maxBatchSizeInBytes,
- DefaultMetadataLogConfig.maxFetchSizeInBytes,
- DefaultMetadataLogConfig.nodeId
+ DefaultMetadataLogConfig.maxFetchSizeInBytes
)
val log = buildMetadataLog(tempDir, mockTime, config)
@@ -1007,8 +986,7 @@ final class KafkaMetadataLogTest {
60 * 1000,
100,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
- DefaultMetadataLogConfig.deleteDelayMillis,
- DefaultMetadataLogConfig.nodeId
+ DefaultMetadataLogConfig.deleteDelayMillis
)
val log = buildMetadataLog(tempDir, mockTime, config)
@@ -1052,8 +1030,7 @@ final class KafkaMetadataLogTest {
60 * 1000,
200,
DefaultMetadataLogConfig.maxFetchSizeInBytes,
- DefaultMetadataLogConfig.deleteDelayMillis,
- DefaultMetadataLogConfig.nodeId
+ DefaultMetadataLogConfig.deleteDelayMillis
)
val log = buildMetadataLog(tempDir, mockTime, config)
@@ -1112,8 +1089,7 @@ object KafkaMetadataLogTest {
60 * 1000,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
- ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
- 1
+ ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT
)
def buildMetadataLogAndDir(
@@ -1133,7 +1109,8 @@ object KafkaMetadataLogTest {
logDir,
time,
time.scheduler,
- metadataLogConfig
+ metadataLogConfig,
+ 1
)
(logDir.toPath, metadataLog, metadataLogConfig)
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 1d2653ab618..79d39f14578 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -33,7 +33,6 @@ import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.RequestLocal
-import org.apache.kafka.server.config.KRaftConfigs
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManager, RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -57,6 +56,7 @@ import org.mockito.Mockito.{doAnswer, doThrow, spy}
import net.jqwik.api.AfterFailureMode
import net.jqwik.api.ForAll
import net.jqwik.api.Property
+import org.apache.kafka.server.config.KRaftConfigs
import java.io._
import java.nio.ByteBuffer
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 5f5f94efc95..4255648347c 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -31,8 +31,7 @@ import org.apache.kafka.common.Uuid
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.raft.Endpoints
-import org.apache.kafka.raft.QuorumConfig
+import org.apache.kafka.raft.{Endpoints, MetadataLogConfig, QuorumConfig}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.fault.FaultHandler
@@ -58,7 +57,7 @@ class RaftManagerTest {
props.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, value.toString)
}
metadataDir.foreach { value =>
- props.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, value.toString)
+ props.setProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, value.toString)
}
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, processRoles.mkString(","))
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 824eb283fd4..17ad2200dcc 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.raft.QuorumConfig
+import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.DynamicThreadPool
import org.apache.kafka.server.authorizer._
@@ -673,11 +673,11 @@ class DynamicBrokerConfigTest {
@Test
def testNonInternalValuesDoesNotExposeInternalConfigs(): Unit = {
val props = TestUtils.createBrokerConfig(0, port = 8181)
- props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, "1024")
+ props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, "1024")
val config = new KafkaConfig(props)
- assertFalse(config.nonInternalValues.containsKey(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG))
+ assertFalse(config.nonInternalValues.containsKey(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG))
config.updateCurrentConfig(new KafkaConfig(props))
- assertFalse(config.nonInternalValues.containsKey(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG))
+ assertFalse(config.nonInternalValues.containsKey(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index cb8a1b1f869..74e34b06dac 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.EndPoint
-import org.apache.kafka.raft.QuorumConfig
+import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
@@ -44,7 +44,6 @@ import org.apache.kafka.storage.internals.log.CleanerConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable
-
import org.apache.kafka.common.test.{TestUtils => JTestUtils}
import scala.jdk.CollectionConverters._
@@ -790,13 +789,13 @@ class KafkaConfigTest {
case KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KRaftConfigs.NODE_ID_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
- case KRaftConfigs.METADATA_LOG_DIR_CONFIG => // ignore string
- case KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
- case KRaftConfigs.METADATA_LOG_SEGMENT_MILLIS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
- case KRaftConfigs.METADATA_MAX_RETENTION_BYTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
- case KRaftConfigs.METADATA_MAX_RETENTION_MILLIS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case MetadataLogConfig.METADATA_LOG_DIR_CONFIG => // ignore string
+ case MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case MetadataLogConfig.METADATA_MAX_RETENTION_BYTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case MetadataLogConfig.METADATA_MAX_RETENTION_MILLIS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG => // ignore string
- case KRaftConfigs.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG => //ignore string
case ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG => //ignore string
@@ -1494,7 +1493,7 @@ class KafkaConfigTest {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
- props.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, metadataDir)
+ props.setProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, metadataDir)
props.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, dataDir)
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
@@ -1674,12 +1673,12 @@ class KafkaConfigTest {
val validValue = 100
val props = new Properties()
props.putAll(kraftProps())
- props.setProperty(KRaftConfigs.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, validValue.toString)
+ props.setProperty(MetadataLogConfig.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, validValue.toString)
val config = KafkaConfig.fromProps(props)
assertEquals(validValue, config.metadataSnapshotMaxIntervalMs)
- props.setProperty(KRaftConfigs.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, "-1")
+ props.setProperty(MetadataLogConfig.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, "-1")
val errorMessage = assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage
assertEquals(
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index 48fb8081129..aa558dca7f3 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
-import org.apache.kafka.raft.QuorumConfig
+import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
import org.apache.kafka.server.common.MetadataVersion
@@ -159,7 +159,7 @@ class KafkaRaftServerTest {
configProperties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"${nodeId + 1}@localhost:9092")
configProperties.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
- configProperties.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG, invalidDir.getAbsolutePath)
+ configProperties.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, invalidDir.getAbsolutePath)
configProperties.put(ServerLogConfigs.LOG_DIR_CONFIG, validDir.getAbsolutePath)
configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
val config = KafkaConfig.fromProps(configProperties)
@@ -189,7 +189,7 @@ class KafkaRaftServerTest {
configProperties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
configProperties.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"${nodeId + 1}@localhost:9092")
- configProperties.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG, validDir.getAbsolutePath)
+ configProperties.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, validDir.getAbsolutePath)
configProperties.put(ServerLogConfigs.LOG_DIR_CONFIG, invalidDir.getAbsolutePath)
configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
val config = KafkaConfig.fromProps(configProperties)
@@ -225,7 +225,7 @@ class KafkaRaftServerTest {
configProperties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
configProperties.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"${nodeId + 1}@localhost:9092")
- configProperties.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG, metadataDir.getAbsolutePath)
+ configProperties.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, metadataDir.getAbsolutePath)
configProperties.put(ServerLogConfigs.LOG_DIR_CONFIG, dataDir.getAbsolutePath)
configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
val config = KafkaConfig.fromProps(configProperties)
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index e4a13c8d304..5b3e9abe111 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -552,9 +552,9 @@ class DumpLogSegmentsTest {
60 * 1000,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
- ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
- 1
- )
+ ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT
+ ),
+ 1
)
val lastContainedLogTimestamp = 10000
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 90979426dd3..101b8f43bc4 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble, PropertiesUtils}
import org.apache.kafka.metadata.storage.FormatterException
import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.raft.QuorumConfig
+import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
import org.junit.jupiter.api.{Test, Timeout}
@@ -67,7 +67,7 @@ class StorageToolTest {
@Test
def testConfigToLogDirectoriesWithMetaLogDir(): Unit = {
val properties = newSelfManagedProperties()
- properties.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, "/tmp/baz")
+ properties.setProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, "/tmp/baz")
val config = new KafkaConfig(properties)
assertEquals(Seq("/tmp/bar", "/tmp/baz", "/tmp/foo"),
StorageTool.configToLogDirectories(config))
diff --git a/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java b/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java
index 869966e1791..b0a6f9f045a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java
+++ b/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java
@@ -16,25 +16,159 @@
*/
package org.apache.kafka.raft;
-/**
- * Configuration for the metadata log
- * @param logSegmentBytes The maximum size of a single metadata log file
- * @param logSegmentMinBytes The minimum size of a single metadata log file
- * @param logSegmentMillis The maximum time before a new metadata log file is rolled out
- * @param retentionMaxBytes The size of the metadata log and snapshots before deleting old snapshots and log files
- * @param retentionMillis The time to keep a metadata log file or snapshot before deleting it
- * @param maxBatchSizeInBytes The largest record batch size allowed in the metadata log
- * @param maxFetchSizeInBytes The maximum number of bytes to read when fetching from the metadata log
- * @param deleteDelayMillis The amount of time to wait before deleting a file from the filesystem
- * @param nodeId The node id
- */
-public record MetadataLogConfig(int logSegmentBytes,
- int logSegmentMinBytes,
- long logSegmentMillis,
- long retentionMaxBytes,
- long retentionMillis,
- int maxBatchSizeInBytes,
- int maxFetchSizeInBytes,
- long deleteDelayMillis,
- int nodeId) {
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.server.config.ServerLogConfigs;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public class MetadataLogConfig {
+
+ public static final String METADATA_LOG_DIR_CONFIG = "metadata.log.dir";
+ public static final String METADATA_LOG_DIR_DOC = "This configuration determines where we put the metadata log. " +
+ "If it is not set, the metadata log is placed in the first log directory from log.dirs.";
+
+ public static final String METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG = "metadata.log.max.snapshot.interval.ms";
+ public static final long METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT = TimeUnit.HOURS.toMillis(1);
+ public static final String METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG = "metadata.log.max.record.bytes.between.snapshots";
+ public static final int METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES = 20 * 1024 * 1024;
+ public static final String METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC = "This is the maximum number of bytes in the log between the latest " +
+ "snapshot and the high-watermark needed before generating a new snapshot. The default value is " +
+ METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES + ". To generate snapshots based on the time elapsed, see the " +
+ METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG + " configuration. The Kafka node will generate a snapshot when " +
+ "either the maximum time interval is reached or the maximum bytes limit is reached.";
+ public static final String METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC = "This is the maximum number of milliseconds to wait to generate a snapshot " +
+ "if there are committed records in the log that are not included in the latest snapshot. A value of zero disables " +
+ "time based snapshot generation. The default value is " + METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT + ". To generate " +
+ "snapshots based on the number of metadata bytes, see the " + METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG + " " +
+ "configuration. The Kafka node will generate a snapshot when either the maximum time interval is reached or the " +
+ "maximum bytes limit is reached.";
+
+ public static final String METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG = "metadata.log.segment.min.bytes";
+ public static final String METADATA_LOG_SEGMENT_MIN_BYTES_DOC = "Override the minimum size for a single metadata log file. This should be used for testing only.";
+ public static final int METADATA_LOG_SEGMENT_MIN_BYTES_DEFAULT = 8 * 1024 * 1024;
+
+ public static final String METADATA_LOG_SEGMENT_BYTES_CONFIG = "metadata.log.segment.bytes";
+ public static final String METADATA_LOG_SEGMENT_BYTES_DOC = "The maximum size of a single metadata log file.";
+ public static final int METADATA_LOG_SEGMENT_BYTES_DEFAULT = 1024 * 1024 * 1024;
+
+ public static final String METADATA_LOG_SEGMENT_MILLIS_CONFIG = "metadata.log.segment.ms";
+ public static final String METADATA_LOG_SEGMENT_MILLIS_DOC = "The maximum time before a new metadata log file is rolled out (in milliseconds).";
+ public static final long METADATA_LOG_SEGMENT_MILLIS_DEFAULT = 24 * 7 * 60 * 60 * 1000L;
+
+ public static final String METADATA_MAX_RETENTION_BYTES_CONFIG = "metadata.max.retention.bytes";
+ public static final int METADATA_MAX_RETENTION_BYTES_DEFAULT = 100 * 1024 * 1024;
+ public static final String METADATA_MAX_RETENTION_BYTES_DOC = "The maximum combined size of the metadata log and snapshots before deleting old " +
+ "snapshots and log files. Since at least one snapshot must exist before any logs can be deleted, this is a soft limit.";
+
+ public static final String METADATA_MAX_RETENTION_MILLIS_CONFIG = "metadata.max.retention.ms";
+ public static final String METADATA_MAX_RETENTION_MILLIS_DOC = "The number of milliseconds to keep a metadata log file or snapshot before " +
+ "deleting it. Since at least one snapshot must exist before any logs can be deleted, this is a soft limit.";
+ public static final long METADATA_MAX_RETENTION_MILLIS_DEFAULT = 24 * 7 * 60 * 60 * 1000L;
+
+ public static final String METADATA_MAX_IDLE_INTERVAL_MS_CONFIG = "metadata.max.idle.interval.ms";
+ public static final int METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT = 500;
+ public static final String METADATA_MAX_IDLE_INTERVAL_MS_DOC = "This configuration controls how often the active " +
+ "controller should write no-op records to the metadata partition. If the value is 0, no-op records " +
+ "are not appended to the metadata partition. The default value is " + METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT;
+
+ public static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC)
+ .define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC)
+ .define(METADATA_LOG_DIR_CONFIG, STRING, null, null, HIGH, METADATA_LOG_DIR_DOC)
+ .define(METADATA_LOG_SEGMENT_BYTES_CONFIG, INT, METADATA_LOG_SEGMENT_BYTES_DEFAULT, atLeast(Records.LOG_OVERHEAD), HIGH, METADATA_LOG_SEGMENT_BYTES_DOC)
+ .defineInternal(METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, INT, METADATA_LOG_SEGMENT_MIN_BYTES_DEFAULT, atLeast(Records.LOG_OVERHEAD), HIGH, METADATA_LOG_SEGMENT_MIN_BYTES_DOC)
+ .define(METADATA_LOG_SEGMENT_MILLIS_CONFIG, LONG, METADATA_LOG_SEGMENT_MILLIS_DEFAULT, null, HIGH, METADATA_LOG_SEGMENT_MILLIS_DOC)
+ .define(METADATA_MAX_RETENTION_BYTES_CONFIG, LONG, METADATA_MAX_RETENTION_BYTES_DEFAULT, null, HIGH, METADATA_MAX_RETENTION_BYTES_DOC)
+ .define(METADATA_MAX_RETENTION_MILLIS_CONFIG, LONG, METADATA_MAX_RETENTION_MILLIS_DEFAULT, null, HIGH, METADATA_MAX_RETENTION_MILLIS_DOC)
+ .define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, METADATA_MAX_IDLE_INTERVAL_MS_DOC);
+
+ private final int logSegmentBytes;
+ private final int logSegmentMinBytes;
+ private final long logSegmentMillis;
+ private final long retentionMaxBytes;
+ private final long retentionMillis;
+ private final int maxBatchSizeInBytes;
+ private final int maxFetchSizeInBytes;
+ private final long deleteDelayMillis;
+
+ /**
+ * Configuration for the metadata log
+ * @param logSegmentBytes The maximum size of a single metadata log file
+ * @param logSegmentMinBytes The minimum size of a single metadata log file
+ * @param logSegmentMillis The maximum time before a new metadata log file is rolled out
+ * @param retentionMaxBytes The size of the metadata log and snapshots before deleting old snapshots and log files
+ * @param retentionMillis The time to keep a metadata log file or snapshot before deleting it
+ * @param maxBatchSizeInBytes The largest record batch size allowed in the metadata log
+ * @param maxFetchSizeInBytes The maximum number of bytes to read when fetching from the metadata log
+ * @param deleteDelayMillis The amount of time to wait before deleting a file from the filesystem
+ */
+ public MetadataLogConfig(int logSegmentBytes,
+ int logSegmentMinBytes,
+ long logSegmentMillis,
+ long retentionMaxBytes,
+ long retentionMillis,
+ int maxBatchSizeInBytes,
+ int maxFetchSizeInBytes,
+ long deleteDelayMillis) {
+ this.logSegmentBytes = logSegmentBytes;
+ this.logSegmentMinBytes = logSegmentMinBytes;
+ this.logSegmentMillis = logSegmentMillis;
+ this.retentionMaxBytes = retentionMaxBytes;
+ this.retentionMillis = retentionMillis;
+ this.maxBatchSizeInBytes = maxBatchSizeInBytes;
+ this.maxFetchSizeInBytes = maxFetchSizeInBytes;
+ this.deleteDelayMillis = deleteDelayMillis;
+ }
+
+ public MetadataLogConfig(AbstractConfig config) {
+ this.logSegmentBytes = config.getInt(METADATA_LOG_SEGMENT_BYTES_CONFIG);
+ this.logSegmentMinBytes = config.getInt(METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG);
+ this.logSegmentMillis = config.getLong(METADATA_LOG_SEGMENT_MILLIS_CONFIG);
+ this.retentionMaxBytes = config.getLong(METADATA_MAX_RETENTION_BYTES_CONFIG);
+ this.retentionMillis = config.getLong(METADATA_MAX_RETENTION_MILLIS_CONFIG);
+ this.maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES;
+ this.maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES;
+ this.deleteDelayMillis = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT;
+ }
+
+ public int logSegmentBytes() {
+ return logSegmentBytes;
+ }
+
+ public int logSegmentMinBytes() {
+ return logSegmentMinBytes;
+ }
+
+ public long logSegmentMillis() {
+ return logSegmentMillis;
+ }
+
+ public long retentionMaxBytes() {
+ return retentionMaxBytes;
+ }
+
+ public long retentionMillis() {
+ return retentionMillis;
+ }
+
+ public int maxBatchSizeInBytes() {
+ return maxBatchSizeInBytes;
+ }
+
+ public int maxFetchSizeInBytes() {
+ return maxFetchSizeInBytes;
+ }
+
+ public long deleteDelayMillis() {
+ return deleteDelayMillis;
+ }
}
diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index 87bf18a412f..ef61fee1542 100644
--- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -27,6 +27,7 @@ import org.apache.kafka.coordinator.transaction.AddPartitionsToTxnConfig;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.MetadataLogConfig;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.metrics.MetricConfigs;
@@ -47,6 +48,7 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
RemoteLogManagerConfig.configDef(),
ServerConfigs.CONFIG_DEF,
KRaftConfigs.CONFIG_DEF,
+ MetadataLogConfig.CONFIG_DEF,
SocketServerConfigs.CONFIG_DEF,
ReplicationConfigs.CONFIG_DEF,
GroupCoordinatorConfig.CONFIG_DEF,
diff --git a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
index 4d82172c7d8..9e9580e6d75 100644
--- a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
+++ b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
@@ -18,13 +18,8 @@ package org.apache.kafka.server.config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.record.Records;
-import org.apache.kafka.storage.internals.log.LogConfig;
-
-import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
-import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
@@ -54,26 +49,6 @@ public class KRaftConfigs {
public static final String NODE_ID_DOC = "The node ID associated with the roles this process is playing when process.roles is non-empty. " +
"This is required configuration when running in KRaft mode.";
- public static final String METADATA_LOG_DIR_CONFIG = "metadata.log.dir";
- public static final String METADATA_LOG_DIR_DOC = "This configuration determines where we put the metadata log. " +
- "If it is not set, the metadata log is placed in the first log directory from log.dirs.";
-
- public static final String METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG = "metadata.log.max.snapshot.interval.ms";
- public static final long METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT = TimeUnit.HOURS.toMillis(1);
- public static final String METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG = "metadata.log.max.record.bytes.between.snapshots";
- public static final int METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES = 20 * 1024 * 1024;
- public static final String METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC = "This is the maximum number of bytes in the log between the latest " +
- "snapshot and the high-watermark needed before generating a new snapshot. The default value is " +
- METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES + ". To generate snapshots based on the time elapsed, see the " +
- METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG + " configuration. The Kafka node will generate a snapshot when " +
- "either the maximum time interval is reached or the maximum bytes limit is reached.";
- public static final String METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC = "This is the maximum number of milliseconds to wait to generate a snapshot " +
- "if there are committed records in the log that are not included in the latest snapshot. A value of zero disables " +
- "time based snapshot generation. The default value is " + METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT + ". To generate " +
- "snapshots based on the number of metadata bytes, see the " + METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG + " " +
- "configuration. The Kafka node will generate a snapshot when either the maximum time interval is reached or the " +
- "maximum bytes limit is reached.";
-
public static final String CONTROLLER_LISTENER_NAMES_CONFIG = "controller.listener.names";
public static final String CONTROLLER_LISTENER_NAMES_DOC = "A comma-separated list of the names of the listeners used by the controller. This is required " +
"when communicating with the controller quorum, the broker will always use the first listener in this list.";
@@ -81,30 +56,6 @@ public class KRaftConfigs {
public static final String SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG = "sasl.mechanism.controller.protocol";
public static final String SASL_MECHANISM_CONTROLLER_PROTOCOL_DOC = "SASL mechanism used for communication with controllers. Default is GSSAPI.";
- public static final String METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG = "metadata.log.segment.min.bytes";
- public static final String METADATA_LOG_SEGMENT_MIN_BYTES_DOC = "Override the minimum size for a single metadata log file. This should be used for testing only.";
-
- public static final String METADATA_LOG_SEGMENT_BYTES_CONFIG = "metadata.log.segment.bytes";
- public static final String METADATA_LOG_SEGMENT_BYTES_DOC = "The maximum size of a single metadata log file.";
-
- public static final String METADATA_LOG_SEGMENT_MILLIS_CONFIG = "metadata.log.segment.ms";
- public static final String METADATA_LOG_SEGMENT_MILLIS_DOC = "The maximum time before a new metadata log file is rolled out (in milliseconds).";
-
- public static final String METADATA_MAX_RETENTION_BYTES_CONFIG = "metadata.max.retention.bytes";
- public static final int METADATA_MAX_RETENTION_BYTES_DEFAULT = 100 * 1024 * 1024;
- public static final String METADATA_MAX_RETENTION_BYTES_DOC = "The maximum combined size of the metadata log and snapshots before deleting old " +
- "snapshots and log files. Since at least one snapshot must exist before any logs can be deleted, this is a soft limit.";
-
- public static final String METADATA_MAX_RETENTION_MILLIS_CONFIG = "metadata.max.retention.ms";
- public static final String METADATA_MAX_RETENTION_MILLIS_DOC = "The number of milliseconds to keep a metadata log file or snapshot before " +
- "deleting it. Since at least one snapshot must exist before any logs can be deleted, this is a soft limit.";
-
- public static final String METADATA_MAX_IDLE_INTERVAL_MS_CONFIG = "metadata.max.idle.interval.ms";
- public static final int METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT = 500;
- public static final String METADATA_MAX_IDLE_INTERVAL_MS_DOC = "This configuration controls how often the active " +
- "controller should write no-op records to the metadata partition. If the value is 0, no-op records " +
- "are not appended to the metadata partition. The default value is " + METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT;
-
public static final String SERVER_MAX_STARTUP_TIME_MS_CONFIG = "server.max.startup.time.ms";
public static final long SERVER_MAX_STARTUP_TIME_MS_DEFAULT = Long.MAX_VALUE;
public static final String SERVER_MAX_STARTUP_TIME_MS_DOC = "The maximum number of milliseconds we will wait for the server to come up. " +
@@ -119,8 +70,6 @@ public class KRaftConfigs {
public static final String CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DOC = "We will log an error message about controller events that take longer than this threshold.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC)
- .define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC)
.define(PROCESS_ROLES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.in("broker", "controller"), HIGH, PROCESS_ROLES_DOC)
.define(NODE_ID_CONFIG, INT, ConfigDef.NO_DEFAULT_VALUE, atLeast(0), HIGH, NODE_ID_DOC)
.define(INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG, INT, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DEFAULT, null, MEDIUM, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DOC)
@@ -128,13 +77,6 @@ public class KRaftConfigs {
.define(BROKER_SESSION_TIMEOUT_MS_CONFIG, INT, BROKER_SESSION_TIMEOUT_MS_DEFAULT, null, MEDIUM, BROKER_SESSION_TIMEOUT_MS_DOC)
.define(CONTROLLER_LISTENER_NAMES_CONFIG, STRING, null, null, HIGH, CONTROLLER_LISTENER_NAMES_DOC)
.define(SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH, SASL_MECHANISM_CONTROLLER_PROTOCOL_DOC)
- .define(METADATA_LOG_DIR_CONFIG, STRING, null, null, HIGH, METADATA_LOG_DIR_DOC)
- .define(METADATA_LOG_SEGMENT_BYTES_CONFIG, INT, LogConfig.DEFAULT_SEGMENT_BYTES, atLeast(Records.LOG_OVERHEAD), HIGH, METADATA_LOG_SEGMENT_BYTES_DOC)
- .defineInternal(METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, INT, 8 * 1024 * 1024, atLeast(Records.LOG_OVERHEAD), HIGH, METADATA_LOG_SEGMENT_MIN_BYTES_DOC)
- .define(METADATA_LOG_SEGMENT_MILLIS_CONFIG, LONG, LogConfig.DEFAULT_SEGMENT_MS, null, HIGH, METADATA_LOG_SEGMENT_MILLIS_DOC)
- .define(METADATA_MAX_RETENTION_BYTES_CONFIG, LONG, METADATA_MAX_RETENTION_BYTES_DEFAULT, null, HIGH, METADATA_MAX_RETENTION_BYTES_DOC)
- .define(METADATA_MAX_RETENTION_MILLIS_CONFIG, LONG, LogConfig.DEFAULT_RETENTION_MS, null, HIGH, METADATA_MAX_RETENTION_MILLIS_DOC)
- .define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, METADATA_MAX_IDLE_INTERVAL_MS_DOC)
.defineInternal(CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS, LONG, CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS_DEFAULT, atLeast(100), MEDIUM, CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS_DOC)
.defineInternal(CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS, LONG, CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DEFAULT, atLeast(0), MEDIUM, CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DOC)
.defineInternal(SERVER_MAX_STARTUP_TIME_MS_CONFIG, LONG, SERVER_MAX_STARTUP_TIME_MS_DEFAULT, atLeast(0), MEDIUM, SERVER_MAX_STARTUP_TIME_MS_DOC);
diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 36acb6bc8a5..b026995ec70 100644
--- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -40,6 +40,7 @@ import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.metadata.storage.Formatter;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.raft.DynamicVoters;
+import org.apache.kafka.raft.MetadataLogConfig;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.KRaftVersion;
@@ -140,11 +141,11 @@ public class KafkaClusterTestKit implements AutoCloseable {
Integer.toString(node.id()));
// In combined mode, always prefer the metadata log directory of the controller node.
if (controllerNode != null) {
- props.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG,
+ props.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG,
controllerNode.metadataDirectory());
setSecurityProtocolProps(props, controllerSecurityProtocol);
} else {
- props.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG,
+ props.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG,
node.metadataDirectory());
}
if (brokerNode != null) {
diff --git a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
index dba7951aa4c..a286524af4d 100644
--- a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.MetadataLogConfig;
import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.util.CommandLineUtils;
@@ -359,8 +360,8 @@ public class MetadataQuorumCommand {
}
static String getMetadataDirectory(Properties props) throws TerseException {
- if (props.containsKey(KRaftConfigs.METADATA_LOG_DIR_CONFIG)) {
- return props.getProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG);
+ if (props.containsKey(MetadataLogConfig.METADATA_LOG_DIR_CONFIG)) {
+ return props.getProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG);
}
if (props.containsKey(ServerLogConfigs.LOG_DIRS_CONFIG)) {
String[] logDirs = props.getProperty(ServerLogConfigs.LOG_DIRS_CONFIG).trim().split(",");
@@ -368,7 +369,7 @@ public class MetadataQuorumCommand {
return logDirs[0];
}
}
- throw new TerseException("Neither " + KRaftConfigs.METADATA_LOG_DIR_CONFIG + " nor " +
+ throw new TerseException("Neither " + MetadataLogConfig.METADATA_LOG_DIR_CONFIG + " nor " +
ServerLogConfigs.LOG_DIRS_CONFIG + " were found. Is this a valid controller " +
"configuration file?");
}