KAFKA-14706: Move/rewrite ShutdownableThread to server-common module. (#13234)

Move/rewrite ShutdownableThread to server-common module.

Reviewers: Luke Chen <showuon@gmail.com>, Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
Satish Duggana 2023-02-17 09:21:17 +05:30 committed by GitHub
parent 82d5720aae
commit 322ac86ba2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 222 additions and 164 deletions

View File

@ -1296,7 +1296,7 @@ project(':examples') {
dependencies {
implementation project(':clients')
implementation project(':core')
implementation project(':server-common')
}
javadoc {

View File

@ -71,6 +71,7 @@
<subpackage name="examples">
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.server.util" />
</subpackage>
<subpackage name="server">

View File

@ -16,15 +16,17 @@
*/
package kafka.common
import kafka.utils.Logging
import java.util.Map.Entry
import java.util.{ArrayDeque, ArrayList, Collection, Collections, HashMap, Iterator}
import kafka.utils.ShutdownableThread
import org.apache.kafka.clients.{ClientRequest, ClientResponse, KafkaClient, RequestCompletionHandler}
import org.apache.kafka.common.Node
import org.apache.kafka.common.errors.{AuthenticationException, DisconnectException}
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.util.ShutdownableThread
import scala.jdk.CollectionConverters._
@ -37,7 +39,9 @@ abstract class InterBrokerSendThread(
requestTimeoutMs: Int,
time: Time,
isInterruptible: Boolean = true
) extends ShutdownableThread(name, isInterruptible) {
) extends ShutdownableThread(name, isInterruptible) with Logging {
this.logIdent = logPrefix
private val unsentRequests = new UnsentRequests

View File

@ -20,10 +20,11 @@ import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean
import kafka.utils.{Logging, ShutdownableThread}
import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, StateChangeHandlers}
import kafka.zookeeper.{StateChangeHandler, ZNodeChildChangeHandler}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.util.ShutdownableThread
import scala.collection.Seq
import scala.util.{Failure, Try}
@ -142,7 +143,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
/* get the change number from a change notification znode */
private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong
class ChangeEventProcessThread(name: String) extends ShutdownableThread(name = name) {
class ChangeEventProcessThread(name: String) extends ShutdownableThread(name) {
override def doWork(): Unit = queue.take().process()
}

View File

@ -37,6 +37,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.util.ShutdownableThread
import java.net.SocketTimeoutException
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
@ -222,9 +223,10 @@ class RequestSendThread(val controllerId: Int,
val requestRateAndQueueTimeMetrics: Timer,
val stateChangeLogger: StateChangeLogger,
name: String)
extends ShutdownableThread(name = name) {
extends ShutdownableThread(name, true, s"[RequestSendThread controllerId=$controllerId] ")
with Logging {
logIdent = s"[RequestSendThread controllerId=$controllerId] "
logIdent = logPrefix
private val socketTimeoutMs = config.controllerSocketTimeoutMs

View File

@ -25,8 +25,9 @@ import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.locks.ReentrantLock
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.CoreUtils.inLock
import kafka.utils.ShutdownableThread
import kafka.utils.Logging
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.util.ShutdownableThread
import scala.collection._
@ -115,8 +116,12 @@ class ControllerEventManager(controllerId: Int,
def isEmpty: Boolean = queue.isEmpty
class ControllerEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) {
logIdent = s"[ControllerEventThread controllerId=$controllerId] "
class ControllerEventThread(name: String)
extends ShutdownableThread(
name, false, s"[ControllerEventThread controllerId=$controllerId] ")
with Logging {
logIdent = logPrefix
override def doWork(): Unit = {
val dequeued = pollFromEventQueue()

View File

@ -32,6 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogDirFailureChannel, OffsetMap, SkimpyOffsetMap, TransactionIndex}
import scala.jdk.CollectionConverters._
@ -298,10 +299,11 @@ class LogCleaner(initialConfig: CleanerConfig,
* choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments.
*/
private[log] class CleanerThread(threadId: Int)
extends ShutdownableThread(name = s"kafka-log-cleaner-thread-$threadId", isInterruptible = false) {
extends ShutdownableThread(s"kafka-log-cleaner-thread-$threadId", false) with Logging {
protected override def loggerName = classOf[LogCleaner].getName
this.logIdent = logPrefix
if (config.dedupeBufferSize / config.numThreads > Int.MaxValue)
warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...")

View File

@ -18,14 +18,14 @@ package kafka.log.remote
import kafka.log.UnifiedLog
import kafka.log.remote.RemoteIndexCache.DirName
import kafka.utils.{CoreUtils, Logging, ShutdownableThread}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageManager}
import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, TimeIndex, TransactionIndex}
import org.apache.kafka.server.util.ShutdownableThread
import java.io.{Closeable, File, InputStream}
import java.nio.file.{Files, Path}
import java.util

View File

@ -30,7 +30,6 @@ import kafka.server.{KafkaConfig, MetaProperties}
import kafka.utils.CoreUtils
import kafka.utils.FileLock
import kafka.utils.Logging
import kafka.utils.ShutdownableThread
import kafka.utils.timer.SystemTimer
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.KafkaException
@ -46,7 +45,7 @@ import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, LeaderAndEpoch, RaftClient, RaftConfig, RaftRequest, ReplicatedLog}
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.kafka.server.util.{KafkaScheduler, ShutdownableThread}
import org.apache.kafka.server.fault.FaultHandler
import scala.jdk.CollectionConverters._
@ -56,10 +55,10 @@ object KafkaRaftManager {
client: KafkaRaftClient[_],
threadNamePrefix: String,
fatalFaultHandler: FaultHandler
) extends ShutdownableThread(
name = threadNamePrefix + "-io-thread",
isInterruptible = false
) {
) extends ShutdownableThread(threadNamePrefix + "-io-thread", false) with Logging {
this.logIdent = logPrefix
override def doWork(): Unit = {
try {
client.poll()

View File

@ -17,11 +17,10 @@
package kafka.raft
import java.util.concurrent.CompletableFuture
import kafka.utils.ShutdownableThread
import kafka.utils.timer.{Timer, TimerTask}
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.raft.ExpirationService
import org.apache.kafka.server.util.ShutdownableThread
object TimingWheelExpirationService {
private val WorkTimeoutMs: Long = 200L
@ -50,8 +49,7 @@ class TimingWheelExpirationService(timer: Timer) extends ExpirationService {
future
}
private class ExpiredOperationReaper extends ShutdownableThread(
name = "raft-expiration-reaper", isInterruptible = false) {
private class ExpiredOperationReaper extends ShutdownableThread("raft-expiration-reaper", false) {
override def doWork(): Unit = {
timer.advanceClock(WorkTimeoutMs)

View File

@ -23,7 +23,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
import kafka.utils.CoreUtils.inLock
import kafka.utils.Implicits._
import kafka.utils.{DelayedItem, Pool, ShutdownableThread}
import kafka.utils.{DelayedItem, Logging, Pool}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.PartitionStates
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
@ -33,6 +33,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid}
import org.apache.kafka.server.util.ShutdownableThread
import java.nio.ByteBuffer
import java.util
@ -55,7 +56,9 @@ abstract class AbstractFetcherThread(name: String,
fetchBackOffMs: Int = 0,
isInterruptible: Boolean = true,
val brokerTopicStats: BrokerTopicStats) //BrokerTopicStats's lifecycle managed by ReplicaManager
extends ShutdownableThread(name, isInterruptible) {
extends ShutdownableThread(name, isInterruptible) with Logging {
this.logIdent = this.logPrefix
type FetchData = FetchResponseData.PartitionData
type EpochData = OffsetForLeaderEpochRequestData.OffsetForLeaderPartition

View File

@ -23,7 +23,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.network.RequestChannel
import kafka.network.RequestChannel._
import kafka.server.ClientQuotaManager._
import kafka.utils.{Logging, QuotaUtils, ShutdownableThread}
import kafka.utils.{Logging, QuotaUtils}
import org.apache.kafka.common.{Cluster, MetricName}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.Metrics
@ -31,6 +31,7 @@ import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType}
import org.apache.kafka.server.util.ShutdownableThread
import scala.jdk.CollectionConverters._

View File

@ -20,11 +20,11 @@ package kafka.server
import java.util.concurrent._
import java.util.concurrent.atomic._
import java.util.concurrent.locks.{Lock, ReentrantLock}
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.CoreUtils.inLock
import kafka.utils._
import kafka.utils.timer._
import org.apache.kafka.server.util.ShutdownableThread
import scala.collection._
import scala.collection.mutable.ListBuffer

View File

@ -20,10 +20,11 @@ package kafka.server
import kafka.server.metadata.ZkMetadataCache
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import kafka.utils.{Logging, ShutdownableThread}
import kafka.utils.Logging
import kafka.zk.{FeatureZNode, FeatureZNodeStatus, KafkaZkClient, ZkVersion}
import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler}
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.server.util.ShutdownableThread
import scala.concurrent.TimeoutException
@ -144,7 +145,10 @@ class FinalizedFeatureChangeListener(private val finalizedFeatureCache: ZkMetada
*
* @param name name of the thread
*/
private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) {
private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name) with Logging {
this.logIdent = logPrefix
override def doWork(): Unit = {
try {
queue.take.updateLatestOrThrow()

View File

@ -61,7 +61,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, RecordValidationException}
import java.nio.file.{Files, Paths}

View File

@ -29,7 +29,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.AbstractRequest.Builder
import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, FetchResponse, ListOffsetsRequest}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.server.util.CommandLineUtils
import org.apache.kafka.server.util.{CommandLineUtils, ShutdownableThread}
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{Node, TopicPartition, Uuid}
@ -393,7 +393,9 @@ private class ReplicaFetcher(name: String, sourceBroker: Node, topicPartitions:
topicIds: Map[String, Uuid], replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int,
fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean, consumerConfig: Properties,
fetcherId: Int)
extends ShutdownableThread(name) {
extends ShutdownableThread(name) with Logging {
this.logIdent = logPrefix
private val fetchEndpoint = new ReplicaFetcherBlockingSend(sourceBroker, new ConsumerConfig(consumerConfig), new Metrics(), Time.SYSTEM, fetcherId,
s"broker-${FetchRequest.DEBUGGING_CONSUMER_ID}-fetcher-$fetcherId")

View File

@ -24,7 +24,7 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.{KafkaRaftManager, RaftManager}
import kafka.security.CredentialProvider
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, MetaProperties, SimpleApiVersionManager}
import kafka.utils.{CoreUtils, Exit, Logging, ShutdownableThread}
import kafka.utils.{CoreUtils, Exit, Logging}
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics
@ -39,7 +39,7 @@ import org.apache.kafka.raft.errors.NotLeaderException
import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient, RaftConfig}
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.fault.ProcessTerminatingFaultHandler
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils, ShutdownableThread}
import org.apache.kafka.snapshot.SnapshotReader
import scala.jdk.CollectionConverters._
@ -147,7 +147,7 @@ class TestRaftServer(
time: Time,
recordsPerSec: Int,
recordSize: Int
) extends ShutdownableThread(name = "raft-workload-generator")
) extends ShutdownableThread("raft-workload-generator")
with RaftClient.Listener[Array[Byte]] {
sealed trait RaftEvent

View File

@ -1,113 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.kafka.common.internals.FatalExitError
abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean = true)
extends Thread(name) with Logging {
this.setDaemon(false)
this.logIdent = "[" + name + "]: "
private val shutdownInitiated = new CountDownLatch(1)
private val shutdownComplete = new CountDownLatch(1)
@volatile private var isStarted: Boolean = false
def shutdown(): Unit = {
initiateShutdown()
awaitShutdown()
}
def isShutdownInitiated: Boolean = shutdownInitiated.getCount == 0
def isShutdownComplete: Boolean = shutdownComplete.getCount == 0
/**
* @return true if there has been an unexpected error and the thread shut down
*/
// mind that run() might set both when we're shutting down the broker
// but the return value of this function at that point wouldn't matter
def isThreadFailed: Boolean = isShutdownComplete && !isShutdownInitiated
def initiateShutdown(): Boolean = {
this.synchronized {
if (isRunning) {
info("Shutting down")
shutdownInitiated.countDown()
if (isInterruptible)
interrupt()
true
} else
false
}
}
/**
* After calling initiateShutdown(), use this API to wait until the shutdown is complete
*/
def awaitShutdown(): Unit = {
if (!isShutdownInitiated)
throw new IllegalStateException("initiateShutdown() was not called before awaitShutdown()")
else {
if (isStarted)
shutdownComplete.await()
info("Shutdown completed")
}
}
/**
* Causes the current thread to wait until the shutdown is initiated,
* or the specified waiting time elapses.
*
* @param timeout
* @param unit
*/
def pause(timeout: Long, unit: TimeUnit): Unit = {
if (shutdownInitiated.await(timeout, unit))
trace("shutdownInitiated latch count reached zero. Shutdown called.")
}
/**
* This method is repeatedly invoked until the thread shuts down or this method throws an exception
*/
def doWork(): Unit
override def run(): Unit = {
isStarted = true
info("Starting")
try {
while (isRunning)
doWork()
} catch {
case e: FatalExitError =>
shutdownInitiated.countDown()
shutdownComplete.countDown()
info("Stopped")
Exit.exit(e.statusCode())
case e: Throwable =>
if (isRunning)
error("Error due to", e)
} finally {
shutdownComplete.countDown()
}
info("Stopped")
}
def isRunning: Boolean = !isShutdownInitiated
}

View File

@ -24,7 +24,7 @@ import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.TopicPartition
import kafka.utils.{ShutdownableThread, TestUtils}
import kafka.utils.TestUtils
import kafka.server.{BaseRequestTest, KafkaConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, TestInfo}
@ -33,6 +33,7 @@ import scala.jdk.CollectionConverters._
import scala.collection.mutable.{ArrayBuffer, Buffer}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.server.util.ShutdownableThread
import scala.collection.mutable

View File

@ -17,7 +17,7 @@ import java.time
import java.util.concurrent._
import java.util.{Collection, Collections, Properties}
import kafka.server.KafkaConfig
import kafka.utils.{Logging, ShutdownableThread, TestUtils}
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.GroupMaxSizeReachedException
import org.apache.kafka.common.message.FindCoordinatorRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse}
import org.apache.kafka.server.util.ShutdownableThread
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Disabled, Test}

View File

@ -20,11 +20,12 @@ package kafka.api
import java.util.Properties
import kafka.server.KafkaConfig
import kafka.utils.{ShutdownableThread, TestUtils}
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.server.util.ShutdownableThread
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test

View File

@ -61,6 +61,7 @@ import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
@ -1785,7 +1786,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
private class ProducerThread(clientId: String, retries: Int)
extends ShutdownableThread(clientId, isInterruptible = false) {
extends ShutdownableThread(clientId, false) {
private val producer = ProducerBuilder().maxRetries(retries).clientId(clientId).build()
val lastSent = new ConcurrentHashMap[Int, Int]()
@ -1806,7 +1807,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
}
private class ConsumerThread(producerThread: ProducerThread) extends ShutdownableThread("test-consumer", isInterruptible = false) {
private class ConsumerThread(producerThread: ProducerThread) extends ShutdownableThread("test-consumer", false) {
private val consumer = ConsumerBuilder("group1").enableAutoCommit(true).build()
val lastReceived = new ConcurrentHashMap[Int, Int]()
val missingRecords = new ConcurrentLinkedQueue[Int]()

View File

@ -21,12 +21,10 @@ import java.lang.management.ManagementFactory
import java.lang.management.OperatingSystemMXBean
import java.util.Random
import java.util.concurrent._
import joptsimple._
import kafka.server.{DelayedOperation, DelayedOperationPurgatory}
import kafka.utils._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.util.CommandLineUtils
import org.apache.kafka.server.util.{CommandLineUtils, ShutdownableThread}
import scala.math._
import scala.jdk.CollectionConverters._
@ -257,7 +255,7 @@ object TestPurgatoryPerformance {
private class CompletionQueue {
private[this] val delayQueue = new DelayQueue[Scheduled]()
private[this] val thread = new ShutdownableThread(name = "completion thread", isInterruptible = false) {
private[this] val thread = new ShutdownableThread("completion thread", false) {
override def doWork(): Unit = {
val scheduled = delayQueue.poll(100, TimeUnit.MILLISECONDS)
if (scheduled != null) {

View File

@ -24,7 +24,7 @@ import kafka.api.LeaderAndIsr
import kafka.server.metadata.KRaftMetadataCache
import kafka.server.metadata.MockConfigRepository
import kafka.utils.TestUtils.waitUntilTrue
import kafka.utils.{MockTime, ShutdownableThread, TestUtils}
import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.metadata.RegisterBrokerRecord
import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, TopicRecord}
import org.apache.kafka.common.metrics.Metrics
@ -38,6 +38,7 @@ import org.apache.kafka.common.{IsolationLevel, TopicIdPartition, TopicPartition
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
@ -134,7 +135,7 @@ class ReplicaManagerConcurrencyTest {
private class Clock(
time: MockTime
) extends ShutdownableThread(name = "clock", isInterruptible = false) {
) extends ShutdownableThread("clock", false) {
override def doWork(): Unit = {
time.sleep(1)
}
@ -191,7 +192,7 @@ class ReplicaManagerConcurrencyTest {
replicaId: Int,
topicIdPartition: TopicIdPartition,
replicaManager: ReplicaManager
) extends ShutdownableThread(name = clientId, isInterruptible = false) {
) extends ShutdownableThread(clientId, false) {
private val random = new Random()
private val clientMetadata = new DefaultClientMetadata(
@ -255,7 +256,7 @@ class ReplicaManagerConcurrencyTest {
clientId: String,
topicPartition: TopicPartition,
replicaManager: ReplicaManager
) extends ShutdownableThread(name = clientId, isInterruptible = false) {
) extends ShutdownableThread(clientId, false) {
private val random = new Random()
private var sequence = 0
@ -333,7 +334,7 @@ class ReplicaManagerConcurrencyTest {
channel: ControllerChannel,
replicaManager: ReplicaManager,
metadataCache: KRaftMetadataCache
) extends ShutdownableThread(name = "controller", isInterruptible = false) {
) extends ShutdownableThread("controller", false) {
private var latestImage = MetadataImage.EMPTY
def initialize(): Unit = {

View File

@ -17,8 +17,8 @@
package kafka.utils
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.server.util.ShutdownableThread
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.util;
import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public abstract class ShutdownableThread extends Thread {
public final String logPrefix;
private final Logger log;
private final boolean isInterruptible;
private final CountDownLatch shutdownInitiated = new CountDownLatch(1);
private final CountDownLatch shutdownComplete = new CountDownLatch(1);
private volatile boolean isStarted = false;
public ShutdownableThread(String name) {
this(name, true);
}
public ShutdownableThread(String name, boolean isInterruptible) {
this(name, isInterruptible, "[" + name + "]: ");
}
public ShutdownableThread(String name, boolean isInterruptible, String logPrefix) {
super(name);
this.isInterruptible = isInterruptible;
this.logPrefix = logPrefix;
log = new LogContext(logPrefix).logger(this.getClass());
this.setDaemon(false);
}
public void shutdown() throws InterruptedException {
initiateShutdown();
awaitShutdown();
}
public boolean isShutdownInitiated() {
return shutdownInitiated.getCount() == 0;
}
public boolean isShutdownComplete() {
return shutdownComplete.getCount() == 0;
}
/**
* @return true if there has been an unexpected error and the thread shut down
*/
// mind that run() might set both when we're shutting down the broker
// but the return value of this function at that point wouldn't matter
public boolean isThreadFailed() {
return isShutdownComplete() && !isShutdownInitiated();
}
public boolean initiateShutdown() {
synchronized (this) {
if (isRunning()) {
log.info("Shutting down");
shutdownInitiated.countDown();
if (isInterruptible)
interrupt();
return true;
} else
return false;
}
}
/**
* After calling initiateShutdown(), use this API to wait until the shutdown is complete.
*/
public void awaitShutdown() throws InterruptedException {
if (!isShutdownInitiated())
throw new IllegalStateException("initiateShutdown() was not called before awaitShutdown()");
else {
if (isStarted)
shutdownComplete.await();
log.info("Shutdown completed");
}
}
/**
* Causes the current thread to wait until the shutdown is initiated,
* or the specified waiting time elapses.
*
* @param timeout wait time in units.
* @param unit TimeUnit value for the wait time.
*/
public void pause(long timeout, TimeUnit unit) throws InterruptedException {
if (shutdownInitiated.await(timeout, unit))
log.trace("shutdownInitiated latch count reached zero. Shutdown called.");
}
/**
* This method is repeatedly invoked until the thread shuts down or this method throws an exception
*/
public abstract void doWork();
public void run() {
isStarted = true;
log.info("Starting");
try {
while (isRunning())
doWork();
} catch (FatalExitError e) {
shutdownInitiated.countDown();
shutdownComplete.countDown();
log.info("Stopped");
Exit.exit(e.statusCode());
} catch (Throwable e) {
if (isRunning())
log.error("Error due to", e);
} finally {
shutdownComplete.countDown();
}
log.info("Stopped");
}
public boolean isRunning() {
return !isShutdownInitiated();
}
}