mirror of https://github.com/apache/kafka.git
Avoid gettimeofdaycalls in steady state fetch states
This commit is contained in:
parent
d2a13d8270
commit
854588804d
|
@ -31,7 +31,7 @@ import kafka.utils.CoreUtils.inLock
|
||||||
import org.apache.kafka.common.protocol.Errors
|
import org.apache.kafka.common.protocol.Errors
|
||||||
import AbstractFetcherThread._
|
import AbstractFetcherThread._
|
||||||
|
|
||||||
import scala.collection.{Map, Seq, Set, mutable}
|
import scala.collection.{mutable, Map, Seq, Set}
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
|
@ -615,7 +615,7 @@ abstract class AbstractFetcherThread(name: String,
|
||||||
Option(partitionStates.stateValue(partition)).foreach { currentFetchState =>
|
Option(partitionStates.stateValue(partition)).foreach { currentFetchState =>
|
||||||
if (!currentFetchState.isDelayed) {
|
if (!currentFetchState.isDelayed) {
|
||||||
partitionStates.updateAndMoveToEnd(partition, PartitionFetchState(currentFetchState.fetchOffset,
|
partitionStates.updateAndMoveToEnd(partition, PartitionFetchState(currentFetchState.fetchOffset,
|
||||||
currentFetchState.currentLeaderEpoch, new DelayedItem(delay), currentFetchState.state))
|
currentFetchState.currentLeaderEpoch, Some(new DelayedItem(delay)), currentFetchState.state))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -757,7 +757,7 @@ case object Fetching extends ReplicaState
|
||||||
|
|
||||||
object PartitionFetchState {
|
object PartitionFetchState {
|
||||||
def apply(offset: Long, currentLeaderEpoch: Int, state: ReplicaState): PartitionFetchState = {
|
def apply(offset: Long, currentLeaderEpoch: Int, state: ReplicaState): PartitionFetchState = {
|
||||||
PartitionFetchState(offset, currentLeaderEpoch, new DelayedItem(0), state)
|
PartitionFetchState(offset, currentLeaderEpoch, None, state)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -771,20 +771,20 @@ object PartitionFetchState {
|
||||||
*/
|
*/
|
||||||
case class PartitionFetchState(fetchOffset: Long,
|
case class PartitionFetchState(fetchOffset: Long,
|
||||||
currentLeaderEpoch: Int,
|
currentLeaderEpoch: Int,
|
||||||
delay: DelayedItem,
|
delay: Option[DelayedItem],
|
||||||
state: ReplicaState) {
|
state: ReplicaState) {
|
||||||
|
|
||||||
def isReadyForFetch: Boolean = state == Fetching && !isDelayed
|
def isReadyForFetch: Boolean = state == Fetching && !isDelayed
|
||||||
|
|
||||||
def isTruncating: Boolean = state == Truncating && !isDelayed
|
def isTruncating: Boolean = state == Truncating && !isDelayed
|
||||||
|
|
||||||
def isDelayed: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) > 0
|
def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
|
||||||
|
|
||||||
override def toString: String = {
|
override def toString: String = {
|
||||||
s"FetchState(fetchOffset=$fetchOffset" +
|
s"FetchState(fetchOffset=$fetchOffset" +
|
||||||
s", currentLeaderEpoch=$currentLeaderEpoch" +
|
s", currentLeaderEpoch=$currentLeaderEpoch" +
|
||||||
s", state=$state" +
|
s", state=$state" +
|
||||||
s", delay=${delay.delayMs}ms" +
|
s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" +
|
||||||
s")"
|
s")"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -587,7 +587,7 @@ class ReplicaAlterLogDirsThreadTest {
|
||||||
// one partition is ready and one is delayed
|
// one partition is ready and one is delayed
|
||||||
val ResultWithPartitions(fetchRequest2Opt, partitionsWithError2) = thread.buildFetch(Map(
|
val ResultWithPartitions(fetchRequest2Opt, partitionsWithError2) = thread.buildFetch(Map(
|
||||||
t1p0 -> PartitionFetchState(140, leaderEpoch, state = Fetching),
|
t1p0 -> PartitionFetchState(140, leaderEpoch, state = Fetching),
|
||||||
t1p1 -> PartitionFetchState(160, leaderEpoch, delay = new DelayedItem(5000), state = Fetching)))
|
t1p1 -> PartitionFetchState(160, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching)))
|
||||||
|
|
||||||
assertTrue(fetchRequest2Opt.isDefined)
|
assertTrue(fetchRequest2Opt.isDefined)
|
||||||
val fetchRequest2 = fetchRequest2Opt.get
|
val fetchRequest2 = fetchRequest2Opt.get
|
||||||
|
@ -600,8 +600,8 @@ class ReplicaAlterLogDirsThreadTest {
|
||||||
|
|
||||||
// both partitions are delayed
|
// both partitions are delayed
|
||||||
val ResultWithPartitions(fetchRequest3Opt, partitionsWithError3) = thread.buildFetch(Map(
|
val ResultWithPartitions(fetchRequest3Opt, partitionsWithError3) = thread.buildFetch(Map(
|
||||||
t1p0 -> PartitionFetchState(140, leaderEpoch, delay = new DelayedItem(5000), state = Fetching),
|
t1p0 -> PartitionFetchState(140, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching),
|
||||||
t1p1 -> PartitionFetchState(160, leaderEpoch, delay = new DelayedItem(5000), state = Fetching)))
|
t1p1 -> PartitionFetchState(160, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching)))
|
||||||
assertTrue("Expected no fetch requests since all partitions are delayed", fetchRequest3Opt.isEmpty)
|
assertTrue("Expected no fetch requests since all partitions are delayed", fetchRequest3Opt.isEmpty)
|
||||||
assertFalse(partitionsWithError3.nonEmpty)
|
assertFalse(partitionsWithError3.nonEmpty)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue