diff --git a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java index de43d55203f..2e5e293b120 100644 --- a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java @@ -22,11 +22,11 @@ import kafka.log.LogConfig; import kafka.log.LogManager; import kafka.log.ProducerStateManagerConfig; import kafka.server.BrokerTopicStats; -import kafka.server.LogDirFailureChannel; import kafka.server.metadata.ConfigRepository; import kafka.utils.Scheduler; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.log.internals.LogDirFailureChannel; import scala.collection.JavaConverters; import java.io.File; diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java index e6b46f41c7e..bc7ebd8a2e7 100644 --- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java @@ -27,7 +27,6 @@ import kafka.server.DelayedFetch; import kafka.server.DelayedOperationPurgatory; import kafka.server.DelayedProduce; import kafka.server.KafkaConfig; -import kafka.server.LogDirFailureChannel; import kafka.server.MetadataCache; import kafka.server.QuotaFactory.QuotaManagers; import kafka.server.ReplicaManager; @@ -35,6 +34,7 @@ import kafka.utils.Scheduler; import kafka.zk.KafkaZkClient; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.log.internals.LogDirFailureChannel; import scala.compat.java8.OptionConverters; import java.util.Collections; diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala index 7d2524091a6..10d5217069b 100644 --- a/core/src/main/scala/kafka/log/LocalLog.scala +++ b/core/src/main/scala/kafka/log/LocalLog.scala @@ -23,14 +23,14 @@ import java.text.NumberFormat import java.util.concurrent.atomic.AtomicLong import java.util.regex.Pattern import kafka.metrics.KafkaMetricsGroup -import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata} +import kafka.server.{FetchDataInfo, LogOffsetMetadata} import kafka.utils.{Logging, Scheduler} import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException} import org.apache.kafka.common.message.FetchResponseData import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.server.log.internals.{AbortedTxn, OffsetPosition} +import org.apache.kafka.server.log.internals.{AbortedTxn, LogDirFailureChannel, OffsetPosition} import scala.jdk.CollectionConverters._ import scala.collection.{Seq, immutable} diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 8bafc0aae60..1588f758d23 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -23,7 +23,7 @@ import java.util.Date import java.util.concurrent.TimeUnit import kafka.common._ import kafka.metrics.KafkaMetricsGroup -import kafka.server.{BrokerReconfigurable, KafkaConfig, LogDirFailureChannel} +import kafka.server.{BrokerReconfigurable, KafkaConfig} import kafka.utils._ import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.config.ConfigException @@ -32,7 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{BufferSupplier, Time} -import org.apache.kafka.server.log.internals.{AbortedTxn, TransactionIndex} +import org.apache.kafka.server.log.internals.{AbortedTxn, LogDirFailureChannel, TransactionIndex} import scala.jdk.CollectionConverters._ import scala.collection.mutable.ListBuffer diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 48f4d49b6d6..14f55cc0518 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -23,13 +23,13 @@ import java.util.concurrent.locks.ReentrantLock import kafka.common.LogCleaningAbortedException import kafka.metrics.KafkaMetricsGroup -import kafka.server.LogDirFailureChannel import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils.CoreUtils._ import kafka.utils.{Logging, Pool} import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.log.internals.LogDirFailureChannel import scala.collection.{Iterable, Seq, mutable} diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala index 2eb055ba167..e69283f71a6 100644 --- a/core/src/main/scala/kafka/log/LogLoader.scala +++ b/core/src/main/scala/kafka/log/LogLoader.scala @@ -21,14 +21,14 @@ import java.io.{File, IOException} import java.nio.file.{Files, NoSuchFileException} import kafka.common.LogSegmentOffsetOverflowException import kafka.log.UnifiedLog.{CleanedFileSuffix, DeletedFileSuffix, SwapFileSuffix, isIndexFile, isLogFile, offsetFromFile} -import kafka.server.{LogDirFailureChannel, LogOffsetMetadata} +import kafka.server.LogOffsetMetadata import kafka.server.epoch.LeaderEpochFileCache import kafka.utils.{CoreUtils, Logging, Scheduler} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.InvalidOffsetException import org.apache.kafka.common.utils.Time import org.apache.kafka.snapshot.Snapshots -import org.apache.kafka.server.log.internals.CorruptIndexException +import org.apache.kafka.server.log.internals.{CorruptIndexException, LogDirFailureChannel} import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import scala.collection.{Set, mutable} diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index dc599b544ae..c663ddc167b 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -42,6 +42,7 @@ import kafka.utils.Implicits._ import java.util.Properties import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.log.internals.LogDirFailureChannel import scala.annotation.nowarn diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 8c2e4f74626..718e779c970 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -28,7 +28,7 @@ import kafka.log.remote.RemoteLogManager import kafka.metrics.KafkaMetricsGroup import kafka.server.checkpoints.LeaderEpochCheckpointFile import kafka.server.epoch.LeaderEpochFileCache -import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch, PartitionMetadataFile, RequestLocal} +import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogOffsetMetadata, OffsetAndEpoch, PartitionMetadataFile, RequestLocal} import kafka.utils._ import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic @@ -42,7 +42,7 @@ import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils} import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0 -import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LogValidator} +import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LogDirFailureChannel, LogValidator} import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig import org.apache.kafka.server.record.BrokerCompressionType diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index 6e1ff92ecd2..7056f5db0e6 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -18,7 +18,7 @@ package kafka.raft import kafka.log.{Defaults, LogConfig, LogOffsetSnapshot, ProducerStateManagerConfig, SnapshotGenerated, UnifiedLog} import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMinBytesProp} -import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal} +import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, RequestLocal} import kafka.utils.{CoreUtils, Logging, Scheduler} import org.apache.kafka.common.config.AbstractConfig import org.apache.kafka.common.errors.InvalidConfigurationException @@ -26,7 +26,7 @@ import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords, Record import org.apache.kafka.common.utils.{BufferSupplier, Time} import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch} -import org.apache.kafka.server.log.internals.AppendOrigin +import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel} import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots} import java.io.File diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 2b7561c31f3..cbe700358f6 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -43,6 +43,7 @@ import org.apache.kafka.raft import org.apache.kafka.raft.{RaftClient, RaftConfig} import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.ApiMessageAndVersion +import org.apache.kafka.server.log.internals.LogDirFailureChannel import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.snapshot.SnapshotWriter diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 335e48f25ee..11e81fb55ae 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -56,6 +56,7 @@ import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion} import org.apache.kafka.server.common.MetadataVersion._ import org.apache.kafka.server.metrics.KafkaYammerMetrics +import org.apache.kafka.server.log.internals.LogDirFailureChannel import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.zookeeper.client.ZKClientConfig diff --git a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala deleted file mode 100644 index 71ba9ac305f..00000000000 --- a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package kafka.server - -import java.io.IOException -import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap} - -import kafka.utils.Logging - -/* - * LogDirFailureChannel allows an external thread to block waiting for new offline log dirs. - * - * There should be a single instance of LogDirFailureChannel accessible by any class that does disk-IO operation. - * If IOException is encountered while accessing a log directory, the corresponding class can add the log directory name - * to the LogDirFailureChannel using maybeAddOfflineLogDir(). Each log directory will be added only once. After a log - * directory is added for the first time, a thread which is blocked waiting for new offline log directories - * can take the name of the new offline log directory out of the LogDirFailureChannel and handle the log failure properly. - * An offline log directory will stay offline until the broker is restarted. - * - */ -class LogDirFailureChannel(logDirNum: Int) extends Logging { - - private val offlineLogDirs = new ConcurrentHashMap[String, String] - private val offlineLogDirQueue = new ArrayBlockingQueue[String](logDirNum) - - def hasOfflineLogDir(logDir: String): Boolean = { - offlineLogDirs.containsKey(logDir) - } - - /* - * If the given logDir is not already offline, add it to the - * set of offline log dirs and enqueue it to the logDirFailureEvent queue - */ - def maybeAddOfflineLogDir(logDir: String, msg: => String, e: IOException): Unit = { - error(msg, e) - if (offlineLogDirs.putIfAbsent(logDir, logDir) == null) - offlineLogDirQueue.add(logDir) - } - - /* - * Get the next offline log dir from logDirFailureEvent queue. - * The method will wait if necessary until a new offline log directory becomes available - */ - def takeNextOfflineLogDir(): String = offlineLogDirQueue.take() - -} diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala index f88a4cc9075..d0d4552ade5 100644 --- a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala +++ b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala @@ -26,7 +26,7 @@ import kafka.utils.Logging import org.apache.kafka.common.Uuid import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException} import org.apache.kafka.common.utils.Utils - +import org.apache.kafka.server.log.internals.LogDirFailureChannel object PartitionMetadataFile { diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 7862771e48f..c88cc688693 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -61,7 +61,7 @@ import org.apache.kafka.common.utils.Time import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta} import org.apache.kafka.metadata.LeaderConstants.NO_LEADER import org.apache.kafka.server.common.MetadataVersion._ -import org.apache.kafka.server.log.internals.{AppendOrigin, RecordValidationException} +import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel, RecordValidationException} import java.nio.file.{Files, Paths} import java.util diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFileWithFailureHandler.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFileWithFailureHandler.scala index 7021c6742ca..0e669249bdd 100644 --- a/core/src/main/scala/kafka/server/checkpoints/CheckpointFileWithFailureHandler.scala +++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFileWithFailureHandler.scala @@ -16,9 +16,9 @@ */ package kafka.server.checkpoints -import kafka.server.LogDirFailureChannel import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.server.common.CheckpointFile +import org.apache.kafka.server.log.internals.LogDirFailureChannel import CheckpointFile.EntryFormatter import java.io._ diff --git a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala index c772b82dece..93ef93d0bd3 100644 --- a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala +++ b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala @@ -16,9 +16,9 @@ */ package kafka.server.checkpoints -import kafka.server.LogDirFailureChannel import kafka.server.epoch.EpochEntry import org.apache.kafka.server.common.CheckpointFile.EntryFormatter +import org.apache.kafka.server.log.internals.LogDirFailureChannel import java.io._ import java.util.Optional diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala index f7b83eae6af..483a186c8dd 100644 --- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala +++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala @@ -16,10 +16,10 @@ */ package kafka.server.checkpoints -import kafka.server.LogDirFailureChannel import kafka.server.epoch.EpochEntry import org.apache.kafka.common.TopicPartition import org.apache.kafka.server.common.CheckpointFile.EntryFormatter +import org.apache.kafka.server.log.internals.LogDirFailureChannel import java.io._ import java.util.Optional diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala index 1ca8334342a..08734b2e97a 100644 --- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala +++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala @@ -27,7 +27,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import org.apache.kafka.common.requests.LeaderAndIsrRequest import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse -import org.apache.kafka.server.log.internals.AppendOrigin +import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel} import org.junit.jupiter.api.{BeforeEach, Test} import org.junit.jupiter.api.Assertions._ import org.mockito.Mockito.mock diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index de5a65a41ec..5492a15feba 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -21,11 +21,12 @@ import java.util.Properties import java.util.concurrent.atomic._ import kafka.log._ -import kafka.server.{BrokerTopicStats, FetchLogEnd, LogDirFailureChannel} +import kafka.server.{BrokerTopicStats, FetchLogEnd} import kafka.utils._ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException import org.apache.kafka.common.record.FileRecords import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.log.internals.LogDirFailureChannel /** * A stress test that instantiates a log and then runs continual appends against it from one thread and continual reads against it diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index 026afd0ddd2..ab681e690af 100755 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -25,10 +25,11 @@ import java.util.{Properties, Random} import joptsimple._ import kafka.log._ -import kafka.server.{BrokerTopicStats, LogDirFailureChannel} +import kafka.server.BrokerTopicStats import kafka.utils._ import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.server.log.internals.LogDirFailureChannel import scala.math._ diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index e5dab17e7ee..22f2e75cb58 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -35,7 +35,7 @@ import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.server.common.MetadataVersion -import org.apache.kafka.server.log.internals.AppendOrigin +import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.ArgumentMatchers diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index b42372cea68..2c52888bdbe 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -54,7 +54,7 @@ import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 -import org.apache.kafka.server.log.internals.AppendOrigin +import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel} import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala index 4df20c5a90d..d6f97f36e00 100644 --- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala @@ -19,12 +19,13 @@ package kafka.log import java.io.File import java.nio.file.Files import java.util.Properties -import kafka.server.{BrokerTopicStats, LogDirFailureChannel} +import kafka.server.BrokerTopicStats import kafka.utils.{MockTime, Pool, TestUtils} import kafka.utils.Implicits._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch} import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.log.internals.LogDirFailureChannel import org.junit.jupiter.api.{AfterEach, Tag} import scala.collection.Seq diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala index c27f5a9586b..b2daa24962d 100755 --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -17,10 +17,11 @@ package kafka.log -import kafka.server.{BrokerTopicStats, FetchLogEnd, LogDirFailureChannel} +import kafka.server.{BrokerTopicStats, FetchLogEnd} import kafka.utils._ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord} import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.log.internals.LogDirFailureChannel import org.apache.kafka.server.record.BrokerCompressionType import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api._ diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala index c816ac5e009..deee5685bc5 100644 --- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala @@ -22,12 +22,13 @@ import java.nio.channels.ClosedChannelException import java.nio.charset.StandardCharsets import java.util.regex.Pattern import java.util.Collections -import kafka.server.{FetchDataInfo, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata} +import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata} import kafka.utils.{MockTime, Scheduler, TestUtils} import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record, SimpleRecord} import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.server.log.internals.LogDirFailureChannel import org.junit.jupiter.api.Assertions.{assertFalse, _} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 3415eb09902..a2176d290c2 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -20,12 +20,12 @@ package kafka.log import java.io.File import java.nio.file.Files import java.util.Properties -import kafka.server.{BrokerTopicStats, LogDirFailureChannel} +import kafka.server.{BrokerTopicStats} import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.log.internals.AppendOrigin +import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 6001d3094f8..fec7ecedc72 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -24,13 +24,13 @@ import java.nio.file.Paths import java.util.Properties import java.util.concurrent.{CountDownLatch, TimeUnit} import kafka.common._ -import kafka.server.{BrokerTopicStats, KafkaConfig, LogDirFailureChannel} +import kafka.server.{BrokerTopicStats, KafkaConfig} import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin} +import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, LogDirFailureChannel} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} diff --git a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala index 010c9f7960d..14a23f68e09 100644 --- a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala @@ -20,10 +20,11 @@ package kafka.log import java.util.Properties import java.util.concurrent.{Callable, Executors} -import kafka.server.{BrokerTopicStats, FetchHighWatermark, LogDirFailureChannel} +import kafka.server.{BrokerTopicStats, FetchHighWatermark} import kafka.utils.{KafkaScheduler, TestUtils} import org.apache.kafka.common.record.SimpleRecord import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.server.log.internals.LogDirFailureChannel import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index f877860a832..e59d43e0cf7 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -22,7 +22,7 @@ import java.nio.ByteBuffer import java.nio.file.{Files, NoSuchFileException, Paths} import java.util.Properties import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} -import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel} +import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig} import kafka.server.metadata.MockConfigRepository import kafka.utils.{CoreUtils, MockTime, Scheduler, TestUtils} import org.apache.kafka.common.TopicPartition @@ -31,7 +31,7 @@ import org.apache.kafka.common.record.{CompressionType, ControlRecordType, Defau import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0 -import org.apache.kafka.server.log.internals.{AbortedTxn, OffsetIndex} +import org.apache.kafka.server.log.internals.{AbortedTxn, LogDirFailureChannel, OffsetIndex} import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue} import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 2d70eba7c43..6e1cdec4110 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -21,7 +21,7 @@ import com.yammer.metrics.core.{Gauge, MetricName} import kafka.log.remote.RemoteIndexCache import kafka.server.checkpoints.OffsetCheckpointFile import kafka.server.metadata.{ConfigRepository, MockConfigRepository} -import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchLogEnd, LogDirFailureChannel} +import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchLogEnd} import kafka.utils._ import org.apache.directory.api.util.FileUtils import org.apache.kafka.common.errors.OffsetOutOfRangeException @@ -37,6 +37,7 @@ import java.io._ import java.nio.file.Files import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Future} import java.util.{Collections, Properties} +import org.apache.kafka.server.log.internals.LogDirFailureChannel import org.apache.kafka.server.metrics.KafkaYammerMetrics import scala.collection.{Map, mutable} diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala index 489bf97d9a0..b614e11c7e7 100644 --- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala +++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala @@ -22,11 +22,12 @@ import kafka.log.remote.RemoteLogManager import java.io.File import java.util.Properties import kafka.server.checkpoints.LeaderEpochCheckpointFile -import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, LogDirFailureChannel} +import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd} import kafka.utils.{Scheduler, TestUtils} import org.apache.kafka.common.Uuid import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord} import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.server.log.internals.LogDirFailureChannel import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse} import java.nio.file.Files diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index b03427b1aa7..283a8daec4f 100755 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -30,6 +30,7 @@ import kafka.cluster.Partition import kafka.server.metadata.MockConfigRepository import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.SimpleRecord +import org.apache.kafka.server.log.internals.LogDirFailureChannel class HighwatermarkPersistenceTest { diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala index baed478a0bf..e4d929553e4 100644 --- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala @@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.utils.Time import org.apache.kafka.metadata.LeaderRecoveryState +import org.apache.kafka.server.log.internals.LogDirFailureChannel import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.Mockito.{atLeastOnce, mock, verify, when} diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala index df41bafb315..1b91f61724b 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala @@ -39,7 +39,7 @@ import org.apache.kafka.common.{IsolationLevel, TopicIdPartition, TopicPartition import org.apache.kafka.image.{MetadataDelta, MetadataImage} import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.metadata.PartitionRegistration -import org.apache.kafka.server.log.internals.AppendOrigin +import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} import org.mockito.Mockito diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index d0826f54c5d..a4d907c4acc 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -29,6 +29,7 @@ import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.metadata.LeaderRecoveryState +import org.apache.kafka.server.log.internals.LogDirFailureChannel import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong} diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 1958233184c..8c63350e396 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -58,7 +58,7 @@ import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, C import org.apache.kafka.metadata.LeaderConstants.NO_LEADER import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 -import org.apache.kafka.server.log.internals.AppendOrigin +import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala index 4889c541125..9439f388d43 100644 --- a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala +++ b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala @@ -16,10 +16,10 @@ */ package kafka.server.checkpoints -import kafka.server.LogDirFailureChannel import kafka.utils.{Logging, TestUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.server.log.internals.LogDirFailureChannel import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.mockito.Mockito diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala index 4c6d74652cf..8983ad4497c 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala @@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} +import org.apache.kafka.server.log.internals.LogDirFailureChannel import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.Mockito.{mock, when} diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index 864c5436b33..628d1b66169 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -23,7 +23,7 @@ import java.util import java.util.Properties import kafka.log.{Defaults, LogConfig, LogTestUtils, ProducerStateManagerConfig, UnifiedLog} import kafka.raft.{KafkaMetadataLog, MetadataLogConfig} -import kafka.server.{BrokerTopicStats, FetchLogEnd, KafkaRaftServer, LogDirFailureChannel} +import kafka.server.{BrokerTopicStats, FetchLogEnd, KafkaRaftServer} import kafka.tools.DumpLogSegments.TimeIndexDumpErrors import kafka.utils.{MockTime, TestUtils} import org.apache.kafka.common.Uuid @@ -35,7 +35,7 @@ import org.apache.kafka.common.utils.Utils import org.apache.kafka.metadata.MetadataRecordSerde import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch} import org.apache.kafka.server.common.ApiMessageAndVersion -import org.apache.kafka.server.log.internals.AppendOrigin +import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel} import org.apache.kafka.snapshot.RecordsSnapshotWriter import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala index b1a1a66fa30..1f104a39ff1 100644 --- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala +++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala @@ -20,8 +20,9 @@ import java.util.Properties import java.util.concurrent.atomic._ import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} import kafka.log.{LocalLog, LogConfig, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog} -import kafka.server.{BrokerTopicStats, LogDirFailureChannel} +import kafka.server.BrokerTopicStats import kafka.utils.TestUtils.retry +import org.apache.kafka.server.log.internals.LogDirFailureChannel import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout} diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 7110237ce4c..5d1abea0e66 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -70,6 +70,7 @@ import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.controller.QuorumController import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer} import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.log.internals.LogDirFailureChannel import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils} import org.apache.zookeeper.KeeperException.SessionExpiredException diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index 30b93b4b1c0..107b0e16d7f 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -32,7 +32,6 @@ import kafka.server.BrokerTopicStats; import kafka.server.FailedPartitions; import kafka.server.InitialFetchState; import kafka.server.KafkaConfig; -import kafka.server.LogDirFailureChannel; import kafka.server.MetadataCache; import kafka.server.OffsetAndEpoch; import kafka.server.OffsetTruncationState; @@ -73,6 +72,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.log.internals.LogDirFailureChannel; import org.mockito.Mockito; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java index 634927c9a21..7c5b342d9cb 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java @@ -26,7 +26,6 @@ import kafka.log.LogConfig; import kafka.log.LogManager; import kafka.server.AlterPartitionManager; import kafka.server.BrokerTopicStats; -import kafka.server.LogDirFailureChannel; import kafka.server.MetadataCache; import kafka.server.builders.LogManagerBuilder; import kafka.server.checkpoints.OffsetCheckpoints; @@ -41,6 +40,7 @@ import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.log.internals.LogDirFailureChannel; import org.mockito.Mockito; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java index 36f596a4980..75da931a658 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java @@ -27,7 +27,6 @@ import kafka.log.LogConfig; import kafka.log.LogManager; import kafka.server.AlterPartitionManager; import kafka.server.BrokerTopicStats; -import kafka.server.LogDirFailureChannel; import kafka.server.LogOffsetMetadata; import kafka.server.MetadataCache; import kafka.server.builders.LogManagerBuilder; @@ -39,6 +38,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.log.internals.LogDirFailureChannel; import org.mockito.Mockito; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java index d83f748bcad..bb07e63868f 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java @@ -24,7 +24,7 @@ import kafka.server.AlterPartitionManager; import kafka.server.BrokerFeatures; import kafka.server.BrokerTopicStats; import kafka.server.KafkaConfig; -import kafka.server.LogDirFailureChannel; +import org.apache.kafka.server.log.internals.LogDirFailureChannel; import kafka.server.MetadataCache; import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java index 3a8343772cb..68880830cae 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java @@ -25,7 +25,6 @@ import kafka.server.AlterPartitionManager; import kafka.server.BrokerFeatures; import kafka.server.BrokerTopicStats; import kafka.server.KafkaConfig; -import kafka.server.LogDirFailureChannel; import kafka.server.MetadataCache; import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; @@ -45,6 +44,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.log.internals.LogDirFailureChannel; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/LogDirFailureChannel.java b/storage/src/main/java/org/apache/kafka/server/log/internals/LogDirFailureChannel.java new file mode 100644 index 00000000000..2d0e8d1e6eb --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/LogDirFailureChannel.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * LogDirFailureChannel allows an external thread to block waiting for new offline log dirs. + * + * There should be a single instance of LogDirFailureChannel accessible by any class that does disk-IO operation. + * If IOException is encountered while accessing a log directory, the corresponding class can add the log directory name + * to the LogDirFailureChannel using maybeAddOfflineLogDir(). Each log directory will be added only once. After a log + * directory is added for the first time, a thread which is blocked waiting for new offline log directories + * can take the name of the new offline log directory out of the LogDirFailureChannel and handle the log failure properly. + * An offline log directory will stay offline until the broker is restarted. + */ +public class LogDirFailureChannel { + private static final Logger log = LoggerFactory.getLogger(LogDirFailureChannel.class); + private final ConcurrentMap offlineLogDirs; + private final BlockingQueue offlineLogDirQueue; + + public LogDirFailureChannel(int logDirNum) { + this.offlineLogDirs = new ConcurrentHashMap<>(); + this.offlineLogDirQueue = new ArrayBlockingQueue<>(logDirNum); + } + + public boolean hasOfflineLogDir(String logDir) { + return offlineLogDirs.containsKey(logDir); + } + + /** + * If the given logDir is not already offline, add it to the + * set of offline log dirs and enqueue it to the logDirFailureEvent queue. + * + * @param logDir The offline logDir. + * @param msg Error message. + * @param e Exception instance. + */ + public void maybeAddOfflineLogDir(String logDir, String msg, IOException e) { + log.error(msg, e); + if (offlineLogDirs.putIfAbsent(logDir, logDir) == null) { + offlineLogDirQueue.add(logDir); + } + } + + /** + * Get the next offline log dir from logDirFailureEvent queue. + * The method will wait if necessary until a new offline log directory becomes available + * + * @return The next offline log dir. + * @throws InterruptedException + */ + public String takeNextOfflineLogDir() throws InterruptedException { + return offlineLogDirQueue.take(); + } +}