Avoid gettimeofdaycalls in steady state fetch states

This commit is contained in:
Lucas Bradstreet 2019-10-01 13:46:30 -07:00
parent d2a13d8270
commit 854588804d
2 changed files with 9 additions and 9 deletions

View File

@ -31,7 +31,7 @@ import kafka.utils.CoreUtils.inLock
import org.apache.kafka.common.protocol.Errors
import AbstractFetcherThread._
import scala.collection.{Map, Seq, Set, mutable}
import scala.collection.{mutable, Map, Seq, Set}
import scala.collection.JavaConverters._
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
@ -615,7 +615,7 @@ abstract class AbstractFetcherThread(name: String,
Option(partitionStates.stateValue(partition)).foreach { currentFetchState =>
if (!currentFetchState.isDelayed) {
partitionStates.updateAndMoveToEnd(partition, PartitionFetchState(currentFetchState.fetchOffset,
currentFetchState.currentLeaderEpoch, new DelayedItem(delay), currentFetchState.state))
currentFetchState.currentLeaderEpoch, Some(new DelayedItem(delay)), currentFetchState.state))
}
}
}
@ -757,7 +757,7 @@ case object Fetching extends ReplicaState
object PartitionFetchState {
def apply(offset: Long, currentLeaderEpoch: Int, state: ReplicaState): PartitionFetchState = {
PartitionFetchState(offset, currentLeaderEpoch, new DelayedItem(0), state)
PartitionFetchState(offset, currentLeaderEpoch, None, state)
}
}
@ -771,20 +771,20 @@ object PartitionFetchState {
*/
case class PartitionFetchState(fetchOffset: Long,
currentLeaderEpoch: Int,
delay: DelayedItem,
delay: Option[DelayedItem],
state: ReplicaState) {
def isReadyForFetch: Boolean = state == Fetching && !isDelayed
def isTruncating: Boolean = state == Truncating && !isDelayed
def isDelayed: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) > 0
def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
override def toString: String = {
s"FetchState(fetchOffset=$fetchOffset" +
s", currentLeaderEpoch=$currentLeaderEpoch" +
s", state=$state" +
s", delay=${delay.delayMs}ms" +
s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" +
s")"
}
}

View File

@ -587,7 +587,7 @@ class ReplicaAlterLogDirsThreadTest {
// one partition is ready and one is delayed
val ResultWithPartitions(fetchRequest2Opt, partitionsWithError2) = thread.buildFetch(Map(
t1p0 -> PartitionFetchState(140, leaderEpoch, state = Fetching),
t1p1 -> PartitionFetchState(160, leaderEpoch, delay = new DelayedItem(5000), state = Fetching)))
t1p1 -> PartitionFetchState(160, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching)))
assertTrue(fetchRequest2Opt.isDefined)
val fetchRequest2 = fetchRequest2Opt.get
@ -600,8 +600,8 @@ class ReplicaAlterLogDirsThreadTest {
// both partitions are delayed
val ResultWithPartitions(fetchRequest3Opt, partitionsWithError3) = thread.buildFetch(Map(
t1p0 -> PartitionFetchState(140, leaderEpoch, delay = new DelayedItem(5000), state = Fetching),
t1p1 -> PartitionFetchState(160, leaderEpoch, delay = new DelayedItem(5000), state = Fetching)))
t1p0 -> PartitionFetchState(140, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching),
t1p1 -> PartitionFetchState(160, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching)))
assertTrue("Expected no fetch requests since all partitions are delayed", fetchRequest3Opt.isEmpty)
assertFalse(partitionsWithError3.nonEmpty)
}