diff --git a/checkstyle/import-control-clients-integration-tests.xml b/checkstyle/import-control-clients-integration-tests.xml
index 752dcace7f2..255efb43de8 100644
--- a/checkstyle/import-control-clients-integration-tests.xml
+++ b/checkstyle/import-control-clients-integration-tests.xml
@@ -21,7 +21,6 @@
-
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ListOffsetsIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ListOffsetsIntegrationTest.java
index 6b40203c012..8487f0ccda5 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ListOffsetsIntegrationTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ListOffsetsIntegrationTest.java
@@ -44,13 +44,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import scala.jdk.javaapi.CollectionConverters;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -242,9 +238,8 @@ public class ListOffsetsIntegrationTest {
 
         // case 2: test the offsets from recovery path.
         // server will rebuild offset index according to log files if the index files are nonexistent
-        Set<String> indexFiles = clusterInstance.brokers().values().stream().flatMap(broker ->
-                CollectionConverters.asJava(broker.config().logDirs()).stream()
-        ).collect(Collectors.toUnmodifiableSet());
+        List<String> indexFiles = clusterInstance.brokers().values().stream().flatMap(broker ->
+                broker.config().logDirs().stream()).toList();
         clusterInstance.brokers().values().forEach(KafkaBroker::shutdown);
         indexFiles.forEach(root -> {
             File[] files = new File(String.format("%s/%s-0", root, topic)).listFiles();
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 400f0aa7248..d9da339e215 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1536,7 +1536,7 @@ object LogManager {
     val cleanerConfig = new CleanerConfig(config)
     val transactionLogConfig = new TransactionLogConfig(config)
 
-    new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
+    new LogManager(logDirs = config.logDirs.asScala.map(new File(_).getAbsoluteFile),
       initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
       configRepository = configRepository,
       initialDefaultConfig = defaultLogConfig,
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 0727c660fe4..d7775d88dbe 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -79,6 +79,7 @@ object KafkaRaftManager {
   private def hasDifferentLogDir(config: KafkaConfig): Boolean = {
     !config
       .logDirs
+      .asScala
      .map(Paths.get(_).toAbsolutePath)
      .contains(Paths.get(config.metadataLogDir).toAbsolutePath)
   }
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 3a7fd798801..b4be10656e2 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -711,7 +711,7 @@ class BrokerServer(
       None
     }
 
-    val rlm = new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
+    val rlm = new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.get(0), clusterId, time,
       (tp: TopicPartition) => logManager.getLog(tp).toJava,
       (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
         logManager.getLog(tp).foreach { log =>
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ad19902ea79..05ed16abffb 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2228,12 +2228,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         (replicaManager.describeLogDirs(partitions), Errors.NONE)
       } else {
-        (List.empty[DescribeLogDirsResponseData.DescribeLogDirsResult], Errors.CLUSTER_AUTHORIZATION_FAILED)
+        (util.Collections.emptyList[DescribeLogDirsResponseData.DescribeLogDirsResult], Errors.CLUSTER_AUTHORIZATION_FAILED)
       }
     }
     requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => new DescribeLogDirsResponse(new DescribeLogDirsResponseData()
       .setThrottleTimeMs(throttleTimeMs)
-      .setResults(logDirInfos.asJava)
+      .setResults(logDirInfos)
       .setErrorCode(error.code)))
   }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c9ead9cfc64..487e0b1fac4 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -248,7 +248,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
   def metadataLogDir: String = {
     Option(getString(MetadataLogConfig.METADATA_LOG_DIR_CONFIG)) match {
       case Some(dir) => dir
-      case None => logDirs.head
+      case None => logDirs.get(0)
     }
   }
 
@@ -322,7 +322,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
   /** ********* Log Configuration ***********/
   val autoCreateTopicsEnable = getBoolean(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG)
   val numPartitions = getInt(ServerLogConfigs.NUM_PARTITIONS_CONFIG)
-  val logDirs: Seq[String] = Csv.parseCsvList(Option(getString(ServerLogConfigs.LOG_DIRS_CONFIG)).getOrElse(getString(ServerLogConfigs.LOG_DIR_CONFIG))).asScala
   def logSegmentBytes = getInt(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG)
   def logFlushIntervalMessages = getLong(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG)
   def logCleanerThreads = getInt(CleanerConfig.LOG_CLEANER_THREADS_PROP)
@@ -587,7 +586,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
     require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
     require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
     require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, greater than or equal to 1")
-    require(logDirs.nonEmpty, "At least one log directory must be defined via log.dirs or log.dir.")
+    require(logDirs.size > 0, "At least one log directory must be defined via log.dirs or log.dir.")
     require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
     require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
       " to prevent unnecessary socket timeouts")
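With logDirs now exposed to callers as a java.util.List<String> (see the AbstractKafkaConfig hunk later in this diff) instead of a Scala Seq[String], the call-site pattern in the hunks above is mechanical: Java code reads the list directly and drops CollectionConverters, while Scala code either switches to List operations (get(0), stream(), forEach) or adds .asScala where a Scala collection is still wanted. A minimal, self-contained Java sketch of the new access pattern; the class name and directory values are illustrative only:

    import java.io.File;
    import java.util.List;

    public class LogDirsAccessSketch {
        public static void main(String[] args) {
            // Stand-in for config.logDirs(); the values are made up for the example.
            List<String> logDirs = List.of("/tmp/kafka-logs-0", "/tmp/kafka-logs-1");

            String firstDir = logDirs.get(0);      // replaces the Scala-side logDirs.head
            List<File> dirFiles = logDirs.stream() // replaces asJava(logDirs).stream()...collect(...)
                    .map(File::new)
                    .toList();

            System.out.println(firstDir + " -> " + dirFiles);
        }
    }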
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 34ee4a725f2..e3497a6ff88 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -134,7 +134,7 @@ object KafkaRaftServer {
     // Load and verify the original ensemble.
     val loader = new MetaPropertiesEnsemble.Loader()
     loader.addMetadataLogDir(config.metadataLogDir)
-      .addLogDirs(config.logDirs.asJava)
+      .addLogDirs(config.logDirs)
     val initialMetaPropsEnsemble = loader.load()
     val verificationFlags = util.EnumSet.of(REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR)
     initialMetaPropsEnsemble.verify(Optional.empty(), OptionalInt.of(config.nodeId), verificationFlags)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f0dee497939..b0b2e72602a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1275,10 +1275,10 @@ class ReplicaManager(val config: KafkaConfig,
    * 2) size and lag of current and future logs for each partition in the given log directory. Only logs of the queried partitions
    * are included. There may be future logs (which will replace the current logs of the partition in the future) on the broker after KIP-113 is implemented.
    */
-  def describeLogDirs(partitions: Set[TopicPartition]): List[DescribeLogDirsResponseData.DescribeLogDirsResult] = {
+  def describeLogDirs(partitions: Set[TopicPartition]): util.List[DescribeLogDirsResponseData.DescribeLogDirsResult] = {
     val logsByDir = logManager.allLogs.groupBy(log => log.parentDir)
 
-    config.logDirs.toSet.map { logDir: String =>
+    config.logDirs.stream().distinct().map(logDir => {
       val file = Paths.get(logDir)
       val absolutePath = file.toAbsolutePath.toString
       try {
@@ -1326,7 +1326,7 @@ class ReplicaManager(val config: KafkaConfig,
           .setLogDir(absolutePath)
           .setErrorCode(Errors.forException(t).code)
       }
-    }.toList
+    }).toList()
   }
 
   // See: https://bugs.openjdk.java.net/browse/JDK-8162520
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
index 40892bca38c..08a29b3d01d 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -376,7 +376,7 @@ object StorageTool extends Logging {
 
   def configToLogDirectories(config: KafkaConfig): Seq[String] = {
     val directories = new mutable.TreeSet[String]
-    directories ++= config.logDirs
+    directories ++= config.logDirs.asScala
     Option(config.metadataLogDir).foreach(directories.add)
     directories.toSeq
   }
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index d41c796374b..8af81d97a49 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -75,9 +75,9 @@ object CoreUtils {
 
   /**
    * Recursively delete the list of files/directories and any subfiles (if any exist)
-   * @param files sequence of files to be deleted
+   * @param files list of files to be deleted
    */
-  def delete(files: Seq[String]): Unit = files.foreach(f => Utils.delete(new File(f)))
+  def delete(files: java.util.List[String]): Unit = files.forEach(f => Utils.delete(new File(f)))
 
   /**
    * Register the given mbean with the platform mbean server,
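describeLogDirs above now returns a java.util.List of DescribeLogDirsResponseData.DescribeLogDirsResult, one entry per distinct configured log directory. A small sketch of how a caller might walk that list; it assumes the generated message class exposes getters matching the setters used in the hunk (logDir, errorCode) plus the totalBytes/usableBytes fields asserted on in the ReplicaManagerTest changes later in this diff:

    import java.util.List;

    import org.apache.kafka.common.message.DescribeLogDirsResponseData;
    import org.apache.kafka.common.protocol.Errors;

    public class DescribeLogDirsSketch {
        // results would come from replicaManager.describeLogDirs(partitions).
        static void printLogDirSummary(List<DescribeLogDirsResponseData.DescribeLogDirsResult> results) {
            for (DescribeLogDirsResponseData.DescribeLogDirsResult result : results) {
                if (result.errorCode() == Errors.NONE.code()) {
                    System.out.printf("%s: total=%d usable=%d%n",
                        result.logDir(), result.totalBytes(), result.usableBytes());
                } else {
                    System.out.printf("%s: error=%s%n", result.logDir(), Errors.forCode(result.errorCode()));
                }
            }
        }
    }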
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 1fbda492d4f..4159e1d32e2 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -894,11 +894,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 
     // Generate two mutually exclusive replicaAssignment
     val firstReplicaAssignment = brokers.map { server =>
-      val logDir = new File(server.config.logDirs(randomNums(server))).getAbsolutePath
+      val logDir = new File(server.config.logDirs.get(randomNums(server))).getAbsolutePath
       new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir
     }.toMap
     val secondReplicaAssignment = brokers.map { server =>
-      val logDir = new File(server.config.logDirs(1 - randomNums(server))).getAbsolutePath
+      val logDir = new File(server.config.logDirs.get(1 - randomNums(server))).getAbsolutePath
       new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir
     }.toMap
 
@@ -1520,7 +1520,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
 
     // we will create another dir just for one server
-    val futureLogDir = brokers(0).config.logDirs(1)
+    val futureLogDir = brokers(0).config.logDirs.get(1)
     val futureReplica = new TopicPartitionReplica(topic, 0, brokers(0).config.brokerId)
 
     // Verify that replica can be moved to the specified log directory
@@ -3570,7 +3570,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertFutureThrows(classOf[InvalidTopicException], results.get(invalidTopicName))
     assertFutureThrows(classOf[InvalidTopicException], client.alterReplicaLogDirs(
-      Map(new TopicPartitionReplica(longTopicName, 0, 0) -> brokers(0).config.logDirs(0)).asJava).all())
+      util.Map.of(new TopicPartitionReplica(longTopicName, 0, 0), brokers(0).config.logDirs.get(0))).all())
 
     client.close()
   }
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 234518ea83a..988fe9b264c 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1551,7 +1551,7 @@ class KRaftClusterTest {
       // Copy foo-0 to targetParentDir
       // This is so that we can rename the main replica to a future down below
       val parentDir = log.parentDir
-      val targetParentDir = broker0.config.logDirs.filter(_ != parentDir).head
+      val targetParentDir = broker0.config.logDirs.stream().filter(l => !l.equals(parentDir)).findFirst().get()
       val targetDirFile = new File(targetParentDir, log.dir.getName)
       targetDirFile.mkdir()
       copyDirectory(log.dir.toString(), targetDirFile.toString())
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index d36c0ed1b6f..57c2641393a 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -81,7 +81,7 @@ class KRaftQuorumImplementation(
   ): KafkaBroker = {
     val metaPropertiesEnsemble = {
       val loader = new MetaPropertiesEnsemble.Loader()
-      loader.addLogDirs(config.logDirs.asJava)
+      loader.addLogDirs(config.logDirs)
       loader.addMetadataLogDir(config.metadataLogDir)
       val ensemble = loader.load()
       val copier = new MetaPropertiesEnsemble.Copier(ensemble)
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index b516be4d930..335e005d9a5 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -62,7 +62,7 @@ class LocalLeaderEndPointTest extends Logging {
   def setUp(): Unit = {
     val props = TestUtils.createBrokerConfig(sourceBroker.id, port = sourceBroker.port)
     val config = KafkaConfig.fromProps(props)
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
     val alterPartitionManager = mock(classOf[AlterPartitionManager])
     val metrics = new Metrics
     quotaManager = QuotaFactory.instantiate(config, metrics, time, "", "")
diff --git a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
index f5bd92ce15e..b45f3d62f47 100644
--- a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
@@ -56,7 +56,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     val partitionNum = 5
 
     // Alter replica dir before topic creation
-    val logDir1 = new File(brokers.head.config.logDirs(Random.nextInt(logDirCount))).getAbsolutePath
+    val logDir1 = new File(brokers.head.config.logDirs.get(Random.nextInt(logDirCount))).getAbsolutePath
     val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
     val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
 
@@ -73,7 +73,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     }
 
     // Alter replica dir again after topic creation
-    val logDir2 = new File(brokers.head.config.logDirs(Random.nextInt(logDirCount))).getAbsolutePath
+    val logDir2 = new File(brokers.head.config.logDirs.get(Random.nextInt(logDirCount))).getAbsolutePath
     val partitionDirs2 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir2).toMap
     val alterReplicaLogDirsResponse2 = sendAlterReplicaLogDirsRequest(partitionDirs2)
 
     // The response should succeed for all partitions
@@ -88,10 +88,10 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
 
   @Test
   def testAlterReplicaLogDirsRequestErrorCode(): Unit = {
-    val offlineDir = new File(brokers.head.config.logDirs.tail.head).getAbsolutePath
-    val validDir1 = new File(brokers.head.config.logDirs(1)).getAbsolutePath
-    val validDir2 = new File(brokers.head.config.logDirs(2)).getAbsolutePath
-    val validDir3 = new File(brokers.head.config.logDirs(3)).getAbsolutePath
+    val offlineDir = new File(brokers.head.config.logDirs.get(1)).getAbsolutePath
+    val validDir1 = new File(brokers.head.config.logDirs.get(1)).getAbsolutePath
+    val validDir2 = new File(brokers.head.config.logDirs.get(2)).getAbsolutePath
+    val validDir3 = new File(brokers.head.config.logDirs.get(3)).getAbsolutePath
 
     // Test AlterReplicaDirRequest before topic creation
     val partitionDirs1 = mutable.Map.empty[TopicPartition, String]
@@ -129,7 +129,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     val partitionNum = 1
 
     // Alter replica dir before topic creation
-    val logDir1 = new File(brokers.head.config.logDirs(1)).getAbsolutePath
+    val logDir1 = new File(brokers.head.config.logDirs.get(1)).getAbsolutePath
     val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
     val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
 
@@ -162,7 +162,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     }, "timed out waiting for log segment to retention")
 
     // Alter replica dir again after topic creation
-    val logDir2 = new File(brokers.head.config.logDirs(2)).getAbsolutePath
+    val logDir2 = new File(brokers.head.config.logDirs.get(2)).getAbsolutePath
     val alterReplicaLogDirsResponse2 = sendAlterReplicaLogDirsRequest(Map(tp -> logDir2))
 
     // The response should succeed for all partitions
     assertEquals(Errors.NONE, findErrorForPartition(alterReplicaLogDirsResponse2, tp))
diff --git a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
index c7efd608dfd..ab2ea99782d 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
@@ -40,8 +40,8 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
 
   @Test
   def testDescribeLogDirsRequest(): Unit = {
-    val onlineDir = new File(brokers.head.config.logDirs.head).getAbsolutePath
-    val offlineDir = new File(brokers.head.config.logDirs.tail.head).getAbsolutePath
+    val onlineDir = new File(brokers.head.config.logDirs.get(0)).getAbsolutePath
+    val offlineDir = new File(brokers.head.config.logDirs.get(1)).getAbsolutePath
     brokers.head.replicaManager.handleLogDirFailure(offlineDir)
     createTopic(topic, partitionNum, 1)
     TestUtils.generateAndProduceMessages(brokers, topic, 10)
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 378c38e530f..9ea25f76b46 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -34,6 +34,8 @@ import org.apache.kafka.storage.internals.log.{CleanerConfig, LogDirFailureChann
 
 import java.util.Optional
 
+import scala.jdk.CollectionConverters._
+
 class HighwatermarkPersistenceTest {
 
   val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps)
@@ -41,7 +43,7 @@ class HighwatermarkPersistenceTest {
   val configRepository = new MockConfigRepository()
   val logManagers = configs map { config =>
     TestUtils.createLogManager(
-      logDirs = config.logDirs.map(new File(_)),
+      logDirs = config.logDirs.asScala.map(new File(_)),
       cleanerConfig = new CleanerConfig(true))
   }
 
@@ -195,7 +197,7 @@ class HighwatermarkPersistenceTest {
   }
 
   private def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = {
-    replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath).read().getOrDefault(
+    replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.get(0)).getAbsolutePath).read().getOrDefault(
       new TopicPartition(topic, partition), 0L)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 7f78f2ba590..3f6063e6e02 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1164,7 +1164,7 @@ class KafkaConfigTest {
     assertEquals(1, config.brokerId)
     assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedBrokerListeners.map(JTestUtils.endpointToString))
     assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides)
-    assertEquals(List("/tmp1", "/tmp2"), config.logDirs)
+    assertEquals(util.List.of("/tmp1", "/tmp2"), config.logDirs)
     assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)
     assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis)
     assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis)
@@ -1499,7 +1499,7 @@ class KafkaConfigTest {
 
     val config = KafkaConfig.fromProps(props)
     assertEquals(metadataDir, config.metadataLogDir)
-    assertEquals(Seq(dataDir), config.logDirs)
+    assertEquals(util.List.of(dataDir), config.logDirs)
   }
 
   @Test
@@ -1517,7 +1517,7 @@ class KafkaConfigTest {
 
     val config = KafkaConfig.fromProps(props)
     assertEquals(dataDir1, config.metadataLogDir)
-    assertEquals(Seq(dataDir1, dataDir2), config.logDirs)
+    assertEquals(util.List.of(dataDir1, dataDir2), config.logDirs)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index a05cb9f030d..6e2f3073ba1 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -60,8 +60,8 @@ class LogRecoveryTest extends QuorumTestHarness {
   var admin: Admin = _
   var producer: KafkaProducer[Integer, String] = _
 
-  def hwFile1 = new OffsetCheckpointFile(new File(configProps1.logDirs.head, ReplicaManager.HighWatermarkFilename), null)
-  def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.head, ReplicaManager.HighWatermarkFilename), null)
+  def hwFile1 = new OffsetCheckpointFile(new File(configProps1.logDirs.get(0), ReplicaManager.HighWatermarkFilename), null)
+  def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.get(0), ReplicaManager.HighWatermarkFilename), null)
   var servers = Seq.empty[KafkaBroker]
 
   // Some tests restart the brokers then produce more data. But since test brokers use random ports, we need
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 8219b79d49e..d33a68a8348 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -187,7 +187,7 @@ class ReplicaManagerTest {
 
   @Test
   def testHighWaterMarkDirectoryMapping(): Unit = {
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
     val rm = new ReplicaManager(
       metrics = metrics,
       config = config,
@@ -203,8 +203,8 @@ class ReplicaManagerTest {
       partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
       rm.checkpointHighWatermarks()
-      config.logDirs.map(s => Paths.get(s, ReplicaManager.HighWatermarkFilename))
-        .foreach(checkpointFile => assertTrue(Files.exists(checkpointFile),
+      config.logDirs.stream().map(s => Paths.get(s, ReplicaManager.HighWatermarkFilename))
+        .forEach(checkpointFile => assertTrue(Files.exists(checkpointFile),
           s"checkpoint file does not exist at $checkpointFile"))
     } finally {
       rm.shutdown(checkpointHW = false)
@@ -216,7 +216,7 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(1)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
     val rm = new ReplicaManager(
       metrics = metrics,
       config = config,
@@ -232,8 +232,8 @@ class ReplicaManagerTest {
       partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
       rm.checkpointHighWatermarks()
-      config.logDirs.map(s => Paths.get(s, ReplicaManager.HighWatermarkFilename))
-        .foreach(checkpointFile => assertTrue(Files.exists(checkpointFile),
+      config.logDirs.stream().map(s => Paths.get(s, ReplicaManager.HighWatermarkFilename))
+        .forEach(checkpointFile => assertTrue(Files.exists(checkpointFile),
          s"checkpoint file does not exist at $checkpointFile"))
     } finally {
       rm.shutdown(checkpointHW = false)
@@ -242,7 +242,7 @@ class ReplicaManagerTest {
 
   @Test
   def testIllegalRequiredAcks(): Unit = {
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
     val rm = new ReplicaManager(
       metrics = metrics,
       config = config,
@@ -293,7 +293,7 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(0)
     props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
-    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+    val logManager = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(new Properties()))
     mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
     when(metadataCache.metadataVersion()).thenReturn(MetadataVersion.MINIMUM_VERSION)
     val rm = new ReplicaManager(
@@ -353,7 +353,7 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(0)
     props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
-    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+    val logManager = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(new Properties()))
     val spyLogManager = spy(logManager)
     val metadataCache: MetadataCache = mock(classOf[MetadataCache])
     mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
@@ -425,7 +425,7 @@ class ReplicaManagerTest {
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
     val logProps = new Properties()
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(logProps))
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(logProps))
     val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1))
     mockGetAliveBrokerFunctions(metadataCache, aliveBrokers)
     when(metadataCache.metadataVersion()).thenReturn(MetadataVersion.MINIMUM_VERSION)
@@ -2783,7 +2783,7 @@ class ReplicaManagerTest {
     props.asScala ++= extraProps.asScala
     val config = KafkaConfig.fromProps(props)
     val logConfig = new LogConfig(new Properties)
-    val logDir = new File(new File(config.logDirs.head), s"$topic-$topicPartition")
+    val logDir = new File(new File(config.logDirs.get(0)), s"$topic-$topicPartition")
     Files.createDirectories(logDir.toPath)
     val mockScheduler = new MockScheduler(time)
     val mockBrokerTopicStats = new BrokerTopicStats
@@ -2844,7 +2844,7 @@ class ReplicaManagerTest {
     // Expect to call LogManager.truncateTo exactly once
     val topicPartitionObj = new TopicPartition(topic, topicPartition)
     val mockLogMgr: LogManager = mock(classOf[LogManager])
-    when(mockLogMgr.liveLogDirs).thenReturn(config.logDirs.map(new File(_).getAbsoluteFile))
+    when(mockLogMgr.liveLogDirs).thenReturn(config.logDirs.asScala.map(new File(_).getAbsoluteFile))
     when(mockLogMgr.getOrCreateLog(ArgumentMatchers.eq(topicPartitionObj), ArgumentMatchers.eq(false), ArgumentMatchers.eq(false), any(), any())).thenReturn(mockLog)
     when(mockLogMgr.getLog(topicPartitionObj, isFuture = false)).thenReturn(Some(mockLog))
     when(mockLogMgr.getLog(topicPartitionObj, isFuture = true)).thenReturn(None)
@@ -3218,7 +3218,7 @@ class ReplicaManagerTest {
     transactionalTopicPartitions: List[TopicPartition],
     config: KafkaConfig = config,
     scheduler: Scheduler = new MockScheduler(time)): ReplicaManager = {
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
     val replicaManager = new ReplicaManager(
       metrics = metrics,
       config = config,
@@ -3269,7 +3269,7 @@ class ReplicaManagerTest {
     val mockLog = setupMockLog(path1)
     if (setupLogDirMetaProperties) {
       // add meta.properties file in each dir
-      config.logDirs.foreach(dir => {
+      config.logDirs.stream().forEach(dir => {
        val metaProps = new MetaProperties.Builder().
          setVersion(MetaPropertiesVersion.V0).
          setClusterId("clusterId").
@@ -3280,7 +3280,7 @@ class ReplicaManagerTest {
          new File(new File(dir), MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath, false)
       })
     }
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(logProps), log = if (shouldMockLog) Some(mockLog) else None, remoteStorageSystemEnable = enableRemoteStorage)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(logProps), log = if (shouldMockLog) Some(mockLog) else None, remoteStorageSystemEnable = enableRemoteStorage)
     val logConfig = new LogConfig(logProps)
     when(mockLog.config).thenReturn(logConfig)
     when(mockLog.remoteLogEnabled()).thenReturn(enableRemoteStorage)
@@ -3617,8 +3617,8 @@ class ReplicaManagerTest {
     val config0 = KafkaConfig.fromProps(props0)
     val config1 = KafkaConfig.fromProps(props1)
 
-    val mockLogMgr0 = TestUtils.createLogManager(config0.logDirs.map(new File(_)))
-    val mockLogMgr1 = TestUtils.createLogManager(config1.logDirs.map(new File(_)))
+    val mockLogMgr0 = TestUtils.createLogManager(config0.logDirs.asScala.map(new File(_)))
+    val mockLogMgr1 = TestUtils.createLogManager(config1.logDirs.asScala.map(new File(_)))
 
     val metadataCache0: MetadataCache = mock(classOf[MetadataCache])
     val metadataCache1: MetadataCache = mock(classOf[MetadataCache])
@@ -4212,7 +4212,7 @@ class ReplicaManagerTest {
     def createReplicaManager(): ReplicaManager = {
       val props = TestUtils.createBrokerConfig(1)
       val config = KafkaConfig.fromProps(props)
-      val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
+      val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
      new ReplicaManager(
        metrics = metrics,
        config = config,
@@ -5806,7 +5806,7 @@ class ReplicaManagerTest {
     try {
       val responses = replicaManager.describeLogDirs(Set(new TopicPartition(topic, topicPartition)))
       assertEquals(mockLogMgr.liveLogDirs.size, responses.size)
-      responses.foreach { response =>
+      responses.forEach { response =>
         assertEquals(Errors.NONE.code, response.errorCode)
         assertTrue(response.totalBytes > 0)
         assertTrue(response.usableBytes >= 0)
@@ -5838,7 +5838,7 @@ class ReplicaManagerTest {
     try {
       val responses = replicaManager.describeLogDirs(Set(new TopicPartition(noneTopic, topicPartition)))
       assertEquals(mockLogMgr.liveLogDirs.size, responses.size)
-      responses.foreach { response =>
+      responses.forEach { response =>
         assertEquals(Errors.NONE.code, response.errorCode)
         assertTrue(response.totalBytes > 0)
         assertTrue(response.usableBytes >= 0)
@@ -5851,7 +5851,7 @@ class ReplicaManagerTest {
 
   @Test
   def testCheckpointHwOnShutdown(): Unit = {
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
     val spyRm = spy(new ReplicaManager(
       metrics = metrics,
       config = config,
@@ -6085,7 +6085,7 @@ class ReplicaManagerTest {
 
   @Test
   def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
     val rm = new ReplicaManager(
       metrics = metrics,
       config = config,
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index cdc15b6c2be..ec1679dd632 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -104,7 +104,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
 
     // do a clean shutdown and check that offset checkpoint file exists
     shutdownBroker()
-    for (logDir <- config.logDirs) {
+    for (logDir <- config.logDirs.asScala) {
       val OffsetCheckpointFile = new File(logDir, LogManager.RECOVERY_POINT_CHECKPOINT_FILE)
       assertTrue(OffsetCheckpointFile.exists)
      assertTrue(OffsetCheckpointFile.length() > 0)
@@ -146,7 +146,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
   def testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(): Unit = {
     createTopic(topic)
     shutdownBroker()
-    config.logDirs.foreach { dirName =>
+    config.logDirs.forEach { dirName =>
       val partitionDir = new File(dirName, s"$topic-0")
       partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1))
     }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f237ee3a339..23eeb7e9926 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1089,14 +1089,14 @@ object TestUtils extends Logging {
       checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.containsKey(tp))
     }), "Cleaner offset for deleted partition should have been removed")
     waitUntilTrue(() => brokers.forall(broker =>
-      broker.config.logDirs.forall { logDir =>
+      broker.config.logDirs.stream().allMatch { logDir =>
        topicPartitions.forall { tp =>
          !new File(logDir, tp.topic + "-" + tp.partition).exists()
        }
      }
    ), "Failed to soft-delete the data to a delete directory")
    waitUntilTrue(() => brokers.forall(broker =>
-      broker.config.logDirs.forall { logDir =>
+      broker.config.logDirs.stream().allMatch { logDir =>
        topicPartitions.forall { tp =>
          !util.Arrays.asList(new File(logDir).list()).asScala.exists { partitionDirectoryNames =>
            partitionDirectoryNames.exists { directoryName =>
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index d6b84c7d9a1..ae7a44a4401 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -93,7 +93,6 @@ import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import scala.Option;
 import scala.jdk.javaapi.CollectionConverters;
@@ -129,7 +128,7 @@ public class ReplicaFetcherThreadBenchmark {
         BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
         LogDirFailureChannel logDirFailureChannel = new LogDirFailureChannel(config.logDirs().size());
 
-        List<File> logDirs = CollectionConverters.asJava(config.logDirs()).stream().map(File::new).collect(Collectors.toList());
+        List<File> logDirs = config.logDirs().stream().map(File::new).toList();
         logManager = new LogManagerBuilder().
             setLogDirs(logDirs).
             setInitialOfflineDirs(Collections.emptyList()).
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 2db7f2bb1c2..9e7c174f039 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -61,7 +61,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import scala.Option;
 import scala.jdk.javaapi.CollectionConverters;
@@ -108,8 +107,7 @@ public class CheckpointBench {
         this.metrics = new Metrics();
         this.time = new MockTime();
         this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
-        final List<File> files =
-            CollectionConverters.asJava(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
+        final List<File> files = brokerProperties.logDirs().stream().map(File::new).toList();
         this.logManager = TestUtils.createLogManager(CollectionConverters.asScala(files),
             new LogConfig(new Properties()), new MockConfigRepository(),
             new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d, 1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, 4, false, Option.empty(), false, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT);
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index 4d544926afa..11f9ca54c0b 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -65,7 +65,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import scala.Option;
 import scala.jdk.javaapi.CollectionConverters;
@@ -115,8 +114,7 @@ public class PartitionCreationBench {
         this.time = Time.SYSTEM;
         this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
         final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
-        final List<File> files =
-            CollectionConverters.asJava(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
+        final List<File> files = brokerProperties.logDirs().stream().map(File::new).toList();
         CleanerConfig cleanerConfig = new CleanerConfig(1,
             4 * 1024 * 1024L, 0.9d,
             1024 * 1024, 32 * 1024 * 1024,
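The benchmark hunks above (and the earlier ListOffsetsIntegrationTest change) replace CollectionConverters.asJava(...).stream().collect(Collectors.toList()) with config.logDirs().stream().map(File::new).toList(). One nuance the diff does not call out: Stream.toList() returns an unmodifiable list, while Collectors.toList() has typically returned a mutable ArrayList. That appears harmless here, since the lists are handed straight to the log manager builders, but it is worth keeping in mind when touching similar call sites. A self-contained sketch of the difference:

    import java.io.File;
    import java.util.List;
    import java.util.stream.Collectors;

    public class ToListSketch {
        public static void main(String[] args) {
            List<String> logDirs = List.of("/tmp/kafka-logs-0", "/tmp/kafka-logs-1"); // illustrative values

            // New style used in the diff: concise, returns an unmodifiable List.
            List<File> viaToList = logDirs.stream().map(File::new).toList();

            // Old style being removed: usually a mutable ArrayList.
            List<File> viaCollector = logDirs.stream().map(File::new).collect(Collectors.toList());

            viaCollector.add(new File("/tmp/extra"));   // fine
            try {
                viaToList.add(new File("/tmp/extra"));  // throws UnsupportedOperationException
            } catch (UnsupportedOperationException e) {
                System.out.println("Stream.toList() result is unmodifiable");
            }
        }
    }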
diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index ef61fee1542..0cb4be79f2a 100644
--- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -31,11 +31,13 @@ import org.apache.kafka.raft.MetadataLogConfig;
 import org.apache.kafka.raft.QuorumConfig;
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
 import org.apache.kafka.server.metrics.MetricConfigs;
+import org.apache.kafka.server.util.Csv;
 import org.apache.kafka.storage.internals.log.CleanerConfig;
 import org.apache.kafka.storage.internals.log.LogConfig;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * During moving {@link kafka.server.KafkaConfig} out of core AbstractKafkaConfig will be the future KafkaConfig
@@ -70,6 +72,10 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
         super(definition, originals, configProviderProps, doLog);
     }
 
+    public List<String> logDirs() {
+        return Csv.parseCsvList(Optional.ofNullable(getString(ServerLogConfigs.LOG_DIRS_CONFIG)).orElse(getString(ServerLogConfigs.LOG_DIR_CONFIG)));
+    }
+
     public int numIoThreads() {
         return getInt(ServerConfigs.NUM_IO_THREADS_CONFIG);
     }
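The new logDirs() accessor above resolves log.dirs first, falls back to log.dir, and splits the result with Csv.parseCsvList, mirroring the Scala val removed from KafkaConfig.scala earlier in this diff. A minimal, dependency-free sketch of that resolution order; the resolveLogDirs helper and the property values are illustrative, and the trim/skip-empty handling is an assumption about what the CSV parse does rather than a statement of Csv.parseCsvList's exact behaviour:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.Optional;

    public class LogDirsResolutionSketch {
        // log.dirs wins when both properties are set; otherwise fall back to log.dir.
        static List<String> resolveLogDirs(Map<String, String> props) {
            String raw = Optional.ofNullable(props.get("log.dirs")).orElse(props.get("log.dir"));
            return Arrays.stream(raw.split(","))
                    .map(String::trim)
                    .filter(s -> !s.isEmpty())
                    .toList();
        }

        public static void main(String[] args) {
            System.out.println(resolveLogDirs(Map.of("log.dir", "/tmp/kafka-logs")));
            System.out.println(resolveLogDirs(Map.of("log.dir", "/ignored", "log.dirs", "/data/d1, /data/d2")));
        }
    }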
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
index a539321f6ac..40334a1726d 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
@@ -42,6 +42,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import scala.collection.Seq;
@@ -154,7 +155,7 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
     public static List<BrokerLocalStorage> localStorages(Seq<KafkaBroker> brokers) {
         return CollectionConverters.asJava(brokers).stream()
-                .map(b -> new BrokerLocalStorage(b.config().brokerId(), CollectionConverters.asJava(b.config().logDirs().toSet()),
+                .map(b -> new BrokerLocalStorage(b.config().brokerId(), Set.copyOf(b.config().logDirs()),
                         STORAGE_WAIT_TIMEOUT_SEC))
                 .collect(Collectors.toList());
     }
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
index 908484ad227..f0bcfc8a22e 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
@@ -33,6 +33,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 
 import scala.jdk.javaapi.CollectionConverters;
 
@@ -75,7 +76,7 @@ public class TransactionsWithTieredStoreTest extends TransactionsTest {
     public void maybeWaitForAtLeastOneSegmentUpload(scala.collection.Seq<TopicPartition> topicPartitions) {
         CollectionConverters.asJava(topicPartitions).forEach(topicPartition -> {
             List<BrokerLocalStorage> localStorages = CollectionConverters.asJava(brokers()).stream()
-                    .map(b -> new BrokerLocalStorage(b.config().brokerId(), CollectionConverters.asJava(b.config().logDirs().toSet()), STORAGE_WAIT_TIMEOUT_SEC))
+                    .map(b -> new BrokerLocalStorage(b.config().brokerId(), Set.copyOf(b.config().logDirs()), STORAGE_WAIT_TIMEOUT_SEC))
                    .toList();
            localStorages
                .stream()
diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
index 2fafc4139de..e6597e69cfd 100644
--- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
+++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
@@ -352,14 +352,14 @@ public interface ClusterInstance {
 
         // Ensure that the topic directories are soft-deleted
         TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
-                CollectionConverters.asJava(broker.config().logDirs()).stream().allMatch(logDir ->
+                broker.config().logDirs().stream().allMatch(logDir ->
                         topicPartitions.stream().noneMatch(tp ->
                                 new File(logDir, tp.topic() + "-" + tp.partition()).exists()))),
                 "Failed to soft-delete the data to a delete directory");
 
         // Ensure that the topic directories are hard-deleted
         TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
-                CollectionConverters.asJava(broker.config().logDirs()).stream().allMatch(logDir ->
+                broker.config().logDirs().stream().allMatch(logDir ->
                         topicPartitions.stream().allMatch(tp ->
                                 Arrays.stream(Objects.requireNonNull(new File(logDir).list())).noneMatch(partitionDirectoryName ->
                                         partitionDirectoryName.startsWith(tp.topic() + "-" + tp.partition()) &&