mirror of https://github.com/apache/kafka.git
Avoid gettimeofdaycalls in steady state fetch states
This commit is contained in:
parent
d2a13d8270
commit
854588804d
|
@ -31,7 +31,7 @@ import kafka.utils.CoreUtils.inLock
|
|||
import org.apache.kafka.common.protocol.Errors
|
||||
import AbstractFetcherThread._
|
||||
|
||||
import scala.collection.{Map, Seq, Set, mutable}
|
||||
import scala.collection.{mutable, Map, Seq, Set}
|
||||
import scala.collection.JavaConverters._
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
@ -615,7 +615,7 @@ abstract class AbstractFetcherThread(name: String,
|
|||
Option(partitionStates.stateValue(partition)).foreach { currentFetchState =>
|
||||
if (!currentFetchState.isDelayed) {
|
||||
partitionStates.updateAndMoveToEnd(partition, PartitionFetchState(currentFetchState.fetchOffset,
|
||||
currentFetchState.currentLeaderEpoch, new DelayedItem(delay), currentFetchState.state))
|
||||
currentFetchState.currentLeaderEpoch, Some(new DelayedItem(delay)), currentFetchState.state))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -757,7 +757,7 @@ case object Fetching extends ReplicaState
|
|||
|
||||
object PartitionFetchState {
|
||||
def apply(offset: Long, currentLeaderEpoch: Int, state: ReplicaState): PartitionFetchState = {
|
||||
PartitionFetchState(offset, currentLeaderEpoch, new DelayedItem(0), state)
|
||||
PartitionFetchState(offset, currentLeaderEpoch, None, state)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -771,20 +771,20 @@ object PartitionFetchState {
|
|||
*/
|
||||
case class PartitionFetchState(fetchOffset: Long,
|
||||
currentLeaderEpoch: Int,
|
||||
delay: DelayedItem,
|
||||
delay: Option[DelayedItem],
|
||||
state: ReplicaState) {
|
||||
|
||||
def isReadyForFetch: Boolean = state == Fetching && !isDelayed
|
||||
|
||||
def isTruncating: Boolean = state == Truncating && !isDelayed
|
||||
|
||||
def isDelayed: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) > 0
|
||||
def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
|
||||
|
||||
override def toString: String = {
|
||||
s"FetchState(fetchOffset=$fetchOffset" +
|
||||
s", currentLeaderEpoch=$currentLeaderEpoch" +
|
||||
s", state=$state" +
|
||||
s", delay=${delay.delayMs}ms" +
|
||||
s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" +
|
||||
s")"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -587,7 +587,7 @@ class ReplicaAlterLogDirsThreadTest {
|
|||
// one partition is ready and one is delayed
|
||||
val ResultWithPartitions(fetchRequest2Opt, partitionsWithError2) = thread.buildFetch(Map(
|
||||
t1p0 -> PartitionFetchState(140, leaderEpoch, state = Fetching),
|
||||
t1p1 -> PartitionFetchState(160, leaderEpoch, delay = new DelayedItem(5000), state = Fetching)))
|
||||
t1p1 -> PartitionFetchState(160, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching)))
|
||||
|
||||
assertTrue(fetchRequest2Opt.isDefined)
|
||||
val fetchRequest2 = fetchRequest2Opt.get
|
||||
|
@ -600,8 +600,8 @@ class ReplicaAlterLogDirsThreadTest {
|
|||
|
||||
// both partitions are delayed
|
||||
val ResultWithPartitions(fetchRequest3Opt, partitionsWithError3) = thread.buildFetch(Map(
|
||||
t1p0 -> PartitionFetchState(140, leaderEpoch, delay = new DelayedItem(5000), state = Fetching),
|
||||
t1p1 -> PartitionFetchState(160, leaderEpoch, delay = new DelayedItem(5000), state = Fetching)))
|
||||
t1p0 -> PartitionFetchState(140, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching),
|
||||
t1p1 -> PartitionFetchState(160, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching)))
|
||||
assertTrue("Expected no fetch requests since all partitions are delayed", fetchRequest3Opt.isEmpty)
|
||||
assertFalse(partitionsWithError3.nonEmpty)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue