KAFKA-9039: Optimize ReplicaFetcher fetch path (#7443)

Improves the performance of the replica fetcher for high-partition-count fetch requests where the majority of partitions did not update between fetch requests. All benchmarks were run on an r5.xlarge.

Vanilla
Benchmark (partitionCount) Mode Cnt Score Error Units
ReplicaFetcherThreadBenchmark.testFetcher 100 avgt 15 26491.825 ± 438.463 ns/op
ReplicaFetcherThreadBenchmark.testFetcher 500 avgt 15 153941.952 ± 4337.073 ns/op
ReplicaFetcherThreadBenchmark.testFetcher 1000 avgt 15 339868.602 ± 4201.462 ns/op
ReplicaFetcherThreadBenchmark.testFetcher 5000 avgt 15 2588878.448 ± 22172.482 ns/op

From 100 to 5000 partitions the latency increase is 2588878.448 / 26491.825 = 98X

Avoid gettimeofday calls in steady-state fetch states
8545888

Benchmark (partitionCount) Mode Cnt Score Error Units
ReplicaFetcherThreadBenchmark.testFetcher 100 avgt 15 22685.381 ± 267.727 ns/op
ReplicaFetcherThreadBenchmark.testFetcher 500 avgt 15 113622.521 ± 1854.254 ns/op
ReplicaFetcherThreadBenchmark.testFetcher 1000 avgt 15 273698.740 ± 9269.554 ns/op
ReplicaFetcherThreadBenchmark.testFetcher 5000 avgt 15 2189223.207 ± 1706.945 ns/op

From 100 to 5000 partitions the latency increase is 2189223.207 / 22685.381 = 97X
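
This step corresponds to the PartitionFetchState changes further down, where the delay field becomes an Option[DelayedItem] that defaults to None. A minimal sketch of the idea in Scala, using a hypothetical stand-in class rather than the real one:

import java.util.concurrent.{Delayed, TimeUnit}

// Hypothetical stand-in for PartitionFetchState. With the delay held as an Option,
// the common, non-delayed case is a None check and never reads the clock; previously
// the field was always a DelayedItem (constructed with a delay of 0), so getDelay --
// and therefore a time lookup -- ran for every partition on every fetch pass.
final case class DelayedFetchStateSketch(fetchOffset: Long, delay: Option[Delayed]) {
  def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
}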

Avoid copying partition states to maintain fetch offsets
29fdd60

Benchmark (partitionCount) Mode Cnt Score Error Units
ReplicaFetcherThreadBenchmark.testFetcher 100 avgt 15 17039.989 ± 609.355 ns/op
ReplicaFetcherThreadBenchmark.testFetcher 500 avgt 15 99371.086 ± 1833.256 ns/op
ReplicaFetcherThreadBenchmark.testFetcher 1000 avgt 15 216071.333 ± 3714.147 ns/op
ReplicaFetcherThreadBenchmark.testFetcher 5000 avgt 15 2035678.223 ± 5195.232 ns/op

From 100 to 5000 partitions the latency increase is 2035678.223 / 17039.989 = 119X
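
The gain here comes from two changes visible in the diff: PartitionStates.partitionStateMap() now returns its backing map instead of a fresh LinkedHashMap, and processFetchRequest reads the requested offsets out of the fetch session's own partition map rather than a per-request snapshot of the fetch states. A rough sketch of the first part, with a hypothetical class name:

import java.util.{LinkedHashMap => JLinkedHashMap}
import org.apache.kafka.common.TopicPartition

// Hypothetical stand-in for PartitionStates. Handing out the backing map avoids
// allocating and filling a new LinkedHashMap with thousands of entries on every
// fetch. The trade-off, inferred from the surrounding code, is that callers hold
// the fetcher's lock and must treat the returned map as read-only.
final class PartitionStatesSketch[S] {
  private val map = new JLinkedHashMap[TopicPartition, S]()

  def update(tp: TopicPartition, state: S): Unit = map.put(tp, state)

  // Before: return new JLinkedHashMap[TopicPartition, S](map) -- an O(n) copy per call
  def partitionStateMap: JLinkedHashMap[TopicPartition, S] = map
}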

Keep lag alongside PartitionFetchState to avoid expensive isReplicaInSync check
0e57e3e

Benchmark (partitionCount) Mode Cnt Score Error Units
ReplicaFetcherThreadBenchmark.testFetcher 100 avgt 15 15131.684 ± 382.088 ns/op
ReplicaFetcherThreadBenchmark.testFetcher 500 avgt 15 86813.843 ± 3346.385 ns/op
ReplicaFetcherThreadBenchmark.testFetcher 1000 avgt 15 193050.381 ± 3281.833 ns/op
ReplicaFetcherThreadBenchmark.testFetcher 5000 avgt 15 1801488.513 ± 2756.355 ns/op

From 100 to 5000 partitions the latency increase is 1801488.513 / 15131.684 = 119X
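
This maps to the new lag: Option[Long] field on PartitionFetchState and to shouldFollowerThrottle taking the fetch state directly (both in the diffs below). A small Scala sketch with hypothetical stand-in types:

import org.apache.kafka.common.TopicPartition

// The lag observed after the last fetch lives on the fetch state itself, so the
// throttle decision no longer allocates a ClientIdTopicPartition key and looks the
// lag up in the FetcherLagStats pool for every partition on every fetch.
final case class LaggedFetchStateSketch(fetchOffset: Long, lag: Option[Long]) {
  def isReplicaInSync: Boolean = lag.exists(_ <= 0)
}

trait QuotaSketch {
  def isThrottled(tp: TopicPartition): Boolean
  def isQuotaExceeded: Boolean
}

object ThrottleSketch {
  // Mirrors the reworked shouldFollowerThrottle check in ReplicaFetcherThread.
  def shouldFollowerThrottle(quota: QuotaSketch, state: LaggedFetchStateSketch, tp: TopicPartition): Boolean =
    !state.isReplicaInSync && quota.isThrottled(tp) && quota.isQuotaExceeded
}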

Fetch session optimizations (mostly presizing the 'next' hashmap and avoiding a copy of sessionPartitions, since a deep copy is not required for the ReplicaFetcher)
2614b24

Benchmark (partitionCount) Mode Cnt Score Error Units
ReplicaFetcherThreadBenchmark.testFetcher 100 avgt 15 11386.203 ± 416.701 ns/op
ReplicaFetcherThreadBenchmark.testFetcher 500 avgt 15 60820.292 ± 3163.001 ns/op
ReplicaFetcherThreadBenchmark.testFetcher 1000 avgt 15 146242.158 ± 1937.254 ns/op
ReplicaFetcherThreadBenchmark.testFetcher 5000 avgt 15 1366768.926 ± 3305.712 ns/op

From 100 to 5000 partitions the latency increase is 1366768.926 / 11386.203 = 120X
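
The replica fetcher now asks the session handler for a presized builder and opts out of the defensive copy (fetchSessionHandler.newBuilder(partitionMap.size, false) in the ReplicaFetcherThread diff below). A Scala sketch of the intended usage; the method name, offsets map, and the logStartOffset/maxBytes values are placeholders for illustration:

import java.util.Optional

import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.FetchRequest

object FetchSessionBuilderSketch {
  // Presizing avoids rehashing the builder's internal 'next' map as thousands of
  // partitions are added, and copySessionPartitions = false means build() hands back
  // a read-only view of the handler's live sessionPartitions instead of a deep copy.
  // The consumer path keeps using handler.newBuilder(), which copies as before.
  def buildIncrementalFetch(handler: FetchSessionHandler,
                            fetchOffsets: Map[TopicPartition, Long]): FetchSessionHandler.FetchRequestData = {
    val builder = handler.newBuilder(fetchOffsets.size, false)
    fetchOffsets.foreach { case (tp, fetchOffset) =>
      builder.add(tp, new FetchRequest.PartitionData(fetchOffset, 0L, 1048576, Optional.empty[Integer]()))
    }
    builder.build()
  }
}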

Reviewers: Jun Rao <junrao@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Lucas Bradstreet 2019-10-16 09:49:53 -07:00 committed by Guozhang Wang
parent 072503527e
commit 8966d066bd
12 changed files with 579 additions and 112 deletions


@ -36,7 +36,9 @@
<allow pkg="kafka.log"/>
<allow pkg="kafka.server"/>
<allow pkg="kafka.api"/>
<allow class="kafka.utils.Pool"/>
<allow class="kafka.utils.KafkaScheduler"/>
<allow class="org.apache.kafka.clients.FetchSessionHandler"/>
<allow pkg="org.mockito"/>


@ -185,7 +185,18 @@ public class FetchSessionHandler {
* Another reason is because we make use of the list ordering to optimize the preparation of
* incremental fetch requests (see below).
*/
private LinkedHashMap<TopicPartition, PartitionData> next = new LinkedHashMap<>();
private LinkedHashMap<TopicPartition, PartitionData> next;
private final boolean copySessionPartitions;
Builder() {
this.next = new LinkedHashMap<>();
this.copySessionPartitions = true;
}
Builder(int initialSize, boolean copySessionPartitions) {
this.next = new LinkedHashMap<>(initialSize);
this.copySessionPartitions = copySessionPartitions;
}
/**
* Mark that we want data from this partition in the upcoming fetch.
@ -215,15 +226,10 @@ public class FetchSessionHandler {
Entry<TopicPartition, PartitionData> entry = iter.next();
TopicPartition topicPartition = entry.getKey();
PartitionData prevData = entry.getValue();
PartitionData nextData = next.get(topicPartition);
PartitionData nextData = next.remove(topicPartition);
if (nextData != null) {
if (prevData.equals(nextData)) {
// Omit this partition from the FetchRequest, because it hasn't changed
// since the previous request.
next.remove(topicPartition);
} else {
// Move the altered partition to the end of 'next'
next.remove(topicPartition);
if (!prevData.equals(nextData)) {
// Re-add the altered partition to the end of 'next'
next.put(topicPartition, nextData);
entry.setValue(nextData);
altered.add(topicPartition);
@ -255,10 +261,10 @@ public class FetchSessionHandler {
partitionsToLogString(altered), partitionsToLogString(removed),
partitionsToLogString(sessionPartitions.keySet()));
}
Map<TopicPartition, PartitionData> toSend =
Collections.unmodifiableMap(new LinkedHashMap<>(next));
Map<TopicPartition, PartitionData> curSessionPartitions =
Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions));
Map<TopicPartition, PartitionData> toSend = Collections.unmodifiableMap(next);
Map<TopicPartition, PartitionData> curSessionPartitions = copySessionPartitions
? Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions))
: Collections.unmodifiableMap(sessionPartitions);
next = null;
return new FetchRequestData(toSend, Collections.unmodifiableList(removed),
curSessionPartitions, nextMetadata);
@ -269,6 +275,18 @@ public class FetchSessionHandler {
return new Builder();
}
/** A builder that allows for presizing the PartitionData hashmap, and avoiding making a
* secondary copy of the sessionPartitions, in cases where this is not necessary.
* This builder is primarily for use by the Replica Fetcher.
* @param size the initial size of the PartitionData hashmap
* @param copySessionPartitions boolean denoting whether the builder should make a deep copy of
* session partitions
*/
public Builder newBuilder(int size, boolean copySessionPartitions) {
return new Builder(size, copySessionPartitions);
}
private String partitionsToLogString(Collection<TopicPartition> partitions) {
if (!log.isTraceEnabled()) {
return String.format("%d partition(s)", partitions.size());


@ -101,7 +101,7 @@ public class PartitionStates<S> {
}
public LinkedHashMap<TopicPartition, S> partitionStateMap() {
return new LinkedHashMap<>(map);
return map;
}
/**


@ -18,6 +18,7 @@
package kafka.server
import java.nio.ByteBuffer
import java.util
import java.util.Optional
import java.util.concurrent.locks.ReentrantLock
@ -29,16 +30,17 @@ import kafka.common.ClientIdAndBroker
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.CoreUtils.inLock
import org.apache.kafka.common.protocol.Errors
import AbstractFetcherThread._
import scala.collection.{Map, Seq, Set, mutable}
import scala.collection.{mutable, Map, Seq, Set}
import scala.collection.JavaConverters._
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.function.Consumer
import java.util.function.BiConsumer
import com.yammer.metrics.core.Gauge
import kafka.log.LogAppendInfo
import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
import org.apache.kafka.common.internals.PartitionStates
import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
@ -79,17 +81,19 @@ abstract class AbstractFetcherThread(name: String,
protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit
protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]]
protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]]
protected def latestEpoch(topicPartition: TopicPartition): Option[Int]
protected def logStartOffset(topicPartition: TopicPartition): Long
protected def logEndOffset(topicPartition: TopicPartition): Long
protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch]
protected def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset]
protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)]
protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData]
protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long
@ -115,9 +119,8 @@ abstract class AbstractFetcherThread(name: String,
}
private def maybeFetch(): Unit = {
val (fetchStates, fetchRequestOpt) = inLock(partitionMapLock) {
val fetchStates = partitionStates.partitionStateMap.asScala
val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates)
val fetchRequestOpt = inLock(partitionMapLock) {
val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(partitionStates.partitionStateMap.asScala)
handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
@ -126,11 +129,11 @@ abstract class AbstractFetcherThread(name: String,
partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
}
(fetchStates, fetchRequestOpt)
fetchRequestOpt
}
fetchRequestOpt.foreach { fetchRequest =>
processFetchRequest(fetchStates, fetchRequest)
fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>
processFetchRequest(sessionPartitions, fetchRequest)
}
}
@ -150,13 +153,12 @@ abstract class AbstractFetcherThread(name: String,
val partitionsWithEpochs = mutable.Map.empty[TopicPartition, EpochData]
val partitionsWithoutEpochs = mutable.Set.empty[TopicPartition]
partitionStates.stream().forEach(new Consumer[PartitionStates.PartitionState[PartitionFetchState]] {
override def accept(state: PartitionStates.PartitionState[PartitionFetchState]): Unit = {
if (state.value.isTruncating) {
val tp = state.topicPartition
partitionStates.partitionStateMap.forEach(new BiConsumer[TopicPartition, PartitionFetchState] {
override def accept(tp: TopicPartition, state: PartitionFetchState): Unit = {
if (state.isTruncating) {
latestEpoch(tp) match {
case Some(epoch) if isOffsetForLeaderEpochSupported =>
partitionsWithEpochs += tp -> new EpochData(Optional.of(state.value.currentLeaderEpoch), epoch)
partitionsWithEpochs += tp -> new EpochData(Optional.of(state.currentLeaderEpoch), epoch)
case _ =>
partitionsWithoutEpochs += tp
}
@ -276,10 +278,10 @@ abstract class AbstractFetcherThread(name: String,
}
}
private def processFetchRequest(fetchStates: Map[TopicPartition, PartitionFetchState],
private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
fetchRequest: FetchRequest.Builder): Unit = {
val partitionsWithError = mutable.Set[TopicPartition]()
var responseData: Seq[(TopicPartition, FetchData)] = Seq.empty
var responseData: Map[TopicPartition, FetchData] = Map.empty
try {
trace(s"Sending fetch request $fetchRequest")
@ -307,8 +309,8 @@ abstract class AbstractFetcherThread(name: String,
// It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
// In this case, we only want to process the fetch response if the partition state is ready for fetch and
// the current offset is the same as the offset requested.
val fetchState = fetchStates(topicPartition)
if (fetchState.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
val fetchPartitionData = sessionPartitions.get(topicPartition)
if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
partitionData.error match {
case Errors.NONE =>
try {
@ -319,13 +321,13 @@ abstract class AbstractFetcherThread(name: String,
logAppendInfoOpt.foreach { logAppendInfo =>
val validBytes = logAppendInfo.validBytes
val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
fetcherLagStats.getAndMaybePut(topicPartition).lag = Math.max(0L, partitionData.highWatermark - nextOffset)
val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
// ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
if (validBytes > 0 && partitionStates.contains(topicPartition)) {
// Update partitionStates only if there is no exception during processPartitionData
val newFetchState = PartitionFetchState(nextOffset, fetchState.currentLeaderEpoch,
state = Fetching)
val newFetchState = PartitionFetchState(nextOffset, Some(lag), currentFetchState.currentLeaderEpoch, state = Fetching)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
fetcherStats.byteRate.mark(validBytes)
}
@ -356,7 +358,7 @@ abstract class AbstractFetcherThread(name: String,
case Errors.UNKNOWN_LEADER_EPOCH =>
debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " +
s"this replica's current leader epoch of ${fetchState.currentLeaderEpoch}.")
s"this replica's current leader epoch of ${currentFetchState.currentLeaderEpoch}.")
partitionsWithError += topicPartition
case Errors.FENCED_LEADER_EPOCH =>
@ -388,7 +390,7 @@ abstract class AbstractFetcherThread(name: String,
try {
Option(partitionStates.stateValue(topicPartition)).foreach { state =>
val newState = PartitionFetchState(math.min(truncationOffset, state.fetchOffset),
state.currentLeaderEpoch, state.delay, state = Truncating)
state.lag, state.currentLeaderEpoch, state.delay, state = Truncating)
partitionStates.updateAndMoveToEnd(topicPartition, newState)
partitionMapCond.signalAll()
}
@ -413,12 +415,10 @@ abstract class AbstractFetcherThread(name: String,
val currentState = partitionStates.stateValue(tp)
val updatedState = if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.leaderEpoch) {
currentState
} else if (initialFetchState.offset < 0) {
fetchOffsetAndTruncate(tp, initialFetchState.leaderEpoch)
} else {
val initialFetchOffset = if (initialFetchState.offset < 0)
fetchOffsetAndTruncate(tp, initialFetchState.leaderEpoch)
else
initialFetchState.offset
PartitionFetchState(initialFetchOffset, initialFetchState.leaderEpoch, state = Truncating)
PartitionFetchState(initialFetchState.offset, None, initialFetchState.leaderEpoch, state = Truncating)
}
partitionStates.updateAndMoveToEnd(tp, updatedState)
}
@ -440,8 +440,8 @@ abstract class AbstractFetcherThread(name: String,
val maybeTruncationComplete = fetchOffsets.get(state.topicPartition) match {
case Some(offsetTruncationState) =>
val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating
PartitionFetchState(offsetTruncationState.offset, currentFetchState.currentLeaderEpoch,
currentFetchState.delay, state)
PartitionFetchState(offsetTruncationState.offset, currentFetchState.lag,
currentFetchState.currentLeaderEpoch, currentFetchState.delay, state)
case None => currentFetchState
}
(state.topicPartition, maybeTruncationComplete)
@ -528,11 +528,10 @@ abstract class AbstractFetcherThread(name: String,
private def handleOutOfRangeError(topicPartition: TopicPartition,
fetchState: PartitionFetchState): Boolean = {
try {
val newOffset = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch)
val newFetchState = PartitionFetchState(newOffset, fetchState.currentLeaderEpoch, state = Fetching)
val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset")
s"out of range, which typically implies a leader change. Reset fetch offset to ${newFetchState.fetchOffset}")
true
} catch {
case _: FencedLeaderEpochException =>
@ -554,7 +553,7 @@ abstract class AbstractFetcherThread(name: String,
/**
* Handle a partition whose offset is out of range and return a new fetch offset.
*/
protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = {
protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, currentLeaderEpoch: Int): PartitionFetchState = {
val replicaEndOffset = logEndOffset(topicPartition)
/**
@ -572,7 +571,9 @@ abstract class AbstractFetcherThread(name: String,
warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
s"leader's latest offset $leaderEndOffset")
truncate(topicPartition, OffsetTruncationState(leaderEndOffset, truncationCompleted = true))
leaderEndOffset
fetcherLagStats.getAndMaybePut(topicPartition).lag = 0
PartitionFetchState(leaderEndOffset, Some(0), currentLeaderEpoch, state = Fetching)
} else {
/**
* If the leader's log end offset is greater than the follower's log end offset, there are two possibilities:
@ -602,7 +603,10 @@ abstract class AbstractFetcherThread(name: String,
// Only truncate log when current leader's log start offset is greater than follower's log end offset.
if (leaderStartOffset > replicaEndOffset)
truncateFullyAndStartAt(topicPartition, leaderStartOffset)
offsetToFetch
val initialLag = leaderEndOffset - offsetToFetch
fetcherLagStats.getAndMaybePut(topicPartition).lag = initialLag
PartitionFetchState(offsetToFetch, Some(initialLag), currentLeaderEpoch, state = Fetching)
}
}
@ -613,7 +617,7 @@ abstract class AbstractFetcherThread(name: String,
Option(partitionStates.stateValue(partition)).foreach { currentFetchState =>
if (!currentFetchState.isDelayed) {
partitionStates.updateAndMoveToEnd(partition, PartitionFetchState(currentFetchState.fetchOffset,
currentFetchState.currentLeaderEpoch, new DelayedItem(delay), currentFetchState.state))
currentFetchState.lag, currentFetchState.currentLeaderEpoch, Some(new DelayedItem(delay)), currentFetchState.state))
}
}
}
@ -665,6 +669,7 @@ abstract class AbstractFetcherThread(name: String,
object AbstractFetcherThread {
case class ReplicaFetch(partitionData: util.Map[TopicPartition, FetchRequest.PartitionData], fetchRequest: FetchRequest.Builder)
case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition])
}
@ -702,29 +707,21 @@ class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends KafkaMetricsGr
}
class FetcherLagStats(metricId: ClientIdAndBroker) {
private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
private val valueFactory = (k: TopicPartition) => new FetcherLagMetrics(ClientIdTopicPartition(metricId.clientId, k))
val stats = new Pool[TopicPartition, FetcherLagMetrics](Some(valueFactory))
def getAndMaybePut(topicPartition: TopicPartition): FetcherLagMetrics = {
stats.getAndMaybePut(ClientIdTopicPartition(metricId.clientId, topicPartition))
}
def isReplicaInSync(topicPartition: TopicPartition): Boolean = {
val fetcherLagMetrics = stats.get(ClientIdTopicPartition(metricId.clientId, topicPartition))
if (fetcherLagMetrics != null)
fetcherLagMetrics.lag <= 0
else
false
stats.getAndMaybePut(topicPartition)
}
def unregister(topicPartition: TopicPartition): Unit = {
val lagMetrics = stats.remove(ClientIdTopicPartition(metricId.clientId, topicPartition))
val lagMetrics = stats.remove(topicPartition)
if (lagMetrics != null) lagMetrics.unregister()
}
def unregister(): Unit = {
stats.keys.toBuffer.foreach { key: ClientIdTopicPartition =>
unregister(key.topicPartition)
stats.keys.toBuffer.foreach { key: TopicPartition =>
unregister(key)
}
}
}
@ -754,8 +751,8 @@ case object Truncating extends ReplicaState
case object Fetching extends ReplicaState
object PartitionFetchState {
def apply(offset: Long, currentLeaderEpoch: Int, state: ReplicaState): PartitionFetchState = {
PartitionFetchState(offset, currentLeaderEpoch, new DelayedItem(0), state)
def apply(offset: Long, lag: Option[Long], currentLeaderEpoch: Int, state: ReplicaState): PartitionFetchState = {
PartitionFetchState(offset, lag, currentLeaderEpoch, None, state)
}
}
@ -768,21 +765,25 @@ object PartitionFetchState {
* (3) ReadyForFetch, this is the active state where the thread is actively fetching data.
*/
case class PartitionFetchState(fetchOffset: Long,
lag: Option[Long],
currentLeaderEpoch: Int,
delay: DelayedItem,
delay: Option[DelayedItem],
state: ReplicaState) {
def isReadyForFetch: Boolean = state == Fetching && !isDelayed
def isReplicaInSync: Boolean = lag.isDefined && lag.get <= 0
def isTruncating: Boolean = state == Truncating && !isDelayed
def isDelayed: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) > 0
def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
override def toString: String = {
s"FetchState(fetchOffset=$fetchOffset" +
s", currentLeaderEpoch=$currentLeaderEpoch" +
s", state=$state" +
s", delay=${delay.delayMs}ms" +
s", lag=$lag" +
s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" +
s")"
}
}


@ -23,6 +23,7 @@ import java.util.Optional
import kafka.api.Request
import kafka.cluster.BrokerEndPoint
import kafka.log.LogAppendInfo
import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.QuotaFactory.UnboundedQuota
import org.apache.kafka.common.TopicPartition
@ -34,7 +35,7 @@ import org.apache.kafka.common.requests.FetchResponse.PartitionData
import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq, Set, mutable}
import scala.collection.{mutable, Map, Seq, Set}
class ReplicaAlterLogDirsThread(name: String,
sourceBroker: BrokerEndPoint,
@ -59,6 +60,10 @@ class ReplicaAlterLogDirsThread(name: String,
replicaMgr.futureLocalLogOrException(topicPartition).latestEpoch
}
override protected def logStartOffset(topicPartition: TopicPartition): Long = {
replicaMgr.futureLocalLogOrException(topicPartition).logStartOffset
}
override protected def logEndOffset(topicPartition: TopicPartition): Long = {
replicaMgr.futureLocalLogOrException(topicPartition).logEndOffset
}
@ -67,7 +72,7 @@ class ReplicaAlterLogDirsThread(name: String,
replicaMgr.futureLocalLogOrException(topicPartition).endOffsetForEpoch(epoch)
}
def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = {
def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = {
var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData[Records])] = null
val request = fetchRequest.build()
@ -95,7 +100,7 @@ class ReplicaAlterLogDirsThread(name: String,
if (partitionData == null)
throw new IllegalStateException(s"Failed to fetch data for partitions ${request.fetchData.keySet().toArray.mkString(",")}")
partitionData
partitionData.toMap
}
// process fetched data
@ -218,7 +223,7 @@ class ReplicaAlterLogDirsThread(name: String,
nextPartitionOpt
}
private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[FetchRequest.Builder]] = {
private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[ReplicaFetch]] = {
val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
val partitionsWithError = mutable.Set[TopicPartition]()
@ -237,14 +242,14 @@ class ReplicaAlterLogDirsThread(name: String,
} else {
// Set maxWait and minBytes to 0 because the response should return immediately if
// the future log has caught up with the current log of the partition
Some(FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap)
.setMaxBytes(maxBytes))
val requestBuilder = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap).setMaxBytes(maxBytes)
Some(ReplicaFetch(requestMap, requestBuilder))
}
ResultWithPartitions(fetchRequestOpt, partitionsWithError)
}
def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
// Only include replica in the fetch request if it is not throttled.
if (quota.isQuotaExceeded) {
ResultWithPartitions(None, Set.empty)


@ -22,6 +22,7 @@ import java.util.Optional
import kafka.api._
import kafka.cluster.BrokerEndPoint
import kafka.log.LogAppendInfo
import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.TopicPartition
@ -34,7 +35,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{LogContext, Time}
import scala.collection.JavaConverters._
import scala.collection.{Map, mutable}
import scala.collection.{mutable, Map}
class ReplicaFetcherThread(name: String,
fetcherId: Int,
@ -96,12 +97,16 @@ class ReplicaFetcherThread(name: String,
private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
private val fetchSize = brokerConfig.replicaFetchMaxBytes
private val brokerSupportsLeaderEpochRequest = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
private val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
replicaMgr.localLogOrException(topicPartition).latestEpoch
}
override protected def logStartOffset(topicPartition: TopicPartition): Long = {
replicaMgr.localLogOrException(topicPartition).logStartOffset
}
override protected def logEndOffset(topicPartition: TopicPartition): Long = {
replicaMgr.localLogOrException(topicPartition).logEndOffset
}
@ -191,14 +196,14 @@ class ReplicaFetcherThread(name: String,
}
override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = {
override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = {
try {
val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse[Records]]
if (!fetchSessionHandler.handleResponse(fetchResponse)) {
Nil
Map.empty
} else {
fetchResponse.responseData.asScala.toSeq
fetchResponse.responseData.asScala
}
} catch {
case t: Throwable =>
@ -236,15 +241,15 @@ class ReplicaFetcherThread(name: String,
}
}
override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
val partitionsWithError = mutable.Set[TopicPartition]()
val builder = fetchSessionHandler.newBuilder()
val builder = fetchSessionHandler.newBuilder(partitionMap.size, false)
partitionMap.foreach { case (topicPartition, fetchState) =>
// We will not include a replica in the fetch request if it should be throttled.
if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, topicPartition)) {
if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, fetchState, topicPartition)) {
try {
val logStartOffset = replicaMgr.localLogOrException(topicPartition).logStartOffset
val logStartOffset = this.logStartOffset(topicPartition)
builder.add(topicPartition, new FetchRequest.PartitionData(
fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch)))
} catch {
@ -265,7 +270,7 @@ class ReplicaFetcherThread(name: String,
.setMaxBytes(maxBytes)
.toForget(fetchData.toForget)
.metadata(fetchData.metadata)
Some(requestBuilder)
Some(ReplicaFetch(fetchData.sessionPartitions(), requestBuilder))
}
ResultWithPartitions(fetchRequestOpt, partitionsWithError)
@ -330,9 +335,8 @@ class ReplicaFetcherThread(name: String,
* To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
* the quota is exceeded and the replica is not in sync.
*/
private def shouldFollowerThrottle(quota: ReplicaQuota, topicPartition: TopicPartition): Boolean = {
val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition)
!isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
!fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
}
}


@ -59,7 +59,7 @@ class AbstractFetcherManagerTest {
EasyMock.expect(fetcher.start())
EasyMock.expect(fetcher.addPartitions(Map(tp -> OffsetAndEpoch(fetchOffset, leaderEpoch))))
EasyMock.expect(fetcher.fetchState(tp))
.andReturn(Some(PartitionFetchState(fetchOffset, leaderEpoch, Truncating)))
.andReturn(Some(PartitionFetchState(fetchOffset, None, leaderEpoch, Truncating)))
EasyMock.expect(fetcher.removePartitions(Set(tp)))
EasyMock.expect(fetcher.fetchState(tp)).andReturn(None)
EasyMock.replay(fetcher)


@ -25,6 +25,7 @@ import com.yammer.metrics.Metrics
import kafka.cluster.BrokerEndPoint
import kafka.log.LogAppendInfo
import kafka.message.NoCompressionCodec
import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.utils.TestUtils
import org.apache.kafka.common.KafkaException
@ -38,7 +39,7 @@ import org.junit.Assert._
import org.junit.{Before, Test}
import scala.collection.JavaConverters._
import scala.collection.{Map, Set, mutable}
import scala.collection.{mutable, Map, Set}
import scala.util.Random
import org.scalatest.Assertions.assertThrows
@ -575,7 +576,7 @@ class AbstractFetcherThreadTest {
val fetcher = new MockFetcherThread {
var fetchedOnce = false
override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = {
override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = {
val fetchedData = super.fetchFromLeader(fetchRequest)
if (!fetchedOnce) {
val records = fetchedData.head._2.records.asInstanceOf[MemoryRecords]
@ -901,7 +902,7 @@ class AbstractFetcherThreadTest {
state.highWatermark = offset
}
override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
val fetchData = mutable.Map.empty[TopicPartition, FetchRequest.PartitionData]
partitionMap.foreach { case (partition, state) =>
if (state.isReadyForFetch) {
@ -911,7 +912,7 @@ class AbstractFetcherThreadTest {
}
}
val fetchRequest = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 1, fetchData.asJava)
ResultWithPartitions(Some(fetchRequest), Set.empty)
ResultWithPartitions(Some(ReplicaFetch(fetchData.asJava, fetchRequest)), Set.empty)
}
override def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
@ -919,6 +920,8 @@ class AbstractFetcherThreadTest {
state.log.lastOption.map(_.partitionLeaderEpoch).orElse(Some(EpochEndOffset.UNDEFINED_EPOCH))
}
override def logStartOffset(topicPartition: TopicPartition): Long = replicaPartitionState(topicPartition).logStartOffset
override def logEndOffset(topicPartition: TopicPartition): Long = replicaPartitionState(topicPartition).logEndOffset
override def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = {
@ -973,7 +976,7 @@ class AbstractFetcherThreadTest {
override protected def isOffsetForLeaderEpochSupported: Boolean = true
override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = {
override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = {
fetchRequest.fetchData.asScala.map { case (partition, fetchData) =>
val leaderState = leaderPartitionState(partition)
val epochCheckError = checkExpectedLeaderEpoch(fetchData.currentLeaderEpoch, leaderState)
@ -1000,7 +1003,7 @@ class AbstractFetcherThreadTest {
(partition, new FetchData(error, leaderState.highWatermark, leaderState.highWatermark, leaderState.logStartOffset,
List.empty.asJava, records))
}.toSeq
}.toMap
}
private def checkLeaderEpochAndThrow(expectedEpoch: Int, partitionState: PartitionState): Unit = {


@ -20,6 +20,7 @@ import java.util.Optional
import kafka.cluster.{BrokerEndPoint, Partition}
import kafka.log.{Log, LogManager}
import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.utils.{DelayedItem, TestUtils}
import org.apache.kafka.common.TopicPartition
@ -520,11 +521,11 @@ class ReplicaAlterLogDirsThreadTest {
t1p1 -> offsetAndEpoch(0L, leaderEpoch)))
val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = thread.buildFetch(Map(
t1p0 -> PartitionFetchState(150, leaderEpoch, state = Fetching),
t1p1 -> PartitionFetchState(160, leaderEpoch, state = Fetching)))
t1p0 -> PartitionFetchState(150, None, leaderEpoch, None, state = Fetching),
t1p1 -> PartitionFetchState(160, None, leaderEpoch, None, state = Fetching)))
assertTrue(fetchRequestOpt.isDefined)
val fetchRequest = fetchRequestOpt.get
val fetchRequest = fetchRequestOpt.get.fetchRequest
assertFalse(fetchRequest.fetchData.isEmpty)
assertFalse(partitionsWithError.nonEmpty)
val request = fetchRequest.build()
@ -572,36 +573,36 @@ class ReplicaAlterLogDirsThreadTest {
// one partition is ready and one is truncating
val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = thread.buildFetch(Map(
t1p0 -> PartitionFetchState(150, leaderEpoch, state = Fetching),
t1p1 -> PartitionFetchState(160, leaderEpoch, state = Truncating)))
t1p0 -> PartitionFetchState(150, None, leaderEpoch, state = Fetching),
t1p1 -> PartitionFetchState(160, None, leaderEpoch, state = Truncating)))
assertTrue(fetchRequestOpt.isDefined)
val fetchRequest = fetchRequestOpt.get
assertFalse(fetchRequest.fetchData.isEmpty)
assertFalse(fetchRequest.partitionData.isEmpty)
assertFalse(partitionsWithError.nonEmpty)
val fetchInfos = fetchRequest.build().fetchData.asScala.toSeq
val fetchInfos = fetchRequest.fetchRequest.build().fetchData.asScala.toSeq
assertEquals(1, fetchInfos.length)
assertEquals("Expected fetch request for non-truncating partition", t1p0, fetchInfos.head._1)
assertEquals(150, fetchInfos.head._2.fetchOffset)
// one partition is ready and one is delayed
val ResultWithPartitions(fetchRequest2Opt, partitionsWithError2) = thread.buildFetch(Map(
t1p0 -> PartitionFetchState(140, leaderEpoch, state = Fetching),
t1p1 -> PartitionFetchState(160, leaderEpoch, delay = new DelayedItem(5000), state = Fetching)))
t1p0 -> PartitionFetchState(140, None, leaderEpoch, state = Fetching),
t1p1 -> PartitionFetchState(160, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching)))
assertTrue(fetchRequest2Opt.isDefined)
val fetchRequest2 = fetchRequest2Opt.get
assertFalse(fetchRequest2.fetchData.isEmpty)
assertFalse(fetchRequest2.partitionData.isEmpty)
assertFalse(partitionsWithError2.nonEmpty)
val fetchInfos2 = fetchRequest2.build().fetchData.asScala.toSeq
val fetchInfos2 = fetchRequest2.fetchRequest.build().fetchData.asScala.toSeq
assertEquals(1, fetchInfos2.length)
assertEquals("Expected fetch request for non-delayed partition", t1p0, fetchInfos2.head._1)
assertEquals(140, fetchInfos2.head._2.fetchOffset)
// both partitions are delayed
val ResultWithPartitions(fetchRequest3Opt, partitionsWithError3) = thread.buildFetch(Map(
t1p0 -> PartitionFetchState(140, leaderEpoch, delay = new DelayedItem(5000), state = Fetching),
t1p1 -> PartitionFetchState(160, leaderEpoch, delay = new DelayedItem(5000), state = Fetching)))
t1p0 -> PartitionFetchState(140, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching),
t1p1 -> PartitionFetchState(160, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching)))
assertTrue("Expected no fetch requests since all partitions are delayed", fetchRequest3Opt.isEmpty)
assertFalse(partitionsWithError3.nonEmpty)
}


@ -208,6 +208,8 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Package name="org.apache.kafka.jmh.record.generated"/>
<Package name="org.apache.kafka.jmh.producer.generated"/>
<Package name="org.apache.kafka.jmh.partition.generated"/>
<Package name="org.apache.kafka.jmh.fetchsession.generated"/>
<Package name="org.apache.kafka.jmh.fetcher.generated"/>
</Or>
</Match>


@ -0,0 +1,312 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.fetcher;
import kafka.api.ApiVersion$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.DelayedOperations;
import kafka.cluster.Partition;
import kafka.cluster.PartitionStateStore;
import kafka.log.CleanerConfig;
import kafka.log.Defaults;
import kafka.log.LogAppendInfo;
import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.server.BrokerState;
import kafka.server.BrokerTopicStats;
import kafka.server.FailedPartitions;
import kafka.server.KafkaConfig;
import kafka.server.LogDirFailureChannel;
import kafka.server.MetadataCache;
import kafka.server.OffsetAndEpoch;
import kafka.server.OffsetTruncationState;
import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaQuota;
import kafka.server.checkpoints.OffsetCheckpoints;
import kafka.utils.KafkaScheduler;
import kafka.utils.Pool;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.RecordsSend;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import scala.Option;
import scala.collection.Iterator;
import scala.collection.JavaConverters;
import scala.compat.java8.OptionConverters;
import scala.collection.Map;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class ReplicaFetcherThreadBenchmark {
@Param({"100", "500", "1000", "5000"})
private int partitionCount;
private ReplicaFetcherBenchThread fetcher;
private LogManager logManager;
private File logDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
private KafkaScheduler scheduler = new KafkaScheduler(1, "scheduler", true);
private Pool<TopicPartition, Partition> pool = new Pool<TopicPartition, Partition>(Option.empty());
@Setup(Level.Trial)
public void setup() throws IOException {
if (!logDir.mkdir())
throw new IOException("error creating test directory");
scheduler.startup();
Properties props = new Properties();
props.put("zookeeper.connect", "127.0.0.1:9999");
KafkaConfig config = new KafkaConfig(props);
LogConfig logConfig = createLogConfig();
List<File> logDirs = Collections.singletonList(logDir);
BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
logManager = new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
new scala.collection.mutable.HashMap<>(),
logConfig,
new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5"),
1,
1000L,
10000L,
10000L,
1000L,
60000,
scheduler,
new BrokerState(),
brokerTopicStats,
logDirFailureChannel,
Time.SYSTEM);
LinkedHashMap<TopicPartition, FetchResponse.PartitionData<BaseRecords>> initialFetched = new LinkedHashMap<>();
scala.collection.mutable.Map<TopicPartition, OffsetAndEpoch> offsetAndEpochs = new scala.collection.mutable.HashMap<>();
for (int i = 0; i < partitionCount; i++) {
TopicPartition tp = new TopicPartition("topic", i);
List<Integer> replicas = Arrays.asList(0, 1, 2);
LeaderAndIsrRequestData.LeaderAndIsrPartitionState partitionState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(0)
.setIsr(replicas)
.setZkVersion(1)
.setReplicas(replicas)
.setIsNew(true);
PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L));
Partition partition = new Partition(tp, 100, ApiVersion$.MODULE$.latestVersion(),
0, Time.SYSTEM, partitionStateStore, new DelayedOperationsMock(tp),
Mockito.mock(MetadataCache.class), logManager);
partition.makeFollower(0, partitionState, 0, offsetCheckpoints);
pool.put(tp, partition);
offsetAndEpochs.put(tp, new OffsetAndEpoch(0, 0));
BaseRecords fetched = new BaseRecords() {
@Override
public int sizeInBytes() {
return 0;
}
@Override
public RecordsSend toSend(String destination) {
return null;
}
};
initialFetched.put(tp, new FetchResponse.PartitionData<>(Errors.NONE, 0, 0, 0,
new LinkedList<>(), fetched));
}
fetcher = new ReplicaFetcherBenchThread(config, pool);
fetcher.addPartitions(offsetAndEpochs);
// force a pass to move partitions to fetching state. We do this in the setup phase
// so that we do not measure this time as part of the steady state work
fetcher.doWork();
// handle response to engage the incremental fetch session handler
fetcher.fetchSessionHandler().handleResponse(new FetchResponse<>(Errors.NONE, initialFetched, 0, 999));
}
@TearDown(Level.Trial)
public void tearDown() throws IOException {
logManager.shutdown();
scheduler.shutdown();
Utils.delete(logDir);
}
@Benchmark
public long testFetcher() {
fetcher.doWork();
return fetcher.fetcherStats().requestRate().count();
}
// use a handwritten DelayedOperations subclass rather than a Mockito mock so that mocking overhead does not affect benchmark results
private static class DelayedOperationsMock extends DelayedOperations {
DelayedOperationsMock(TopicPartition topicPartition) {
super(topicPartition, null, null, null);
}
@Override
public int numDelayedDelete() {
return 0;
}
}
private static LogConfig createLogConfig() {
Properties logProps = new Properties();
logProps.put(LogConfig.SegmentMsProp(), Defaults.SegmentMs());
logProps.put(LogConfig.SegmentBytesProp(), Defaults.SegmentSize());
logProps.put(LogConfig.RetentionMsProp(), Defaults.RetentionMs());
logProps.put(LogConfig.RetentionBytesProp(), Defaults.RetentionSize());
logProps.put(LogConfig.SegmentJitterMsProp(), Defaults.SegmentJitterMs());
logProps.put(LogConfig.CleanupPolicyProp(), Defaults.CleanupPolicy());
logProps.put(LogConfig.MaxMessageBytesProp(), Defaults.MaxMessageSize());
logProps.put(LogConfig.IndexIntervalBytesProp(), Defaults.IndexInterval());
logProps.put(LogConfig.SegmentIndexBytesProp(), Defaults.MaxIndexSize());
logProps.put(LogConfig.MessageFormatVersionProp(), Defaults.MessageFormatVersion());
logProps.put(LogConfig.FileDeleteDelayMsProp(), Defaults.FileDeleteDelayMs());
return LogConfig.apply(logProps, new scala.collection.immutable.HashSet<>());
}
static class ReplicaFetcherBenchThread extends ReplicaFetcherThread {
private final Pool<TopicPartition, Partition> pool;
ReplicaFetcherBenchThread(KafkaConfig config, Pool<TopicPartition, Partition> partitions) {
super("name",
3,
new BrokerEndPoint(3, "host", 3000),
config,
new FailedPartitions(),
null,
new Metrics(),
Time.SYSTEM,
new ReplicaQuota() {
@Override
public boolean isQuotaExceeded() {
return false;
}
@Override
public void record(long value) {
}
@Override
public boolean isThrottled(TopicPartition topicPartition) {
return false;
}
},
Option.empty());
pool = partitions;
}
@Override
public Option<Object> latestEpoch(TopicPartition topicPartition) {
return Option.apply(0);
}
@Override
public long logStartOffset(TopicPartition topicPartition) {
return pool.get(topicPartition).localLogOrException().logStartOffset();
}
@Override
public long logEndOffset(TopicPartition topicPartition) {
return 0;
}
@Override
public void truncate(TopicPartition tp, OffsetTruncationState offsetTruncationState) {
// pretend to truncate to move to Fetching state
}
@Override
public Option<OffsetAndEpoch> endOffsetForEpoch(TopicPartition topicPartition, int epoch) {
return OptionConverters.toScala(Optional.of(new OffsetAndEpoch(0, 0)));
}
@Override
public Option<LogAppendInfo> processPartitionData(TopicPartition topicPartition, long fetchOffset, FetchResponse.PartitionData partitionData) {
return Option.empty();
}
@Override
public long fetchEarliestOffsetFromLeader(TopicPartition topicPartition, int currentLeaderEpoch) {
return 0;
}
@Override
public Map<TopicPartition, EpochEndOffset> fetchEpochEndOffsets(Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> partitions) {
scala.collection.mutable.Map<TopicPartition, EpochEndOffset> endOffsets = new scala.collection.mutable.HashMap<>();
Iterator<TopicPartition> iterator = partitions.keys().iterator();
while (iterator.hasNext()) {
endOffsets.put(iterator.next(), new EpochEndOffset(0, 100));
}
return endOffsets;
}
@Override
public Map<TopicPartition, FetchResponse.PartitionData<Records>> fetchFromLeader(FetchRequest.Builder fetchRequest) {
return new scala.collection.mutable.HashMap<>();
}
}
}


@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.fetchsession;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.LogContext;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 10)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class FetchSessionBenchmark {
private static final LogContext LOG_CONTEXT = new LogContext("[BenchFetchSessionHandler]=");
@Param(value = {"10", "100", "1000"})
private int partitionCount;
@Param(value = {"0", "10", "100", "1000"})
private int updatedPercentage;
@Param(value = {"false", "true"})
private boolean presize;
private LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetches;
private FetchSessionHandler handler;
@Setup(Level.Trial)
public void setUp() {
fetches = new LinkedHashMap<>();
handler = new FetchSessionHandler(LOG_CONTEXT, 1);
FetchSessionHandler.Builder builder = handler.newBuilder();
LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> respMap = new LinkedHashMap<>();
for (int i = 0; i < partitionCount; i++) {
TopicPartition tp = new TopicPartition("foo", i);
FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(0, 0, 200,
Optional.empty());
fetches.put(tp, partitionData);
builder.add(tp, partitionData);
respMap.put(tp, new FetchResponse.PartitionData<>(
Errors.NONE,
0L,
0L,
0,
null,
null));
}
builder.build();
// build and handle an initial response so that the next fetch will be incremental
handler.handleResponse(new FetchResponse<>(Errors.NONE, respMap, 0, 1));
int counter = 0;
for (TopicPartition topicPartition: new ArrayList<>(fetches.keySet())) {
if (updatedPercentage != 0 && counter % (100 / updatedPercentage) == 0) {
// reorder in fetch session, and update log start offset
fetches.remove(topicPartition);
fetches.put(topicPartition, new FetchRequest.PartitionData(50, 40, 200,
Optional.empty()));
}
counter++;
}
}
@Benchmark
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public void incrementalFetchSessionBuild() {
FetchSessionHandler.Builder builder;
if (presize)
builder = handler.newBuilder(fetches.size(), true);
else
builder = handler.newBuilder();
for (Map.Entry<TopicPartition, FetchRequest.PartitionData> entry: fetches.entrySet()) {
builder.add(entry.getKey(), entry.getValue());
}
builder.build();
}
}