KAFKA-14549: Move LogDirFailureChannel to storage module (#13041)

For broader context on this change, please check:

* KAFKA-14470: Move log layer to storage module

Reviewers: dengziming <dengziming1993@gmail.com>, Mickael Maison <mickael.maison@gmail.com>, Satish Duggana <satishd@apache.org>, Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
Federico Valeri 2022-12-23 16:13:43 +01:00 committed by GitHub
parent 5f265710f1
commit 06af8fc630
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 140 additions and 108 deletions

View File

@ -22,11 +22,11 @@ import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.log.ProducerStateManagerConfig;
import kafka.server.BrokerTopicStats;
import kafka.server.LogDirFailureChannel;
import kafka.server.metadata.ConfigRepository;
import kafka.utils.Scheduler;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.log.internals.LogDirFailureChannel;
import scala.collection.JavaConverters;
import java.io.File;

View File

@ -27,7 +27,6 @@ import kafka.server.DelayedFetch;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.DelayedProduce;
import kafka.server.KafkaConfig;
import kafka.server.LogDirFailureChannel;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory.QuotaManagers;
import kafka.server.ReplicaManager;
@ -35,6 +34,7 @@ import kafka.utils.Scheduler;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.internals.LogDirFailureChannel;
import scala.compat.java8.OptionConverters;
import java.util.Collections;

View File

@ -23,14 +23,14 @@ import java.text.NumberFormat
import java.util.concurrent.atomic.AtomicLong
import java.util.regex.Pattern
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
import kafka.server.{FetchDataInfo, LogOffsetMetadata}
import kafka.utils.{Logging, Scheduler}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException}
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.log.internals.{AbortedTxn, OffsetPosition}
import org.apache.kafka.server.log.internals.{AbortedTxn, LogDirFailureChannel, OffsetPosition}
import scala.jdk.CollectionConverters._
import scala.collection.{Seq, immutable}

View File

@ -23,7 +23,7 @@ import java.util.Date
import java.util.concurrent.TimeUnit
import kafka.common._
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{BrokerReconfigurable, KafkaConfig, LogDirFailureChannel}
import kafka.server.{BrokerReconfigurable, KafkaConfig}
import kafka.utils._
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.config.ConfigException
@ -32,7 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.server.log.internals.{AbortedTxn, TransactionIndex}
import org.apache.kafka.server.log.internals.{AbortedTxn, LogDirFailureChannel, TransactionIndex}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer

View File

@ -23,13 +23,13 @@ import java.util.concurrent.locks.ReentrantLock
import kafka.common.LogCleaningAbortedException
import kafka.metrics.KafkaMetricsGroup
import kafka.server.LogDirFailureChannel
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.CoreUtils._
import kafka.utils.{Logging, Pool}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import scala.collection.{Iterable, Seq, mutable}

View File

@ -21,14 +21,14 @@ import java.io.{File, IOException}
import java.nio.file.{Files, NoSuchFileException}
import kafka.common.LogSegmentOffsetOverflowException
import kafka.log.UnifiedLog.{CleanedFileSuffix, DeletedFileSuffix, SwapFileSuffix, isIndexFile, isLogFile, offsetFromFile}
import kafka.server.{LogDirFailureChannel, LogOffsetMetadata}
import kafka.server.LogOffsetMetadata
import kafka.server.epoch.LeaderEpochFileCache
import kafka.utils.{CoreUtils, Logging, Scheduler}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidOffsetException
import org.apache.kafka.common.utils.Time
import org.apache.kafka.snapshot.Snapshots
import org.apache.kafka.server.log.internals.CorruptIndexException
import org.apache.kafka.server.log.internals.{CorruptIndexException, LogDirFailureChannel}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import scala.collection.{Set, mutable}

View File

@ -42,6 +42,7 @@ import kafka.utils.Implicits._
import java.util.Properties
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import scala.annotation.nowarn

View File

@ -28,7 +28,7 @@ import kafka.log.remote.RemoteLogManager
import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.LeaderEpochFileCache
import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogOffsetMetadata, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
import kafka.utils._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
@ -42,7 +42,7 @@ import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LogValidator}
import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LogDirFailureChannel, LogValidator}
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.record.BrokerCompressionType

View File

@ -18,7 +18,7 @@ package kafka.raft
import kafka.log.{Defaults, LogConfig, LogOffsetSnapshot, ProducerStateManagerConfig, SnapshotGenerated, UnifiedLog}
import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMinBytesProp}
import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal}
import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, RequestLocal}
import kafka.utils.{CoreUtils, Logging, Scheduler}
import org.apache.kafka.common.config.AbstractConfig
import org.apache.kafka.common.errors.InvalidConfigurationException
@ -26,7 +26,7 @@ import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords, Record
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
import org.apache.kafka.server.log.internals.AppendOrigin
import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel}
import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
import java.io.File

View File

@ -43,6 +43,7 @@ import org.apache.kafka.raft
import org.apache.kafka.raft.{RaftClient, RaftConfig}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.snapshot.SnapshotWriter

View File

@ -56,6 +56,7 @@ import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.zookeeper.client.ZKClientConfig

View File

@ -1,62 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.io.IOException
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
import kafka.utils.Logging
/**
 * Allows an external thread to block waiting for newly offline log directories.
 *
 * A single instance of LogDirFailureChannel should be shared by every class performing
 * disk I/O. When a class hits an IOException while accessing a log directory, it reports
 * the directory via maybeAddOfflineLogDir(). Each directory is enqueued at most once;
 * a thread blocked in takeNextOfflineLogDir() then receives the directory name and
 * handles the failure. An offline log directory stays offline until the broker restarts.
 */
class LogDirFailureChannel(logDirNum: Int) extends Logging {

  // Used as a concurrent set (key == value == dir name) of dirs already reported offline.
  private val offlineLogDirs = new ConcurrentHashMap[String, String]
  // Newly offline dirs awaiting handling; bounded by the total number of log dirs.
  private val offlineLogDirQueue = new ArrayBlockingQueue[String](logDirNum)

  /** @return true if the given log directory has already been marked offline. */
  def hasOfflineLogDir(logDir: String): Boolean = offlineLogDirs.containsKey(logDir)

  /**
   * Log the error and, if this is the first failure seen for `logDir`, enqueue it so
   * the handler thread can process it. Subsequent calls for the same dir only log.
   *
   * @param logDir the offline log directory
   * @param msg    error message to log (evaluated lazily)
   * @param e      the IOException that marked the directory offline
   */
  def maybeAddOfflineLogDir(logDir: String, msg: => String, e: IOException): Unit = {
    error(msg, e)
    val firstFailure = offlineLogDirs.putIfAbsent(logDir, logDir) == null
    if (firstFailure)
      offlineLogDirQueue.add(logDir)
  }

  /**
   * Take the next offline log dir from the queue, blocking until one becomes available.
   */
  def takeNextOfflineLogDir(): String = offlineLogDirQueue.take()
}

View File

@ -26,7 +26,7 @@ import kafka.utils.Logging
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.log.internals.LogDirFailureChannel
object PartitionMetadataFile {

View File

@ -61,7 +61,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.log.internals.{AppendOrigin, RecordValidationException}
import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel, RecordValidationException}
import java.nio.file.{Files, Paths}
import java.util

View File

@ -16,9 +16,9 @@
*/
package kafka.server.checkpoints
import kafka.server.LogDirFailureChannel
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.server.common.CheckpointFile
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import CheckpointFile.EntryFormatter
import java.io._

View File

@ -16,9 +16,9 @@
*/
package kafka.server.checkpoints
import kafka.server.LogDirFailureChannel
import kafka.server.epoch.EpochEntry
import org.apache.kafka.server.common.CheckpointFile.EntryFormatter
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import java.io._
import java.util.Optional

View File

@ -16,10 +16,10 @@
*/
package kafka.server.checkpoints
import kafka.server.LogDirFailureChannel
import kafka.server.epoch.EpochEntry
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.server.common.CheckpointFile.EntryFormatter
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import java.io._
import java.util.Optional

View File

@ -27,7 +27,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.server.log.internals.AppendOrigin
import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.junit.jupiter.api.Assertions._
import org.mockito.Mockito.mock

View File

@ -21,11 +21,12 @@ import java.util.Properties
import java.util.concurrent.atomic._
import kafka.log._
import kafka.server.{BrokerTopicStats, FetchLogEnd, LogDirFailureChannel}
import kafka.server.{BrokerTopicStats, FetchLogEnd}
import kafka.utils._
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException
import org.apache.kafka.common.record.FileRecords
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.log.internals.LogDirFailureChannel
/**
* A stress test that instantiates a log and then runs continual appends against it from one thread and continual reads against it

View File

@ -25,10 +25,11 @@ import java.util.{Properties, Random}
import joptsimple._
import kafka.log._
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.server.BrokerTopicStats
import kafka.utils._
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import scala.math._

View File

@ -35,7 +35,7 @@ import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.log.internals.AppendOrigin
import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers

View File

@ -54,7 +54,7 @@ import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.log.internals.AppendOrigin
import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

View File

@ -19,12 +19,13 @@ package kafka.log
import java.io.File
import java.nio.file.Files
import java.util.Properties
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.server.BrokerTopicStats
import kafka.utils.{MockTime, Pool, TestUtils}
import kafka.utils.Implicits._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import org.junit.jupiter.api.{AfterEach, Tag}
import scala.collection.Seq

View File

@ -17,10 +17,11 @@
package kafka.log
import kafka.server.{BrokerTopicStats, FetchLogEnd, LogDirFailureChannel}
import kafka.server.{BrokerTopicStats, FetchLogEnd}
import kafka.utils._
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import org.apache.kafka.server.record.BrokerCompressionType
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api._

View File

@ -22,12 +22,13 @@ import java.nio.channels.ClosedChannelException
import java.nio.charset.StandardCharsets
import java.util.regex.Pattern
import java.util.Collections
import kafka.server.{FetchDataInfo, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata}
import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata}
import kafka.utils.{MockTime, Scheduler, TestUtils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record, SimpleRecord}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import org.junit.jupiter.api.Assertions.{assertFalse, _}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

View File

@ -20,12 +20,12 @@ package kafka.log
import java.io.File
import java.nio.file.Files
import java.util.Properties
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.server.BrokerTopicStats
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.log.internals.AppendOrigin
import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}

View File

@ -24,13 +24,13 @@ import java.nio.file.Paths
import java.util.Properties
import java.util.concurrent.{CountDownLatch, TimeUnit}
import kafka.common._
import kafka.server.{BrokerTopicStats, KafkaConfig, LogDirFailureChannel}
import kafka.server.{BrokerTopicStats, KafkaConfig}
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin}
import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, LogDirFailureChannel}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}

View File

@ -20,10 +20,11 @@ package kafka.log
import java.util.Properties
import java.util.concurrent.{Callable, Executors}
import kafka.server.{BrokerTopicStats, FetchHighWatermark, LogDirFailureChannel}
import kafka.server.{BrokerTopicStats, FetchHighWatermark}
import kafka.utils.{KafkaScheduler, TestUtils}
import org.apache.kafka.common.record.SimpleRecord
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

View File

@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import java.nio.file.{Files, NoSuchFileException, Paths}
import java.util.Properties
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel}
import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig}
import kafka.server.metadata.MockConfigRepository
import kafka.utils.{CoreUtils, MockTime, Scheduler, TestUtils}
import org.apache.kafka.common.TopicPartition
@ -31,7 +31,7 @@ import org.apache.kafka.common.record.{CompressionType, ControlRecordType, Defau
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
import org.apache.kafka.server.log.internals.{AbortedTxn, OffsetIndex}
import org.apache.kafka.server.log.internals.{AbortedTxn, LogDirFailureChannel, OffsetIndex}
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

View File

@ -21,7 +21,7 @@ import com.yammer.metrics.core.{Gauge, MetricName}
import kafka.log.remote.RemoteIndexCache
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchLogEnd, LogDirFailureChannel}
import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchLogEnd}
import kafka.utils._
import org.apache.directory.api.util.FileUtils
import org.apache.kafka.common.errors.OffsetOutOfRangeException
@ -37,6 +37,7 @@ import java.io._
import java.nio.file.Files
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Future}
import java.util.{Collections, Properties}
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import scala.collection.{Map, mutable}

View File

@ -22,11 +22,12 @@ import kafka.log.remote.RemoteLogManager
import java.io.File
import java.util.Properties
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, LogDirFailureChannel}
import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd}
import kafka.utils.{Scheduler, TestUtils}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
import java.nio.file.Files

View File

@ -30,6 +30,7 @@ import kafka.cluster.Partition
import kafka.server.metadata.MockConfigRepository
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.SimpleRecord
import org.apache.kafka.server.log.internals.LogDirFailureChannel
class HighwatermarkPersistenceTest {

View File

@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.Mockito.{atLeastOnce, mock, verify, when}

View File

@ -39,7 +39,7 @@ import org.apache.kafka.common.{IsolationLevel, TopicIdPartition, TopicPartition
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.server.log.internals.AppendOrigin
import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.Mockito

View File

@ -29,6 +29,7 @@ import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong}

View File

@ -58,7 +58,7 @@ import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, C
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.log.internals.AppendOrigin
import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest

View File

@ -16,10 +16,10 @@
*/
package kafka.server.checkpoints
import kafka.server.LogDirFailureChannel
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.Mockito

View File

@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.Mockito.{mock, when}

View File

@ -23,7 +23,7 @@ import java.util
import java.util.Properties
import kafka.log.{Defaults, LogConfig, LogTestUtils, ProducerStateManagerConfig, UnifiedLog}
import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
import kafka.server.{BrokerTopicStats, FetchLogEnd, KafkaRaftServer, LogDirFailureChannel}
import kafka.server.{BrokerTopicStats, FetchLogEnd, KafkaRaftServer}
import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.Uuid
@ -35,7 +35,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.log.internals.AppendOrigin
import org.apache.kafka.server.log.internals.{AppendOrigin, LogDirFailureChannel}
import org.apache.kafka.snapshot.RecordsSnapshotWriter
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

View File

@ -20,8 +20,9 @@ import java.util.Properties
import java.util.concurrent.atomic._
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import kafka.log.{LocalLog, LogConfig, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog}
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.server.BrokerTopicStats
import kafka.utils.TestUtils.retry
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}

View File

@ -70,6 +70,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.controller.QuorumController
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.apache.zookeeper.KeeperException.SessionExpiredException

View File

@ -32,7 +32,6 @@ import kafka.server.BrokerTopicStats;
import kafka.server.FailedPartitions;
import kafka.server.InitialFetchState;
import kafka.server.KafkaConfig;
import kafka.server.LogDirFailureChannel;
import kafka.server.MetadataCache;
import kafka.server.OffsetAndEpoch;
import kafka.server.OffsetTruncationState;
@ -73,6 +72,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.log.internals.LogDirFailureChannel;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;

View File

@ -26,7 +26,6 @@ import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.server.AlterPartitionManager;
import kafka.server.BrokerTopicStats;
import kafka.server.LogDirFailureChannel;
import kafka.server.MetadataCache;
import kafka.server.builders.LogManagerBuilder;
import kafka.server.checkpoints.OffsetCheckpoints;
@ -41,6 +40,7 @@ import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.log.internals.LogDirFailureChannel;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;

View File

@ -27,7 +27,6 @@ import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.server.AlterPartitionManager;
import kafka.server.BrokerTopicStats;
import kafka.server.LogDirFailureChannel;
import kafka.server.LogOffsetMetadata;
import kafka.server.MetadataCache;
import kafka.server.builders.LogManagerBuilder;
@ -39,6 +38,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.log.internals.LogDirFailureChannel;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;

View File

@ -24,7 +24,7 @@ import kafka.server.AlterPartitionManager;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import kafka.server.LogDirFailureChannel;
import org.apache.kafka.server.log.internals.LogDirFailureChannel;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;

View File

@ -25,7 +25,6 @@ import kafka.server.AlterPartitionManager;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import kafka.server.LogDirFailureChannel;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
@ -45,6 +44,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.log.internals.LogDirFailureChannel;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Allows an external thread to block waiting for newly offline log dirs.
 *
 * <p>A single instance of LogDirFailureChannel should be accessible by any class that
 * performs disk I/O. If an IOException is encountered while accessing a log directory,
 * the class reports the directory name via {@link #maybeAddOfflineLogDir}. Each log
 * directory is added only once; after a directory is added for the first time, a thread
 * blocked in {@link #takeNextOfflineLogDir} receives the name of the new offline
 * directory and handles the failure. An offline log directory stays offline until the
 * broker is restarted.
 */
public class LogDirFailureChannel {

    private static final Logger log = LoggerFactory.getLogger(LogDirFailureChannel.class);

    // Used as a concurrent set (key == value == dir name) of dirs already reported offline.
    private final ConcurrentMap<String, String> offlineLogDirs = new ConcurrentHashMap<>();
    // Newly offline dirs awaiting handling; bounded by the total number of log dirs.
    private final BlockingQueue<String> offlineLogDirQueue;

    /**
     * @param logDirNum the total number of log directories; bounds the failure queue
     */
    public LogDirFailureChannel(int logDirNum) {
        this.offlineLogDirQueue = new ArrayBlockingQueue<>(logDirNum);
    }

    /**
     * @param logDir the log directory to check
     * @return true if the given log directory has already been marked offline
     */
    public boolean hasOfflineLogDir(String logDir) {
        return offlineLogDirs.containsKey(logDir);
    }

    /**
     * Log the error and, if this is the first failure seen for the given logDir,
     * enqueue it to the logDirFailureEvent queue. Subsequent calls for the same
     * directory only log.
     *
     * @param logDir the offline logDir
     * @param msg    error message to log
     * @param e      exception instance
     */
    public void maybeAddOfflineLogDir(String logDir, String msg, IOException e) {
        log.error(msg, e);
        boolean firstFailure = offlineLogDirs.putIfAbsent(logDir, logDir) == null;
        if (firstFailure) {
            offlineLogDirQueue.add(logDir);
        }
    }

    /**
     * Get the next offline log dir from the logDirFailureEvent queue, waiting if
     * necessary until a new offline log directory becomes available.
     *
     * @return the next offline log dir
     * @throws InterruptedException if interrupted while waiting for an offline dir
     */
    public String takeNextOfflineLogDir() throws InterruptedException {
        return offlineLogDirQueue.take();
    }
}