diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
index 49bd8c78692..76475ec386c 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -36,7 +36,9 @@
+
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
index 0dc8943fdcb..c552f7bd370 100644
--- a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
@@ -185,7 +185,18 @@ public class FetchSessionHandler {
* Another reason is because we make use of the list ordering to optimize the preparation of
* incremental fetch requests (see below).
*/
- private LinkedHashMap<TopicPartition, PartitionData> next = new LinkedHashMap<>();
+ private LinkedHashMap<TopicPartition, PartitionData> next;
+ private final boolean copySessionPartitions;
+
+ Builder() {
+ this.next = new LinkedHashMap<>();
+ this.copySessionPartitions = true;
+ }
+
+ Builder(int initialSize, boolean copySessionPartitions) {
+ this.next = new LinkedHashMap<>(initialSize);
+ this.copySessionPartitions = copySessionPartitions;
+ }
/**
* Mark that we want data from this partition in the upcoming fetch.
@@ -215,15 +226,10 @@ public class FetchSessionHandler {
Entry<TopicPartition, PartitionData> entry = iter.next();
TopicPartition topicPartition = entry.getKey();
PartitionData prevData = entry.getValue();
- PartitionData nextData = next.get(topicPartition);
+ PartitionData nextData = next.remove(topicPartition);
if (nextData != null) {
- if (prevData.equals(nextData)) {
- // Omit this partition from the FetchRequest, because it hasn't changed
- // since the previous request.
- next.remove(topicPartition);
- } else {
- // Move the altered partition to the end of 'next'
- next.remove(topicPartition);
+ if (!prevData.equals(nextData)) {
+ // Re-add the altered partition to the end of 'next'
next.put(topicPartition, nextData);
entry.setValue(nextData);
altered.add(topicPartition);
@@ -255,10 +261,10 @@ public class FetchSessionHandler {
partitionsToLogString(altered), partitionsToLogString(removed),
partitionsToLogString(sessionPartitions.keySet()));
}
- Map<TopicPartition, PartitionData> toSend =
Collections.unmodifiableMap(new LinkedHashMap<>(next));
- Map<TopicPartition, PartitionData> curSessionPartitions =
Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions));
+ Map<TopicPartition, PartitionData> toSend = Collections.unmodifiableMap(next);
+ Map<TopicPartition, PartitionData> curSessionPartitions = copySessionPartitions
+ ? Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions))
+ : Collections.unmodifiableMap(sessionPartitions);
next = null;
return new FetchRequestData(toSend, Collections.unmodifiableList(removed),
curSessionPartitions, nextMetadata);
@@ -269,6 +275,18 @@ public class FetchSessionHandler {
return new Builder();
}
+
+ /** A builder that allows for presizing the PartitionData hashmap and avoiding making a
+ * secondary copy of the sessionPartitions in cases where this is not necessary.
+ * This builder is primarily for use by the replica fetcher.
+ * @param size the initial size of the PartitionData hashmap
+ * @param copySessionPartitions boolean denoting whether the builder should make a deep copy of
+ * session partitions
+ */
+ public Builder newBuilder(int size, boolean copySessionPartitions) {
+ return new Builder(size, copySessionPartitions);
+ }
+
private String partitionsToLogString(Collection<TopicPartition> partitions) {
if (!log.isTraceEnabled()) {
return String.format("%d partition(s)", partitions.size());
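A minimal sketch (not part of the patch) of how the presized, copy-avoiding builder above might be driven from the Scala replica fetcher; `handler`, `fetchStates`, `fetchSize` and `logStartOffsetOf` are placeholder names, and `java.util.Optional` is assumed to be imported.
val builder = handler.newBuilder(fetchStates.size, false)
fetchStates.foreach { case (tp, state) =>
  // logStartOffsetOf is a stand-in for however the caller looks up the local log start offset
  builder.add(tp, new FetchRequest.PartitionData(
    state.fetchOffset, logStartOffsetOf(tp), fetchSize, Optional.of(state.currentLeaderEpoch)))
}
val data = builder.build()
// data.toSend() backs the outgoing request; data.sessionPartitions() is no longer a deep copy
// when copySessionPartitions is false, so callers must treat it as read-only.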
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
index daad3550738..c289eaa59b7 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
@@ -101,7 +101,7 @@ public class PartitionStates<S> {
}
public LinkedHashMap<TopicPartition, S> partitionStateMap() {
- return new LinkedHashMap<>(map);
+ return map;
}
/**
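With the defensive copy removed above, partitionStateMap() now returns the fetcher's live internal map. A brief sketch of the intended caller-side contract, assuming the caller is AbstractFetcherThread reading under its partitionMapLock as the change below does:
inLock(partitionMapLock) {
  // Read-only view of the live map; mutating it here would corrupt the fetcher's own state.
  val states: collection.Map[TopicPartition, PartitionFetchState] =
    partitionStates.partitionStateMap.asScala
  buildFetch(states)
}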
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 6e2e5da4c9a..5c65bed128f 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,6 +18,7 @@
package kafka.server
import java.nio.ByteBuffer
+import java.util
import java.util.Optional
import java.util.concurrent.locks.ReentrantLock
@@ -29,16 +30,17 @@ import kafka.common.ClientIdAndBroker
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.CoreUtils.inLock
import org.apache.kafka.common.protocol.Errors
-import AbstractFetcherThread._
-import scala.collection.{Map, Seq, Set, mutable}
+import scala.collection.{mutable, Map, Seq, Set}
import scala.collection.JavaConverters._
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
-import java.util.function.Consumer
+import java.util.function.BiConsumer
import com.yammer.metrics.core.Gauge
import kafka.log.LogAppendInfo
+import kafka.server.AbstractFetcherThread.ReplicaFetch
+import kafka.server.AbstractFetcherThread.ResultWithPartitions
import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
import org.apache.kafka.common.internals.PartitionStates
import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
@@ -79,17 +81,19 @@ abstract class AbstractFetcherThread(name: String,
protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit
- protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]]
+ protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]]
protected def latestEpoch(topicPartition: TopicPartition): Option[Int]
+ protected def logStartOffset(topicPartition: TopicPartition): Long
+
protected def logEndOffset(topicPartition: TopicPartition): Long
protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch]
protected def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset]
- protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)]
+ protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData]
protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long
@@ -115,9 +119,8 @@ abstract class AbstractFetcherThread(name: String,
}
private def maybeFetch(): Unit = {
- val (fetchStates, fetchRequestOpt) = inLock(partitionMapLock) {
- val fetchStates = partitionStates.partitionStateMap.asScala
- val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates)
+ val fetchRequestOpt = inLock(partitionMapLock) {
+ val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(partitionStates.partitionStateMap.asScala)
handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
@@ -126,11 +129,11 @@ abstract class AbstractFetcherThread(name: String,
partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
}
- (fetchStates, fetchRequestOpt)
+ fetchRequestOpt
}
- fetchRequestOpt.foreach { fetchRequest =>
- processFetchRequest(fetchStates, fetchRequest)
+ fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>
+ processFetchRequest(sessionPartitions, fetchRequest)
}
}
@@ -150,13 +153,12 @@ abstract class AbstractFetcherThread(name: String,
val partitionsWithEpochs = mutable.Map.empty[TopicPartition, EpochData]
val partitionsWithoutEpochs = mutable.Set.empty[TopicPartition]
- partitionStates.stream().forEach(new Consumer[PartitionStates.PartitionState[PartitionFetchState]] {
- override def accept(state: PartitionStates.PartitionState[PartitionFetchState]): Unit = {
- if (state.value.isTruncating) {
- val tp = state.topicPartition
+ partitionStates.partitionStateMap.forEach(new BiConsumer[TopicPartition, PartitionFetchState] {
+ override def accept(tp: TopicPartition, state: PartitionFetchState): Unit = {
+ if (state.isTruncating) {
latestEpoch(tp) match {
case Some(epoch) if isOffsetForLeaderEpochSupported =>
- partitionsWithEpochs += tp -> new EpochData(Optional.of(state.value.currentLeaderEpoch), epoch)
+ partitionsWithEpochs += tp -> new EpochData(Optional.of(state.currentLeaderEpoch), epoch)
case _ =>
partitionsWithoutEpochs += tp
}
@@ -276,10 +278,10 @@ abstract class AbstractFetcherThread(name: String,
}
}
- private def processFetchRequest(fetchStates: Map[TopicPartition, PartitionFetchState],
+ private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
fetchRequest: FetchRequest.Builder): Unit = {
val partitionsWithError = mutable.Set[TopicPartition]()
- var responseData: Seq[(TopicPartition, FetchData)] = Seq.empty
+ var responseData: Map[TopicPartition, FetchData] = Map.empty
try {
trace(s"Sending fetch request $fetchRequest")
@@ -307,8 +309,8 @@ abstract class AbstractFetcherThread(name: String,
// It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
// In this case, we only want to process the fetch response if the partition state is ready for fetch and
// the current offset is the same as the offset requested.
- val fetchState = fetchStates(topicPartition)
- if (fetchState.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
+ val fetchPartitionData = sessionPartitions.get(topicPartition)
+ if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
partitionData.error match {
case Errors.NONE =>
try {
@@ -319,13 +321,13 @@ abstract class AbstractFetcherThread(name: String,
logAppendInfoOpt.foreach { logAppendInfo =>
val validBytes = logAppendInfo.validBytes
val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
- fetcherLagStats.getAndMaybePut(topicPartition).lag = Math.max(0L, partitionData.highWatermark - nextOffset)
+ val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
+ fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
// ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
if (validBytes > 0 && partitionStates.contains(topicPartition)) {
// Update partitionStates only if there is no exception during processPartitionData
- val newFetchState = PartitionFetchState(nextOffset, fetchState.currentLeaderEpoch,
- state = Fetching)
+ val newFetchState = PartitionFetchState(nextOffset, Some(lag), currentFetchState.currentLeaderEpoch, state = Fetching)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
fetcherStats.byteRate.mark(validBytes)
}
@@ -356,7 +358,7 @@ abstract class AbstractFetcherThread(name: String,
case Errors.UNKNOWN_LEADER_EPOCH =>
debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " +
- s"this replica's current leader epoch of ${fetchState.currentLeaderEpoch}.")
+ s"this replica's current leader epoch of ${currentFetchState.currentLeaderEpoch}.")
partitionsWithError += topicPartition
case Errors.FENCED_LEADER_EPOCH =>
@@ -388,7 +390,7 @@ abstract class AbstractFetcherThread(name: String,
try {
Option(partitionStates.stateValue(topicPartition)).foreach { state =>
val newState = PartitionFetchState(math.min(truncationOffset, state.fetchOffset),
- state.currentLeaderEpoch, state.delay, state = Truncating)
+ state.lag, state.currentLeaderEpoch, state.delay, state = Truncating)
partitionStates.updateAndMoveToEnd(topicPartition, newState)
partitionMapCond.signalAll()
}
@@ -413,12 +415,10 @@ abstract class AbstractFetcherThread(name: String,
val currentState = partitionStates.stateValue(tp)
val updatedState = if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.leaderEpoch) {
currentState
+ } else if (initialFetchState.offset < 0) {
+ fetchOffsetAndTruncate(tp, initialFetchState.leaderEpoch)
} else {
- val initialFetchOffset = if (initialFetchState.offset < 0)
- fetchOffsetAndTruncate(tp, initialFetchState.leaderEpoch)
- else
- initialFetchState.offset
- PartitionFetchState(initialFetchOffset, initialFetchState.leaderEpoch, state = Truncating)
+ PartitionFetchState(initialFetchState.offset, None, initialFetchState.leaderEpoch, state = Truncating)
}
partitionStates.updateAndMoveToEnd(tp, updatedState)
}
@@ -440,8 +440,8 @@ abstract class AbstractFetcherThread(name: String,
val maybeTruncationComplete = fetchOffsets.get(state.topicPartition) match {
case Some(offsetTruncationState) =>
val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating
- PartitionFetchState(offsetTruncationState.offset, currentFetchState.currentLeaderEpoch,
- currentFetchState.delay, state)
+ PartitionFetchState(offsetTruncationState.offset, currentFetchState.lag,
+ currentFetchState.currentLeaderEpoch, currentFetchState.delay, state)
case None => currentFetchState
}
(state.topicPartition, maybeTruncationComplete)
@@ -528,11 +528,10 @@ abstract class AbstractFetcherThread(name: String,
private def handleOutOfRangeError(topicPartition: TopicPartition,
fetchState: PartitionFetchState): Boolean = {
try {
- val newOffset = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch)
- val newFetchState = PartitionFetchState(newOffset, fetchState.currentLeaderEpoch, state = Fetching)
+ val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
- s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset")
+ s"out of range, which typically implies a leader change. Reset fetch offset to ${newFetchState.fetchOffset}")
true
} catch {
case _: FencedLeaderEpochException =>
@@ -554,7 +553,7 @@ abstract class AbstractFetcherThread(name: String,
/**
* Handle a partition whose offset is out of range and return a new fetch offset.
*/
- protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = {
+ protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, currentLeaderEpoch: Int): PartitionFetchState = {
val replicaEndOffset = logEndOffset(topicPartition)
/**
@@ -572,7 +571,9 @@ abstract class AbstractFetcherThread(name: String,
warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
s"leader's latest offset $leaderEndOffset")
truncate(topicPartition, OffsetTruncationState(leaderEndOffset, truncationCompleted = true))
- leaderEndOffset
+
+ fetcherLagStats.getAndMaybePut(topicPartition).lag = 0
+ PartitionFetchState(leaderEndOffset, Some(0), currentLeaderEpoch, state = Fetching)
} else {
/**
* If the leader's log end offset is greater than the follower's log end offset, there are two possibilities:
@@ -602,7 +603,10 @@ abstract class AbstractFetcherThread(name: String,
// Only truncate log when current leader's log start offset is greater than follower's log end offset.
if (leaderStartOffset > replicaEndOffset)
truncateFullyAndStartAt(topicPartition, leaderStartOffset)
- offsetToFetch
+
+ val initialLag = leaderEndOffset - offsetToFetch
+ fetcherLagStats.getAndMaybePut(topicPartition).lag = initialLag
+ PartitionFetchState(offsetToFetch, Some(initialLag), currentLeaderEpoch, state = Fetching)
}
}
@@ -613,7 +617,7 @@ abstract class AbstractFetcherThread(name: String,
Option(partitionStates.stateValue(partition)).foreach { currentFetchState =>
if (!currentFetchState.isDelayed) {
partitionStates.updateAndMoveToEnd(partition, PartitionFetchState(currentFetchState.fetchOffset,
- currentFetchState.currentLeaderEpoch, new DelayedItem(delay), currentFetchState.state))
+ currentFetchState.lag, currentFetchState.currentLeaderEpoch, Some(new DelayedItem(delay)), currentFetchState.state))
}
}
}
@@ -665,6 +669,7 @@ abstract class AbstractFetcherThread(name: String,
object AbstractFetcherThread {
+ case class ReplicaFetch(partitionData: util.Map[TopicPartition, FetchRequest.PartitionData], fetchRequest: FetchRequest.Builder)
case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition])
}
@@ -702,29 +707,21 @@ class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends KafkaMetricsGr
}
class FetcherLagStats(metricId: ClientIdAndBroker) {
- private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
- val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
+ private val valueFactory = (k: TopicPartition) => new FetcherLagMetrics(ClientIdTopicPartition(metricId.clientId, k))
+ val stats = new Pool[TopicPartition, FetcherLagMetrics](Some(valueFactory))
def getAndMaybePut(topicPartition: TopicPartition): FetcherLagMetrics = {
- stats.getAndMaybePut(ClientIdTopicPartition(metricId.clientId, topicPartition))
- }
-
- def isReplicaInSync(topicPartition: TopicPartition): Boolean = {
- val fetcherLagMetrics = stats.get(ClientIdTopicPartition(metricId.clientId, topicPartition))
- if (fetcherLagMetrics != null)
- fetcherLagMetrics.lag <= 0
- else
- false
+ stats.getAndMaybePut(topicPartition)
}
def unregister(topicPartition: TopicPartition): Unit = {
- val lagMetrics = stats.remove(ClientIdTopicPartition(metricId.clientId, topicPartition))
+ val lagMetrics = stats.remove(topicPartition)
if (lagMetrics != null) lagMetrics.unregister()
}
def unregister(): Unit = {
- stats.keys.toBuffer.foreach { key: ClientIdTopicPartition =>
- unregister(key.topicPartition)
+ stats.keys.toBuffer.foreach { key: TopicPartition =>
+ unregister(key)
}
}
}
@@ -754,8 +751,8 @@ case object Truncating extends ReplicaState
case object Fetching extends ReplicaState
object PartitionFetchState {
- def apply(offset: Long, currentLeaderEpoch: Int, state: ReplicaState): PartitionFetchState = {
- PartitionFetchState(offset, currentLeaderEpoch, new DelayedItem(0), state)
+ def apply(offset: Long, lag: Option[Long], currentLeaderEpoch: Int, state: ReplicaState): PartitionFetchState = {
+ PartitionFetchState(offset, lag, currentLeaderEpoch, None, state)
}
}
@@ -768,21 +765,25 @@ object PartitionFetchState {
* (3) ReadyForFetch, this is the active state where the thread is actively fetching data.
*/
case class PartitionFetchState(fetchOffset: Long,
+ lag: Option[Long],
currentLeaderEpoch: Int,
- delay: DelayedItem,
+ delay: Option[DelayedItem],
state: ReplicaState) {
def isReadyForFetch: Boolean = state == Fetching && !isDelayed
+ def isReplicaInSync: Boolean = lag.isDefined && lag.get <= 0
+
def isTruncating: Boolean = state == Truncating && !isDelayed
- def isDelayed: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) > 0
+ def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
override def toString: String = {
s"FetchState(fetchOffset=$fetchOffset" +
s", currentLeaderEpoch=$currentLeaderEpoch" +
s", state=$state" +
- s", delay=${delay.delayMs}ms" +
+ s", lag=$lag" +
+ s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" +
s")"
}
}
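A short sketch of the reshaped PartitionFetchState above, using arbitrary example values, to show how the new lag field and optional delay feed the derived checks:
val state = PartitionFetchState(100L, Some(0L), 5, state = Fetching)
state.isReplicaInSync                                   // true: lag is defined and <= 0
val delayed = state.copy(delay = Some(new DelayedItem(5000)))
delayed.isDelayed                                       // true until the 5 second back-off expires
delayed.isReadyForFetch                                 // false while the partition is delayed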
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index fdb2bfd6c03..c36de3d0b1d 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -23,6 +23,7 @@ import java.util.Optional
import kafka.api.Request
import kafka.cluster.BrokerEndPoint
import kafka.log.LogAppendInfo
+import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.QuotaFactory.UnboundedQuota
import org.apache.kafka.common.TopicPartition
@@ -34,7 +35,7 @@ import org.apache.kafka.common.requests.FetchResponse.PartitionData
import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
import scala.collection.JavaConverters._
-import scala.collection.{Map, Seq, Set, mutable}
+import scala.collection.{mutable, Map, Seq, Set}
class ReplicaAlterLogDirsThread(name: String,
sourceBroker: BrokerEndPoint,
@@ -59,6 +60,10 @@ class ReplicaAlterLogDirsThread(name: String,
replicaMgr.futureLocalLogOrException(topicPartition).latestEpoch
}
+ override protected def logStartOffset(topicPartition: TopicPartition): Long = {
+ replicaMgr.futureLocalLogOrException(topicPartition).logStartOffset
+ }
+
override protected def logEndOffset(topicPartition: TopicPartition): Long = {
replicaMgr.futureLocalLogOrException(topicPartition).logEndOffset
}
@@ -67,7 +72,7 @@ class ReplicaAlterLogDirsThread(name: String,
replicaMgr.futureLocalLogOrException(topicPartition).endOffsetForEpoch(epoch)
}
- def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = {
+ def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = {
var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData[Records])] = null
val request = fetchRequest.build()
@@ -95,7 +100,7 @@ class ReplicaAlterLogDirsThread(name: String,
if (partitionData == null)
throw new IllegalStateException(s"Failed to fetch data for partitions ${request.fetchData.keySet().toArray.mkString(",")}")
- partitionData
+ partitionData.toMap
}
// process fetched data
@@ -218,7 +223,7 @@ class ReplicaAlterLogDirsThread(name: String,
nextPartitionOpt
}
- private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+ private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[ReplicaFetch]] = {
val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
val partitionsWithError = mutable.Set[TopicPartition]()
@@ -237,14 +242,14 @@ class ReplicaAlterLogDirsThread(name: String,
} else {
// Set maxWait and minBytes to 0 because the response should return immediately if
// the future log has caught up with the current log of the partition
- Some(FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap)
- .setMaxBytes(maxBytes))
+ val requestBuilder = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap).setMaxBytes(maxBytes)
+ Some(ReplicaFetch(requestMap, requestBuilder))
}
ResultWithPartitions(fetchRequestOpt, partitionsWithError)
}
- def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+ def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
// Only include replica in the fetch request if it is not throttled.
if (quota.isQuotaExceeded) {
ResultWithPartitions(None, Set.empty)
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 1d2fdeed2d6..83a5dea2385 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -22,6 +22,7 @@ import java.util.Optional
import kafka.api._
import kafka.cluster.BrokerEndPoint
import kafka.log.LogAppendInfo
+import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.TopicPartition
@@ -34,7 +35,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{LogContext, Time}
import scala.collection.JavaConverters._
-import scala.collection.{Map, mutable}
+import scala.collection.{mutable, Map}
class ReplicaFetcherThread(name: String,
fetcherId: Int,
@@ -96,12 +97,16 @@ class ReplicaFetcherThread(name: String,
private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
private val fetchSize = brokerConfig.replicaFetchMaxBytes
private val brokerSupportsLeaderEpochRequest = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
- private val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
+ val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
replicaMgr.localLogOrException(topicPartition).latestEpoch
}
+ override protected def logStartOffset(topicPartition: TopicPartition): Long = {
+ replicaMgr.localLogOrException(topicPartition).logStartOffset
+ }
+
override protected def logEndOffset(topicPartition: TopicPartition): Long = {
replicaMgr.localLogOrException(topicPartition).logEndOffset
}
@@ -191,14 +196,14 @@ class ReplicaFetcherThread(name: String,
}
- override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = {
+ override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = {
try {
val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse[Records]]
if (!fetchSessionHandler.handleResponse(fetchResponse)) {
- Nil
+ Map.empty
} else {
- fetchResponse.responseData.asScala.toSeq
+ fetchResponse.responseData.asScala
}
} catch {
case t: Throwable =>
@@ -236,15 +241,15 @@ class ReplicaFetcherThread(name: String,
}
}
- override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+ override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
val partitionsWithError = mutable.Set[TopicPartition]()
- val builder = fetchSessionHandler.newBuilder()
+ val builder = fetchSessionHandler.newBuilder(partitionMap.size, false)
partitionMap.foreach { case (topicPartition, fetchState) =>
// We will not include a replica in the fetch request if it should be throttled.
- if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, topicPartition)) {
+ if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, fetchState, topicPartition)) {
try {
- val logStartOffset = replicaMgr.localLogOrException(topicPartition).logStartOffset
+ val logStartOffset = this.logStartOffset(topicPartition)
builder.add(topicPartition, new FetchRequest.PartitionData(
fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch)))
} catch {
@@ -265,7 +270,7 @@ class ReplicaFetcherThread(name: String,
.setMaxBytes(maxBytes)
.toForget(fetchData.toForget)
.metadata(fetchData.metadata)
- Some(requestBuilder)
+ Some(ReplicaFetch(fetchData.sessionPartitions(), requestBuilder))
}
ResultWithPartitions(fetchRequestOpt, partitionsWithError)
@@ -330,9 +335,8 @@ class ReplicaFetcherThread(name: String,
* To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
* the quota is exceeded and the replica is not in sync.
*/
- private def shouldFollowerThrottle(quota: ReplicaQuota, topicPartition: TopicPartition): Boolean = {
- val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition)
- !isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
+ private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
+ !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
}
}
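A sketch of the consumer side of the ReplicaFetch produced by buildFetch above, with `responseData` and the per-partition `currentFetchState` as stand-ins: offset validation now runs against the session's own partition map rather than a copied fetchStates snapshot.
fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>
  responseData.foreach { case (tp, partitionData) =>
    val requested = sessionPartitions.get(tp)   // FetchRequest.PartitionData, or null if tp was dropped
    if (requested != null && requested.fetchOffset == currentFetchState.fetchOffset)
      processPartitionData(tp, currentFetchState.fetchOffset, partitionData)
  }
}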
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index d197845d4da..ecd92bfba29 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -59,7 +59,7 @@ class AbstractFetcherManagerTest {
EasyMock.expect(fetcher.start())
EasyMock.expect(fetcher.addPartitions(Map(tp -> OffsetAndEpoch(fetchOffset, leaderEpoch))))
EasyMock.expect(fetcher.fetchState(tp))
- .andReturn(Some(PartitionFetchState(fetchOffset, leaderEpoch, Truncating)))
+ .andReturn(Some(PartitionFetchState(fetchOffset, None, leaderEpoch, Truncating)))
EasyMock.expect(fetcher.removePartitions(Set(tp)))
EasyMock.expect(fetcher.fetchState(tp)).andReturn(None)
EasyMock.replay(fetcher)
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 55c38a1deab..17075219614 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -25,6 +25,7 @@ import com.yammer.metrics.Metrics
import kafka.cluster.BrokerEndPoint
import kafka.log.LogAppendInfo
import kafka.message.NoCompressionCodec
+import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.utils.TestUtils
import org.apache.kafka.common.KafkaException
@@ -38,7 +39,7 @@ import org.junit.Assert._
import org.junit.{Before, Test}
import scala.collection.JavaConverters._
-import scala.collection.{Map, Set, mutable}
+import scala.collection.{mutable, Map, Set}
import scala.util.Random
import org.scalatest.Assertions.assertThrows
@@ -575,7 +576,7 @@ class AbstractFetcherThreadTest {
val fetcher = new MockFetcherThread {
var fetchedOnce = false
- override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = {
+ override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = {
val fetchedData = super.fetchFromLeader(fetchRequest)
if (!fetchedOnce) {
val records = fetchedData.head._2.records.asInstanceOf[MemoryRecords]
@@ -901,7 +902,7 @@ class AbstractFetcherThreadTest {
state.highWatermark = offset
}
- override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+ override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
val fetchData = mutable.Map.empty[TopicPartition, FetchRequest.PartitionData]
partitionMap.foreach { case (partition, state) =>
if (state.isReadyForFetch) {
@@ -911,7 +912,7 @@ class AbstractFetcherThreadTest {
}
}
val fetchRequest = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 1, fetchData.asJava)
- ResultWithPartitions(Some(fetchRequest), Set.empty)
+ ResultWithPartitions(Some(ReplicaFetch(fetchData.asJava, fetchRequest)), Set.empty)
}
override def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
@@ -919,6 +920,8 @@ class AbstractFetcherThreadTest {
state.log.lastOption.map(_.partitionLeaderEpoch).orElse(Some(EpochEndOffset.UNDEFINED_EPOCH))
}
+ override def logStartOffset(topicPartition: TopicPartition): Long = replicaPartitionState(topicPartition).logStartOffset
+
override def logEndOffset(topicPartition: TopicPartition): Long = replicaPartitionState(topicPartition).logEndOffset
override def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = {
@@ -973,7 +976,7 @@ class AbstractFetcherThreadTest {
override protected def isOffsetForLeaderEpochSupported: Boolean = true
- override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = {
+ override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = {
fetchRequest.fetchData.asScala.map { case (partition, fetchData) =>
val leaderState = leaderPartitionState(partition)
val epochCheckError = checkExpectedLeaderEpoch(fetchData.currentLeaderEpoch, leaderState)
@@ -1000,7 +1003,7 @@ class AbstractFetcherThreadTest {
(partition, new FetchData(error, leaderState.highWatermark, leaderState.highWatermark, leaderState.logStartOffset,
List.empty.asJava, records))
- }.toSeq
+ }.toMap
}
private def checkLeaderEpochAndThrow(expectedEpoch: Int, partitionState: PartitionState): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 79457c0ee82..6a7d8c80d4e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -20,6 +20,7 @@ import java.util.Optional
import kafka.cluster.{BrokerEndPoint, Partition}
import kafka.log.{Log, LogManager}
+import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.utils.{DelayedItem, TestUtils}
import org.apache.kafka.common.TopicPartition
@@ -520,11 +521,11 @@ class ReplicaAlterLogDirsThreadTest {
t1p1 -> offsetAndEpoch(0L, leaderEpoch)))
val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = thread.buildFetch(Map(
- t1p0 -> PartitionFetchState(150, leaderEpoch, state = Fetching),
- t1p1 -> PartitionFetchState(160, leaderEpoch, state = Fetching)))
+ t1p0 -> PartitionFetchState(150, None, leaderEpoch, None, state = Fetching),
+ t1p1 -> PartitionFetchState(160, None, leaderEpoch, None, state = Fetching)))
assertTrue(fetchRequestOpt.isDefined)
- val fetchRequest = fetchRequestOpt.get
+ val fetchRequest = fetchRequestOpt.get.fetchRequest
assertFalse(fetchRequest.fetchData.isEmpty)
assertFalse(partitionsWithError.nonEmpty)
val request = fetchRequest.build()
@@ -572,36 +573,36 @@ class ReplicaAlterLogDirsThreadTest {
// one partition is ready and one is truncating
val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = thread.buildFetch(Map(
- t1p0 -> PartitionFetchState(150, leaderEpoch, state = Fetching),
- t1p1 -> PartitionFetchState(160, leaderEpoch, state = Truncating)))
+ t1p0 -> PartitionFetchState(150, None, leaderEpoch, state = Fetching),
+ t1p1 -> PartitionFetchState(160, None, leaderEpoch, state = Truncating)))
assertTrue(fetchRequestOpt.isDefined)
val fetchRequest = fetchRequestOpt.get
- assertFalse(fetchRequest.fetchData.isEmpty)
+ assertFalse(fetchRequest.partitionData.isEmpty)
assertFalse(partitionsWithError.nonEmpty)
- val fetchInfos = fetchRequest.build().fetchData.asScala.toSeq
+ val fetchInfos = fetchRequest.fetchRequest.build().fetchData.asScala.toSeq
assertEquals(1, fetchInfos.length)
assertEquals("Expected fetch request for non-truncating partition", t1p0, fetchInfos.head._1)
assertEquals(150, fetchInfos.head._2.fetchOffset)
// one partition is ready and one is delayed
val ResultWithPartitions(fetchRequest2Opt, partitionsWithError2) = thread.buildFetch(Map(
- t1p0 -> PartitionFetchState(140, leaderEpoch, state = Fetching),
- t1p1 -> PartitionFetchState(160, leaderEpoch, delay = new DelayedItem(5000), state = Fetching)))
+ t1p0 -> PartitionFetchState(140, None, leaderEpoch, state = Fetching),
+ t1p1 -> PartitionFetchState(160, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching)))
assertTrue(fetchRequest2Opt.isDefined)
val fetchRequest2 = fetchRequest2Opt.get
- assertFalse(fetchRequest2.fetchData.isEmpty)
+ assertFalse(fetchRequest2.partitionData.isEmpty)
assertFalse(partitionsWithError2.nonEmpty)
- val fetchInfos2 = fetchRequest2.build().fetchData.asScala.toSeq
+ val fetchInfos2 = fetchRequest2.fetchRequest.build().fetchData.asScala.toSeq
assertEquals(1, fetchInfos2.length)
assertEquals("Expected fetch request for non-delayed partition", t1p0, fetchInfos2.head._1)
assertEquals(140, fetchInfos2.head._2.fetchOffset)
// both partitions are delayed
val ResultWithPartitions(fetchRequest3Opt, partitionsWithError3) = thread.buildFetch(Map(
- t1p0 -> PartitionFetchState(140, leaderEpoch, delay = new DelayedItem(5000), state = Fetching),
- t1p1 -> PartitionFetchState(160, leaderEpoch, delay = new DelayedItem(5000), state = Fetching)))
+ t1p0 -> PartitionFetchState(140, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching),
+ t1p1 -> PartitionFetchState(160, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching)))
assertTrue("Expected no fetch requests since all partitions are delayed", fetchRequest3Opt.isEmpty)
assertFalse(partitionsWithError3.nonEmpty)
}
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 70ea5b0682a..f9c0f725710 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -208,6 +208,8 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
+
+
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
new file mode 100644
index 00000000000..0635ff53387
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.fetcher;
+
+import kafka.api.ApiVersion$;
+import kafka.cluster.BrokerEndPoint;
+import kafka.cluster.DelayedOperations;
+import kafka.cluster.Partition;
+import kafka.cluster.PartitionStateStore;
+import kafka.log.CleanerConfig;
+import kafka.log.Defaults;
+import kafka.log.LogAppendInfo;
+import kafka.log.LogConfig;
+import kafka.log.LogManager;
+import kafka.server.BrokerState;
+import kafka.server.BrokerTopicStats;
+import kafka.server.FailedPartitions;
+import kafka.server.KafkaConfig;
+import kafka.server.LogDirFailureChannel;
+import kafka.server.MetadataCache;
+import kafka.server.OffsetAndEpoch;
+import kafka.server.OffsetTruncationState;
+import kafka.server.ReplicaFetcherThread;
+import kafka.server.ReplicaQuota;
+import kafka.server.checkpoints.OffsetCheckpoints;
+import kafka.utils.KafkaScheduler;
+import kafka.utils.Pool;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.LeaderAndIsrRequestData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.RecordsSend;
+import org.apache.kafka.common.requests.EpochEndOffset;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.mockito.Mockito;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import scala.Option;
+import scala.collection.Iterator;
+import scala.collection.JavaConverters;
+import scala.compat.java8.OptionConverters;
+import scala.collection.Map;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+
+public class ReplicaFetcherThreadBenchmark {
+ @Param({"100", "500", "1000", "5000"})
+ private int partitionCount;
+
+ private ReplicaFetcherBenchThread fetcher;
+ private LogManager logManager;
+ private File logDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
+ private KafkaScheduler scheduler = new KafkaScheduler(1, "scheduler", true);
+ private Pool<TopicPartition, Partition> pool = new Pool<>(Option.empty());
+
+ @Setup(Level.Trial)
+ public void setup() throws IOException {
+ if (!logDir.mkdir())
+ throw new IOException("error creating test directory");
+
+ scheduler.startup();
+ Properties props = new Properties();
+ props.put("zookeeper.connect", "127.0.0.1:9999");
+ KafkaConfig config = new KafkaConfig(props);
+ LogConfig logConfig = createLogConfig();
+
+ List<File> logDirs = Collections.singletonList(logDir);
+ BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
+ logManager = new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
+ JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
+ new scala.collection.mutable.HashMap<>(),
+ logConfig,
+ new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5"),
+ 1,
+ 1000L,
+ 10000L,
+ 10000L,
+ 1000L,
+ 60000,
+ scheduler,
+ new BrokerState(),
+ brokerTopicStats,
+ logDirFailureChannel,
+ Time.SYSTEM);
+
+ LinkedHashMap<TopicPartition, FetchResponse.PartitionData<BaseRecords>> initialFetched = new LinkedHashMap<>();
+ scala.collection.mutable.Map<TopicPartition, OffsetAndEpoch> offsetAndEpochs = new scala.collection.mutable.HashMap<>();
+ for (int i = 0; i < partitionCount; i++) {
+ TopicPartition tp = new TopicPartition("topic", i);
+
+ List<Integer> replicas = Arrays.asList(0, 1, 2);
+ LeaderAndIsrRequestData.LeaderAndIsrPartitionState partitionState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
+ .setControllerEpoch(0)
+ .setLeader(0)
+ .setLeaderEpoch(0)
+ .setIsr(replicas)
+ .setZkVersion(1)
+ .setReplicas(replicas)
+ .setIsNew(true);
+
+ PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
+ Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
+ OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
+ Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L));
+ Partition partition = new Partition(tp, 100, ApiVersion$.MODULE$.latestVersion(),
+ 0, Time.SYSTEM, partitionStateStore, new DelayedOperationsMock(tp),
+ Mockito.mock(MetadataCache.class), logManager);
+
+ partition.makeFollower(0, partitionState, 0, offsetCheckpoints);
+ pool.put(tp, partition);
+ offsetAndEpochs.put(tp, new OffsetAndEpoch(0, 0));
+ BaseRecords fetched = new BaseRecords() {
+ @Override
+ public int sizeInBytes() {
+ return 0;
+ }
+
+ @Override
+ public RecordsSend toSend(String destination) {
+ return null;
+ }
+ };
+ initialFetched.put(tp, new FetchResponse.PartitionData<>(Errors.NONE, 0, 0, 0,
+ new LinkedList<>(), fetched));
+ }
+
+ fetcher = new ReplicaFetcherBenchThread(config, pool);
+ fetcher.addPartitions(offsetAndEpochs);
+ // force a pass to move partitions to fetching state. We do this in the setup phase
+ // so that we do not measure this time as part of the steady state work
+ fetcher.doWork();
+ // handle response to engage the incremental fetch session handler
+ fetcher.fetchSessionHandler().handleResponse(new FetchResponse<>(Errors.NONE, initialFetched, 0, 999));
+ }
+
+ @TearDown(Level.Trial)
+ public void tearDown() throws IOException {
+ logManager.shutdown();
+ scheduler.shutdown();
+ Utils.delete(logDir);
+ }
+
+ @Benchmark
+ public long testFetcher() {
+ fetcher.doWork();
+ return fetcher.fetcherStats().requestRate().count();
+ }
+
+ // avoid mocked DelayedOperations to avoid mocked class affecting benchmark results
+ private static class DelayedOperationsMock extends DelayedOperations {
+ DelayedOperationsMock(TopicPartition topicPartition) {
+ super(topicPartition, null, null, null);
+ }
+
+ @Override
+ public int numDelayedDelete() {
+ return 0;
+ }
+ }
+
+ private static LogConfig createLogConfig() {
+ Properties logProps = new Properties();
+ logProps.put(LogConfig.SegmentMsProp(), Defaults.SegmentMs());
+ logProps.put(LogConfig.SegmentBytesProp(), Defaults.SegmentSize());
+ logProps.put(LogConfig.RetentionMsProp(), Defaults.RetentionMs());
+ logProps.put(LogConfig.RetentionBytesProp(), Defaults.RetentionSize());
+ logProps.put(LogConfig.SegmentJitterMsProp(), Defaults.SegmentJitterMs());
+ logProps.put(LogConfig.CleanupPolicyProp(), Defaults.CleanupPolicy());
+ logProps.put(LogConfig.MaxMessageBytesProp(), Defaults.MaxMessageSize());
+ logProps.put(LogConfig.IndexIntervalBytesProp(), Defaults.IndexInterval());
+ logProps.put(LogConfig.SegmentIndexBytesProp(), Defaults.MaxIndexSize());
+ logProps.put(LogConfig.MessageFormatVersionProp(), Defaults.MessageFormatVersion());
+ logProps.put(LogConfig.FileDeleteDelayMsProp(), Defaults.FileDeleteDelayMs());
+ return LogConfig.apply(logProps, new scala.collection.immutable.HashSet<>());
+ }
+
+
+ static class ReplicaFetcherBenchThread extends ReplicaFetcherThread {
+ private final Pool<TopicPartition, Partition> pool;
+
+ ReplicaFetcherBenchThread(KafkaConfig config, Pool<TopicPartition, Partition> partitions) {
+ super("name",
+ 3,
+ new BrokerEndPoint(3, "host", 3000),
+ config,
+ new FailedPartitions(),
+ null,
+ new Metrics(),
+ Time.SYSTEM,
+ new ReplicaQuota() {
+ @Override
+ public boolean isQuotaExceeded() {
+ return false;
+ }
+
+ @Override
+ public void record(long value) {
+ }
+
+ @Override
+ public boolean isThrottled(TopicPartition topicPartition) {
+ return false;
+ }
+ },
+ Option.empty());
+
+ pool = partitions;
+ }
+
+ @Override
+ public Option