MINOR: Move logDirs config out of KafkaConfig (#19579)

Follow-up to https://github.com/apache/kafka/pull/19460/files#r2062664349

Reviewers: Ismael Juma <ismael@juma.me.uk>, PoAn Yang <payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>, Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Jhen-Yung Hsu 2025-05-17 00:52:20 +08:00 committed by GitHub
parent 391b604c97
commit ced56a320b
30 changed files with 83 additions and 84 deletions
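In short, KafkaConfig.logDirs no longer returns a Scala Seq[String]; the accessor now lives in AbstractKafkaConfig and returns a java.util.List[String], so Scala call sites move from Seq operations (head, map, foreach, nonEmpty) to Java List/Stream operations or convert explicitly with asScala. Below is a minimal sketch of the resulting call-site pattern, assuming a KafkaConfig instance named config; the object and method names are illustrative, not part of the patch.

import java.io.File
import scala.jdk.CollectionConverters._
import kafka.server.KafkaConfig

object LogDirsMigrationSketch {
  // logDirs is now a java.util.List[String] defined in AbstractKafkaConfig.
  def summarizeLogDirs(config: KafkaConfig): Unit = {
    // was: require(config.logDirs.nonEmpty, ...)
    require(config.logDirs.size > 0, "At least one log directory must be defined via log.dirs or log.dir.")
    val firstDir = config.logDirs.get(0)                                    // was: config.logDirs.head
    val dirFiles = config.logDirs.asScala.map(new File(_).getAbsoluteFile)  // was: config.logDirs.map(new File(_).getAbsoluteFile)
    config.logDirs.forEach(dir => println(s"log dir: $dir"))                // was: config.logDirs.foreach(...)
    println(s"first dir: $firstDir, ${dirFiles.size} dirs total")
  }
}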


@@ -21,7 +21,6 @@
<import-control pkg="org.apache.kafka">
<allow pkg="java"/>
<allow pkg="org.junit"/>
<allow pkg="scala" />
<!-- These are tests, allow whatever -->
<allow pkg="org.apache.kafka"/>


@@ -44,13 +44,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import scala.jdk.javaapi.CollectionConverters;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -242,9 +238,8 @@ public class ListOffsetsIntegrationTest {
// case 2: test the offsets from recovery path.
// server will rebuild offset index according to log files if the index files are nonexistent
Set<String> indexFiles = clusterInstance.brokers().values().stream().flatMap(broker ->
CollectionConverters.asJava(broker.config().logDirs()).stream()
).collect(Collectors.toUnmodifiableSet());
List<String> indexFiles = clusterInstance.brokers().values().stream().flatMap(broker ->
broker.config().logDirs().stream()).toList();
clusterInstance.brokers().values().forEach(KafkaBroker::shutdown);
indexFiles.forEach(root -> {
File[] files = new File(String.format("%s/%s-0", root, topic)).listFiles();


@@ -1536,7 +1536,7 @@ object LogManager {
val cleanerConfig = new CleanerConfig(config)
val transactionLogConfig = new TransactionLogConfig(config)
new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
new LogManager(logDirs = config.logDirs.asScala.map(new File(_).getAbsoluteFile),
initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
configRepository = configRepository,
initialDefaultConfig = defaultLogConfig,


@@ -79,6 +79,7 @@ object KafkaRaftManager {
private def hasDifferentLogDir(config: KafkaConfig): Boolean = {
!config
.logDirs
.asScala
.map(Paths.get(_).toAbsolutePath)
.contains(Paths.get(config.metadataLogDir).toAbsolutePath)
}


@@ -711,7 +711,7 @@ class BrokerServer(
None
}
val rlm = new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
val rlm = new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.get(0), clusterId, time,
(tp: TopicPartition) => logManager.getLog(tp).toJava,
(tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
logManager.getLog(tp).foreach { log =>


@@ -2228,12 +2228,12 @@ class KafkaApis(val requestChannel: RequestChannel,
(replicaManager.describeLogDirs(partitions), Errors.NONE)
} else {
(List.empty[DescribeLogDirsResponseData.DescribeLogDirsResult], Errors.CLUSTER_AUTHORIZATION_FAILED)
(util.Collections.emptyList[DescribeLogDirsResponseData.DescribeLogDirsResult], Errors.CLUSTER_AUTHORIZATION_FAILED)
}
}
requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => new DescribeLogDirsResponse(new DescribeLogDirsResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setResults(logDirInfos.asJava)
.setResults(logDirInfos)
.setErrorCode(error.code)))
}


@@ -248,7 +248,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
def metadataLogDir: String = {
Option(getString(MetadataLogConfig.METADATA_LOG_DIR_CONFIG)) match {
case Some(dir) => dir
case None => logDirs.head
case None => logDirs.get(0)
}
}
@@ -322,7 +322,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
/** ********* Log Configuration ***********/
val autoCreateTopicsEnable = getBoolean(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG)
val numPartitions = getInt(ServerLogConfigs.NUM_PARTITIONS_CONFIG)
val logDirs: Seq[String] = Csv.parseCsvList(Option(getString(ServerLogConfigs.LOG_DIRS_CONFIG)).getOrElse(getString(ServerLogConfigs.LOG_DIR_CONFIG))).asScala
def logSegmentBytes = getInt(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG)
def logFlushIntervalMessages = getLong(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG)
def logCleanerThreads = getInt(CleanerConfig.LOG_CLEANER_THREADS_PROP)
@@ -587,7 +586,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, greater than or equal to 1")
require(logDirs.nonEmpty, "At least one log directory must be defined via log.dirs or log.dir.")
require(logDirs.size > 0, "At least one log directory must be defined via log.dirs or log.dir.")
require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
" to prevent unnecessary socket timeouts")


@@ -134,7 +134,7 @@ object KafkaRaftServer {
// Load and verify the original ensemble.
val loader = new MetaPropertiesEnsemble.Loader()
loader.addMetadataLogDir(config.metadataLogDir)
.addLogDirs(config.logDirs.asJava)
.addLogDirs(config.logDirs)
val initialMetaPropsEnsemble = loader.load()
val verificationFlags = util.EnumSet.of(REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR)
initialMetaPropsEnsemble.verify(Optional.empty(), OptionalInt.of(config.nodeId), verificationFlags)


@@ -1275,10 +1275,10 @@ class ReplicaManager(val config: KafkaConfig,
* 2) size and lag of current and future logs for each partition in the given log directory. Only logs of the queried partitions
* are included. There may be future logs (which will replace the current logs of the partition in the future) on the broker after KIP-113 is implemented.
*/
def describeLogDirs(partitions: Set[TopicPartition]): List[DescribeLogDirsResponseData.DescribeLogDirsResult] = {
def describeLogDirs(partitions: Set[TopicPartition]): util.List[DescribeLogDirsResponseData.DescribeLogDirsResult] = {
val logsByDir = logManager.allLogs.groupBy(log => log.parentDir)
config.logDirs.toSet.map { logDir: String =>
config.logDirs.stream().distinct().map(logDir => {
val file = Paths.get(logDir)
val absolutePath = file.toAbsolutePath.toString
try {
@@ -1326,7 +1326,7 @@ class ReplicaManager(val config: KafkaConfig,
.setLogDir(absolutePath)
.setErrorCode(Errors.forException(t).code)
}
}.toList
}).toList()
}
// See: https://bugs.openjdk.java.net/browse/JDK-8162520


@@ -376,7 +376,7 @@ object StorageTool extends Logging {
def configToLogDirectories(config: KafkaConfig): Seq[String] = {
val directories = new mutable.TreeSet[String]
directories ++= config.logDirs
directories ++= config.logDirs.asScala
Option(config.metadataLogDir).foreach(directories.add)
directories.toSeq
}


@@ -75,9 +75,9 @@ object CoreUtils {
/**
* Recursively delete the list of files/directories and any subfiles (if any exist)
* @param files sequence of files to be deleted
* @param files list of files to be deleted
*/
def delete(files: Seq[String]): Unit = files.foreach(f => Utils.delete(new File(f)))
def delete(files: java.util.List[String]): Unit = files.forEach(f => Utils.delete(new File(f)))
/**
* Register the given mbean with the platform mbean server,


@@ -894,11 +894,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// Generate two mutually exclusive replicaAssignment
val firstReplicaAssignment = brokers.map { server =>
val logDir = new File(server.config.logDirs(randomNums(server))).getAbsolutePath
val logDir = new File(server.config.logDirs.get(randomNums(server))).getAbsolutePath
new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir
}.toMap
val secondReplicaAssignment = brokers.map { server =>
val logDir = new File(server.config.logDirs(1 - randomNums(server))).getAbsolutePath
val logDir = new File(server.config.logDirs.get(1 - randomNums(server))).getAbsolutePath
new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir
}.toMap
@@ -1520,7 +1520,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
// we will create another dir just for one server
val futureLogDir = brokers(0).config.logDirs(1)
val futureLogDir = brokers(0).config.logDirs.get(1)
val futureReplica = new TopicPartitionReplica(topic, 0, brokers(0).config.brokerId)
// Verify that replica can be moved to the specified log directory
@@ -3570,7 +3570,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertFutureThrows(classOf[InvalidTopicException], results.get(invalidTopicName))
assertFutureThrows(classOf[InvalidTopicException],
client.alterReplicaLogDirs(
Map(new TopicPartitionReplica(longTopicName, 0, 0) -> brokers(0).config.logDirs(0)).asJava).all())
util.Map.of(new TopicPartitionReplica(longTopicName, 0, 0), brokers(0).config.logDirs.get(0))).all())
client.close()
}


@@ -1551,7 +1551,7 @@ class KRaftClusterTest {
// Copy foo-0 to targetParentDir
// This is so that we can rename the main replica to a future down below
val parentDir = log.parentDir
val targetParentDir = broker0.config.logDirs.filter(_ != parentDir).head
val targetParentDir = broker0.config.logDirs.stream().filter(l => !l.equals(parentDir)).findFirst().get()
val targetDirFile = new File(targetParentDir, log.dir.getName)
targetDirFile.mkdir()
copyDirectory(log.dir.toString(), targetDirFile.toString())


@@ -81,7 +81,7 @@ class KRaftQuorumImplementation(
): KafkaBroker = {
val metaPropertiesEnsemble = {
val loader = new MetaPropertiesEnsemble.Loader()
loader.addLogDirs(config.logDirs.asJava)
loader.addLogDirs(config.logDirs)
loader.addMetadataLogDir(config.metadataLogDir)
val ensemble = loader.load()
val copier = new MetaPropertiesEnsemble.Copier(ensemble)


@@ -62,7 +62,7 @@ class LocalLeaderEndPointTest extends Logging {
def setUp(): Unit = {
val props = TestUtils.createBrokerConfig(sourceBroker.id, port = sourceBroker.port)
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
val alterPartitionManager = mock(classOf[AlterPartitionManager])
val metrics = new Metrics
quotaManager = QuotaFactory.instantiate(config, metrics, time, "", "")


@@ -56,7 +56,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
val partitionNum = 5
// Alter replica dir before topic creation
val logDir1 = new File(brokers.head.config.logDirs(Random.nextInt(logDirCount))).getAbsolutePath
val logDir1 = new File(brokers.head.config.logDirs.get(Random.nextInt(logDirCount))).getAbsolutePath
val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
@@ -73,7 +73,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
}
// Alter replica dir again after topic creation
val logDir2 = new File(brokers.head.config.logDirs(Random.nextInt(logDirCount))).getAbsolutePath
val logDir2 = new File(brokers.head.config.logDirs.get(Random.nextInt(logDirCount))).getAbsolutePath
val partitionDirs2 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir2).toMap
val alterReplicaLogDirsResponse2 = sendAlterReplicaLogDirsRequest(partitionDirs2)
// The response should succeed for all partitions
@@ -88,10 +88,10 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
@Test
def testAlterReplicaLogDirsRequestErrorCode(): Unit = {
val offlineDir = new File(brokers.head.config.logDirs.tail.head).getAbsolutePath
val validDir1 = new File(brokers.head.config.logDirs(1)).getAbsolutePath
val validDir2 = new File(brokers.head.config.logDirs(2)).getAbsolutePath
val validDir3 = new File(brokers.head.config.logDirs(3)).getAbsolutePath
val offlineDir = new File(brokers.head.config.logDirs.get(1)).getAbsolutePath
val validDir1 = new File(brokers.head.config.logDirs.get(1)).getAbsolutePath
val validDir2 = new File(brokers.head.config.logDirs.get(2)).getAbsolutePath
val validDir3 = new File(brokers.head.config.logDirs.get(3)).getAbsolutePath
// Test AlterReplicaDirRequest before topic creation
val partitionDirs1 = mutable.Map.empty[TopicPartition, String]
@@ -129,7 +129,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
val partitionNum = 1
// Alter replica dir before topic creation
val logDir1 = new File(brokers.head.config.logDirs(1)).getAbsolutePath
val logDir1 = new File(brokers.head.config.logDirs.get(1)).getAbsolutePath
val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
@@ -162,7 +162,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
}, "timed out waiting for log segment to retention")
// Alter replica dir again after topic creation
val logDir2 = new File(brokers.head.config.logDirs(2)).getAbsolutePath
val logDir2 = new File(brokers.head.config.logDirs.get(2)).getAbsolutePath
val alterReplicaLogDirsResponse2 = sendAlterReplicaLogDirsRequest(Map(tp -> logDir2))
// The response should succeed for all partitions
assertEquals(Errors.NONE, findErrorForPartition(alterReplicaLogDirsResponse2, tp))


@@ -40,8 +40,8 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
@Test
def testDescribeLogDirsRequest(): Unit = {
val onlineDir = new File(brokers.head.config.logDirs.head).getAbsolutePath
val offlineDir = new File(brokers.head.config.logDirs.tail.head).getAbsolutePath
val onlineDir = new File(brokers.head.config.logDirs.get(0)).getAbsolutePath
val offlineDir = new File(brokers.head.config.logDirs.get(1)).getAbsolutePath
brokers.head.replicaManager.handleLogDirFailure(offlineDir)
createTopic(topic, partitionNum, 1)
TestUtils.generateAndProduceMessages(brokers, topic, 10)


@@ -34,6 +34,8 @@ import org.apache.kafka.storage.internals.log.{CleanerConfig, LogDirFailureChann
import java.util.Optional
import scala.jdk.CollectionConverters._
class HighwatermarkPersistenceTest {
val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps)
@@ -41,7 +43,7 @@ class HighwatermarkPersistenceTest {
val configRepository = new MockConfigRepository()
val logManagers = configs map { config =>
TestUtils.createLogManager(
logDirs = config.logDirs.map(new File(_)),
logDirs = config.logDirs.asScala.map(new File(_)),
cleanerConfig = new CleanerConfig(true))
}
@@ -195,7 +197,7 @@ class HighwatermarkPersistenceTest {
}
private def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = {
replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath).read().getOrDefault(
replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.get(0)).getAbsolutePath).read().getOrDefault(
new TopicPartition(topic, partition), 0L)
}
}


@@ -1164,7 +1164,7 @@ class KafkaConfigTest {
assertEquals(1, config.brokerId)
assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedBrokerListeners.map(JTestUtils.endpointToString))
assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides)
assertEquals(List("/tmp1", "/tmp2"), config.logDirs)
assertEquals(util.List.of("/tmp1", "/tmp2"), config.logDirs)
assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)
assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis)
assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis)
@@ -1499,7 +1499,7 @@ class KafkaConfigTest {
val config = KafkaConfig.fromProps(props)
assertEquals(metadataDir, config.metadataLogDir)
assertEquals(Seq(dataDir), config.logDirs)
assertEquals(util.List.of(dataDir), config.logDirs)
}
@Test
@@ -1517,7 +1517,7 @@ class KafkaConfigTest {
val config = KafkaConfig.fromProps(props)
assertEquals(dataDir1, config.metadataLogDir)
assertEquals(Seq(dataDir1, dataDir2), config.logDirs)
assertEquals(util.List.of(dataDir1, dataDir2), config.logDirs)
}
@Test


@@ -60,8 +60,8 @@ class LogRecoveryTest extends QuorumTestHarness {
var admin: Admin = _
var producer: KafkaProducer[Integer, String] = _
def hwFile1 = new OffsetCheckpointFile(new File(configProps1.logDirs.head, ReplicaManager.HighWatermarkFilename), null)
def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.head, ReplicaManager.HighWatermarkFilename), null)
def hwFile1 = new OffsetCheckpointFile(new File(configProps1.logDirs.get(0), ReplicaManager.HighWatermarkFilename), null)
def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.get(0), ReplicaManager.HighWatermarkFilename), null)
var servers = Seq.empty[KafkaBroker]
// Some tests restart the brokers then produce more data. But since test brokers use random ports, we need


@@ -187,7 +187,7 @@ class ReplicaManagerTest {
@Test
def testHighWaterMarkDirectoryMapping(): Unit = {
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
val rm = new ReplicaManager(
metrics = metrics,
config = config,
@@ -203,8 +203,8 @@ class ReplicaManagerTest {
partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
rm.checkpointHighWatermarks()
config.logDirs.map(s => Paths.get(s, ReplicaManager.HighWatermarkFilename))
.foreach(checkpointFile => assertTrue(Files.exists(checkpointFile),
config.logDirs.stream().map(s => Paths.get(s, ReplicaManager.HighWatermarkFilename))
.forEach(checkpointFile => assertTrue(Files.exists(checkpointFile),
s"checkpoint file does not exist at $checkpointFile"))
} finally {
rm.shutdown(checkpointHW = false)
@@ -216,7 +216,7 @@ class ReplicaManagerTest {
val props = TestUtils.createBrokerConfig(1)
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
val rm = new ReplicaManager(
metrics = metrics,
config = config,
@@ -232,8 +232,8 @@ class ReplicaManagerTest {
partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
rm.checkpointHighWatermarks()
config.logDirs.map(s => Paths.get(s, ReplicaManager.HighWatermarkFilename))
.foreach(checkpointFile => assertTrue(Files.exists(checkpointFile),
config.logDirs.stream().map(s => Paths.get(s, ReplicaManager.HighWatermarkFilename))
.forEach(checkpointFile => assertTrue(Files.exists(checkpointFile),
s"checkpoint file does not exist at $checkpointFile"))
} finally {
rm.shutdown(checkpointHW = false)
@@ -242,7 +242,7 @@ class ReplicaManagerTest {
@Test
def testIllegalRequiredAcks(): Unit = {
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
val rm = new ReplicaManager(
metrics = metrics,
config = config,
@@ -293,7 +293,7 @@ class ReplicaManagerTest {
val props = TestUtils.createBrokerConfig(0)
props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
val config = KafkaConfig.fromProps(props)
val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
val logManager = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(new Properties()))
mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
when(metadataCache.metadataVersion()).thenReturn(MetadataVersion.MINIMUM_VERSION)
val rm = new ReplicaManager(
@@ -353,7 +353,7 @@ class ReplicaManagerTest {
val props = TestUtils.createBrokerConfig(0)
props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
val config = KafkaConfig.fromProps(props)
val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
val logManager = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(new Properties()))
val spyLogManager = spy(logManager)
val metadataCache: MetadataCache = mock(classOf[MetadataCache])
mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
@@ -425,7 +425,7 @@ class ReplicaManagerTest {
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
val config = KafkaConfig.fromProps(props)
val logProps = new Properties()
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(logProps))
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(logProps))
val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1))
mockGetAliveBrokerFunctions(metadataCache, aliveBrokers)
when(metadataCache.metadataVersion()).thenReturn(MetadataVersion.MINIMUM_VERSION)
@@ -2783,7 +2783,7 @@ class ReplicaManagerTest {
props.asScala ++= extraProps.asScala
val config = KafkaConfig.fromProps(props)
val logConfig = new LogConfig(new Properties)
val logDir = new File(new File(config.logDirs.head), s"$topic-$topicPartition")
val logDir = new File(new File(config.logDirs.get(0)), s"$topic-$topicPartition")
Files.createDirectories(logDir.toPath)
val mockScheduler = new MockScheduler(time)
val mockBrokerTopicStats = new BrokerTopicStats
@@ -2844,7 +2844,7 @@ class ReplicaManagerTest {
// Expect to call LogManager.truncateTo exactly once
val topicPartitionObj = new TopicPartition(topic, topicPartition)
val mockLogMgr: LogManager = mock(classOf[LogManager])
when(mockLogMgr.liveLogDirs).thenReturn(config.logDirs.map(new File(_).getAbsoluteFile))
when(mockLogMgr.liveLogDirs).thenReturn(config.logDirs.asScala.map(new File(_).getAbsoluteFile))
when(mockLogMgr.getOrCreateLog(ArgumentMatchers.eq(topicPartitionObj), ArgumentMatchers.eq(false), ArgumentMatchers.eq(false), any(), any())).thenReturn(mockLog)
when(mockLogMgr.getLog(topicPartitionObj, isFuture = false)).thenReturn(Some(mockLog))
when(mockLogMgr.getLog(topicPartitionObj, isFuture = true)).thenReturn(None)
@@ -3218,7 +3218,7 @@ class ReplicaManagerTest {
transactionalTopicPartitions: List[TopicPartition],
config: KafkaConfig = config,
scheduler: Scheduler = new MockScheduler(time)): ReplicaManager = {
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
val replicaManager = new ReplicaManager(
metrics = metrics,
config = config,
@@ -3269,7 +3269,7 @@ class ReplicaManagerTest {
val mockLog = setupMockLog(path1)
if (setupLogDirMetaProperties) {
// add meta.properties file in each dir
config.logDirs.foreach(dir => {
config.logDirs.stream().forEach(dir => {
val metaProps = new MetaProperties.Builder().
setVersion(MetaPropertiesVersion.V0).
setClusterId("clusterId").
@@ -3280,7 +3280,7 @@ class ReplicaManagerTest {
new File(new File(dir), MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath, false)
})
}
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(logProps), log = if (shouldMockLog) Some(mockLog) else None, remoteStorageSystemEnable = enableRemoteStorage)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(logProps), log = if (shouldMockLog) Some(mockLog) else None, remoteStorageSystemEnable = enableRemoteStorage)
val logConfig = new LogConfig(logProps)
when(mockLog.config).thenReturn(logConfig)
when(mockLog.remoteLogEnabled()).thenReturn(enableRemoteStorage)
@@ -3617,8 +3617,8 @@ class ReplicaManagerTest {
val config0 = KafkaConfig.fromProps(props0)
val config1 = KafkaConfig.fromProps(props1)
val mockLogMgr0 = TestUtils.createLogManager(config0.logDirs.map(new File(_)))
val mockLogMgr1 = TestUtils.createLogManager(config1.logDirs.map(new File(_)))
val mockLogMgr0 = TestUtils.createLogManager(config0.logDirs.asScala.map(new File(_)))
val mockLogMgr1 = TestUtils.createLogManager(config1.logDirs.asScala.map(new File(_)))
val metadataCache0: MetadataCache = mock(classOf[MetadataCache])
val metadataCache1: MetadataCache = mock(classOf[MetadataCache])
@@ -4212,7 +4212,7 @@ class ReplicaManagerTest {
def createReplicaManager(): ReplicaManager = {
val props = TestUtils.createBrokerConfig(1)
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
new ReplicaManager(
metrics = metrics,
config = config,
@@ -5806,7 +5806,7 @@ class ReplicaManagerTest {
try {
val responses = replicaManager.describeLogDirs(Set(new TopicPartition(topic, topicPartition)))
assertEquals(mockLogMgr.liveLogDirs.size, responses.size)
responses.foreach { response =>
responses.forEach { response =>
assertEquals(Errors.NONE.code, response.errorCode)
assertTrue(response.totalBytes > 0)
assertTrue(response.usableBytes >= 0)
@@ -5838,7 +5838,7 @@ class ReplicaManagerTest {
try {
val responses = replicaManager.describeLogDirs(Set(new TopicPartition(noneTopic, topicPartition)))
assertEquals(mockLogMgr.liveLogDirs.size, responses.size)
responses.foreach { response =>
responses.forEach { response =>
assertEquals(Errors.NONE.code, response.errorCode)
assertTrue(response.totalBytes > 0)
assertTrue(response.usableBytes >= 0)
@@ -5851,7 +5851,7 @@ class ReplicaManagerTest {
@Test
def testCheckpointHwOnShutdown(): Unit = {
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
val spyRm = spy(new ReplicaManager(
metrics = metrics,
config = config,
@@ -6085,7 +6085,7 @@ class ReplicaManagerTest {
@Test
def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
val rm = new ReplicaManager(
metrics = metrics,
config = config,


@@ -104,7 +104,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
// do a clean shutdown and check that offset checkpoint file exists
shutdownBroker()
for (logDir <- config.logDirs) {
for (logDir <- config.logDirs.asScala) {
val OffsetCheckpointFile = new File(logDir, LogManager.RECOVERY_POINT_CHECKPOINT_FILE)
assertTrue(OffsetCheckpointFile.exists)
assertTrue(OffsetCheckpointFile.length() > 0)
@@ -146,7 +146,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
def testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(): Unit = {
createTopic(topic)
shutdownBroker()
config.logDirs.foreach { dirName =>
config.logDirs.forEach { dirName =>
val partitionDir = new File(dirName, s"$topic-0")
partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1))
}


@@ -1089,14 +1089,14 @@ object TestUtils extends Logging {
checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.containsKey(tp))
}), "Cleaner offset for deleted partition should have been removed")
waitUntilTrue(() => brokers.forall(broker =>
broker.config.logDirs.forall { logDir =>
broker.config.logDirs.stream().allMatch { logDir =>
topicPartitions.forall { tp =>
!new File(logDir, tp.topic + "-" + tp.partition).exists()
}
}
), "Failed to soft-delete the data to a delete directory")
waitUntilTrue(() => brokers.forall(broker =>
broker.config.logDirs.forall { logDir =>
broker.config.logDirs.stream().allMatch { logDir =>
topicPartitions.forall { tp =>
!util.Arrays.asList(new File(logDir).list()).asScala.exists { partitionDirectoryNames =>
partitionDirectoryNames.exists { directoryName =>


@@ -93,7 +93,6 @@ import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import scala.Option;
import scala.jdk.javaapi.CollectionConverters;
@@ -129,7 +128,7 @@ public class ReplicaFetcherThreadBenchmark {
BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
LogDirFailureChannel logDirFailureChannel = new LogDirFailureChannel(config.logDirs().size());
List<File> logDirs = CollectionConverters.asJava(config.logDirs()).stream().map(File::new).collect(Collectors.toList());
List<File> logDirs = config.logDirs().stream().map(File::new).toList();
logManager = new LogManagerBuilder().
setLogDirs(logDirs).
setInitialOfflineDirs(Collections.emptyList()).


@@ -61,7 +61,6 @@ import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import scala.Option;
import scala.jdk.javaapi.CollectionConverters;
@@ -108,8 +107,7 @@ public class CheckpointBench {
this.metrics = new Metrics();
this.time = new MockTime();
this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
final List<File> files =
CollectionConverters.asJava(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
final List<File> files = brokerProperties.logDirs().stream().map(File::new).toList();
this.logManager = TestUtils.createLogManager(CollectionConverters.asScala(files),
new LogConfig(new Properties()), new MockConfigRepository(), new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d,
1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, 4, false, Option.empty(), false, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT);


@@ -65,7 +65,6 @@ import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import scala.Option;
import scala.jdk.javaapi.CollectionConverters;
@@ -115,8 +114,7 @@ public class PartitionCreationBench {
this.time = Time.SYSTEM;
this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
final List<File> files =
CollectionConverters.asJava(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
final List<File> files = brokerProperties.logDirs().stream().map(File::new).toList();
CleanerConfig cleanerConfig = new CleanerConfig(1,
4 * 1024 * 1024L, 0.9d,
1024 * 1024, 32 * 1024 * 1024,


@@ -31,11 +31,13 @@ import org.apache.kafka.raft.MetadataLogConfig;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.metrics.MetricConfigs;
import org.apache.kafka.server.util.Csv;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.storage.internals.log.LogConfig;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* During moving {@link kafka.server.KafkaConfig} out of core AbstractKafkaConfig will be the future KafkaConfig
@@ -70,6 +72,10 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
super(definition, originals, configProviderProps, doLog);
}
public List<String> logDirs() {
return Csv.parseCsvList(Optional.ofNullable(getString(ServerLogConfigs.LOG_DIRS_CONFIG)).orElse(getString(ServerLogConfigs.LOG_DIR_CONFIG)));
}
public int numIoThreads() {
return getInt(ServerConfigs.NUM_IO_THREADS_CONFIG);
}


@@ -42,6 +42,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import scala.collection.Seq;
@@ -154,7 +155,7 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
public static List<BrokerLocalStorage> localStorages(Seq<KafkaBroker> brokers) {
return CollectionConverters.asJava(brokers).stream()
.map(b -> new BrokerLocalStorage(b.config().brokerId(), CollectionConverters.asJava(b.config().logDirs().toSet()),
.map(b -> new BrokerLocalStorage(b.config().brokerId(), Set.copyOf(b.config().logDirs()),
STORAGE_WAIT_TIMEOUT_SEC))
.collect(Collectors.toList());
}


@@ -33,6 +33,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import scala.jdk.javaapi.CollectionConverters;
@@ -75,7 +76,7 @@ public class TransactionsWithTieredStoreTest extends TransactionsTest {
public void maybeWaitForAtLeastOneSegmentUpload(scala.collection.Seq<TopicPartition> topicPartitions) {
CollectionConverters.asJava(topicPartitions).forEach(topicPartition -> {
List<BrokerLocalStorage> localStorages = CollectionConverters.asJava(brokers()).stream()
.map(b -> new BrokerLocalStorage(b.config().brokerId(), CollectionConverters.asJava(b.config().logDirs().toSet()), STORAGE_WAIT_TIMEOUT_SEC))
.map(b -> new BrokerLocalStorage(b.config().brokerId(), Set.copyOf(b.config().logDirs()), STORAGE_WAIT_TIMEOUT_SEC))
.toList();
localStorages
.stream()


@@ -352,14 +352,14 @@ public interface ClusterInstance {
// Ensure that the topic directories are soft-deleted
TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
CollectionConverters.asJava(broker.config().logDirs()).stream().allMatch(logDir ->
broker.config().logDirs().stream().allMatch(logDir ->
topicPartitions.stream().noneMatch(tp ->
new File(logDir, tp.topic() + "-" + tp.partition()).exists()))),
"Failed to soft-delete the data to a delete directory");
// Ensure that the topic directories are hard-deleted
TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
CollectionConverters.asJava(broker.config().logDirs()).stream().allMatch(logDir ->
broker.config().logDirs().stream().allMatch(logDir ->
topicPartitions.stream().allMatch(tp ->
Arrays.stream(Objects.requireNonNull(new File(logDir).list())).noneMatch(partitionDirectoryName ->
partitionDirectoryName.startsWith(tp.topic() + "-" + tp.partition()) &&