Avoid copying partition states to maintain fetch offsets

We already have the partition fetch data in the fetch session, so a copy
is not required.
Lucas Bradstreet 2019-10-01 15:41:36 -07:00
parent 854588804d
commit 29fdd6094b
6 changed files with 42 additions and 37 deletions
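
The change is easiest to see in miniature. Below is a small, self-contained Scala sketch of the pattern (Session, track, copiedSnapshot, and sessionPartitions are hypothetical names for illustration, not Kafka's actual API): the fetch session already owns an insertion-ordered map of per-partition fetch data, so the response path can validate offsets against that map directly instead of copying the partition-state map for every request. The trade-off is that the map is now shared, so readers must treat it as read-only and handle entries removed while a fetch is in flight, which is why the diff below adds a null check after sessionPartitions.get(topicPartition).

import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical names for illustration only; this is not Kafka's actual API.
object FetchStateCopySketch {

  final class Session {
    // partition name -> fetch offset, kept in insertion order like a fetch session
    private val partitions = new JLinkedHashMap[String, java.lang.Long]()

    def track(tp: String, offset: Long): Unit = partitions.put(tp, offset)

    // Before: every fetch built a defensive copy just to re-check offsets later.
    def copiedSnapshot: JMap[String, java.lang.Long] = new JLinkedHashMap(partitions)

    // After: expose the session's own map; callers must treat it as read-only.
    def sessionPartitions: JMap[String, java.lang.Long] = partitions
  }

  def main(args: Array[String]): Unit = {
    val session = new Session
    session.track("topic-0", 42L)
    // The response handler reads the live map; get returns null when the
    // partition was removed while the fetch was in flight, so check for it.
    val offset = session.sessionPartitions.get("topic-0")
    if (offset != null && offset.longValue == 42L)
      println(s"fetch offset for topic-0 still valid: $offset")
  }
}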

clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java

@@ -101,7 +101,7 @@ public class PartitionStates<S> {
     }
 
     public LinkedHashMap<TopicPartition, S> partitionStateMap() {
-        return new LinkedHashMap<>(map);
+        return map;
     }
 
     /**

core/src/main/scala/kafka/server/AbstractFetcherThread.scala

@@ -18,6 +18,7 @@
 package kafka.server
 
 import java.nio.ByteBuffer
+import java.util
 import java.util.Optional
 import java.util.concurrent.locks.ReentrantLock
@@ -29,16 +30,17 @@ import kafka.common.ClientIdAndBroker
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.inLock
 import org.apache.kafka.common.protocol.Errors
 import AbstractFetcherThread._
 
 import scala.collection.{mutable, Map, Seq, Set}
 import scala.collection.JavaConverters._
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
-import java.util.function.Consumer
+import java.util.function.BiConsumer
 
 import com.yammer.metrics.core.Gauge
 import kafka.log.LogAppendInfo
+import kafka.server.AbstractFetcherThread.Fetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
 import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
@@ -79,7 +81,8 @@ abstract class AbstractFetcherThread(name: String,
   protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit
 
-  protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]]
+  protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]]
 
   protected def latestEpoch(topicPartition: TopicPartition): Option[Int]
@@ -117,9 +120,8 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   private def maybeFetch(): Unit = {
-    val (fetchStates, fetchRequestOpt) = inLock(partitionMapLock) {
-      val fetchStates = partitionStates.partitionStateMap.asScala
-      val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates)
+    val fetchRequestOpt = inLock(partitionMapLock) {
+      val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(partitionStates.partitionStateMap.asScala)
 
       handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
@@ -128,11 +130,11 @@ abstract class AbstractFetcherThread(name: String,
         partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
       }
 
-      (fetchStates, fetchRequestOpt)
+      fetchRequestOpt
     }
 
-    fetchRequestOpt.foreach { fetchRequest =>
-      processFetchRequest(fetchStates, fetchRequest)
+    fetchRequestOpt.foreach { case Fetch(sessionPartitions, fetchRequest) =>
+      processFetchRequest(sessionPartitions, fetchRequest)
     }
   }
@@ -152,13 +154,12 @@ abstract class AbstractFetcherThread(name: String,
     val partitionsWithEpochs = mutable.Map.empty[TopicPartition, EpochData]
     val partitionsWithoutEpochs = mutable.Set.empty[TopicPartition]
 
-    partitionStates.stream().forEach(new Consumer[PartitionStates.PartitionState[PartitionFetchState]] {
-      override def accept(state: PartitionStates.PartitionState[PartitionFetchState]): Unit = {
-        if (state.value.isTruncating) {
-          val tp = state.topicPartition
+    partitionStates.partitionStateMap.forEach(new BiConsumer[TopicPartition, PartitionFetchState] {
+      override def accept(tp: TopicPartition, state: PartitionFetchState): Unit = {
+        if (state.isTruncating) {
           latestEpoch(tp) match {
             case Some(epoch) if isOffsetForLeaderEpochSupported =>
-              partitionsWithEpochs += tp -> new EpochData(Optional.of(state.value.currentLeaderEpoch), epoch)
+              partitionsWithEpochs += tp -> new EpochData(Optional.of(state.currentLeaderEpoch), epoch)
             case _ =>
               partitionsWithoutEpochs += tp
           }
@@ -278,7 +279,7 @@
     }
   }
 
-  private def processFetchRequest(fetchStates: Map[TopicPartition, PartitionFetchState],
+  private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
                                   fetchRequest: FetchRequest.Builder): Unit = {
     val partitionsWithError = mutable.Set[TopicPartition]()
     var responseData: Seq[(TopicPartition, FetchData)] = Seq.empty
@@ -309,8 +310,8 @@
           // It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
           // In this case, we only want to process the fetch response if the partition state is ready for fetch and
           // the current offset is the same as the offset requested.
-          val fetchState = fetchStates(topicPartition)
-          if (fetchState.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
+          val fetchPartitionData = sessionPartitions.get(topicPartition)
+          if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
             partitionData.error match {
               case Errors.NONE =>
                 try {
@@ -326,8 +327,7 @@
                   // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
                   if (validBytes > 0 && partitionStates.contains(topicPartition)) {
                     // Update partitionStates only if there is no exception during processPartitionData
-                    val newFetchState = PartitionFetchState(nextOffset, fetchState.currentLeaderEpoch,
-                      state = Fetching)
+                    val newFetchState = PartitionFetchState(nextOffset, fetchPartitionData.currentLeaderEpoch.get(), state = Fetching)
                     partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
                     fetcherStats.byteRate.mark(validBytes)
                   }
@@ -358,7 +358,7 @@
               case Errors.UNKNOWN_LEADER_EPOCH =>
                 debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " +
-                  s"this replica's current leader epoch of ${fetchState.currentLeaderEpoch}.")
+                  s"this replica's current leader epoch of ${fetchPartitionData.currentLeaderEpoch}.")
                 partitionsWithError += topicPartition
 
               case Errors.FENCED_LEADER_EPOCH =>
@@ -667,6 +667,7 @@
 object AbstractFetcherThread {
 
+  case class Fetch(partitionData: util.Map[TopicPartition, FetchRequest.PartitionData], fetchRequest: FetchRequest.Builder)
   case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition])
 }

core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala

@@ -23,6 +23,7 @@ import java.util.Optional
 import kafka.api.Request
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogAppendInfo
+import kafka.server.AbstractFetcherThread.Fetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.QuotaFactory.UnboundedQuota
 import org.apache.kafka.common.TopicPartition
@@ -34,7 +35,7 @@ import org.apache.kafka.common.requests.FetchResponse.PartitionData
 import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
 
 import scala.collection.JavaConverters._
-import scala.collection.{Map, Seq, Set, mutable}
+import scala.collection.{mutable, Map, Seq, Set}
 
 class ReplicaAlterLogDirsThread(name: String,
                                 sourceBroker: BrokerEndPoint,
@@ -222,7 +223,7 @@
     nextPartitionOpt
   }
 
-  private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+  private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[Fetch]] = {
     val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
     val partitionsWithError = mutable.Set[TopicPartition]()
@@ -241,14 +242,14 @@
     } else {
       // Set maxWait and minBytes to 0 because the response should return immediately if
       // the future log has caught up with the current log of the partition
-      Some(FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap)
-        .setMaxBytes(maxBytes))
+      val requestBuilder = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap).setMaxBytes(maxBytes)
+      Some(Fetch(requestMap, requestBuilder))
     }
 
     ResultWithPartitions(fetchRequestOpt, partitionsWithError)
   }
 
-  def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+  def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]] = {
     // Only include replica in the fetch request if it is not throttled.
     if (quota.isQuotaExceeded) {
       ResultWithPartitions(None, Set.empty)

core/src/main/scala/kafka/server/ReplicaFetcherThread.scala

@@ -22,6 +22,7 @@ import java.util.Optional
 import kafka.api._
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogAppendInfo
+import kafka.server.AbstractFetcherThread.Fetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.TopicPartition
@@ -34,7 +35,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.{LogContext, Time}
 
 import scala.collection.JavaConverters._
-import scala.collection.{Map, mutable}
+import scala.collection.{mutable, Map}
 
 class ReplicaFetcherThread(name: String,
                            fetcherId: Int,
@@ -240,7 +241,7 @@
     }
   }
 
-  override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+  override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]] = {
     val partitionsWithError = mutable.Set[TopicPartition]()
     val builder = fetchSessionHandler.newBuilder()
@@ -269,7 +270,7 @@
         .setMaxBytes(maxBytes)
         .toForget(fetchData.toForget)
         .metadata(fetchData.metadata)
-      Some(requestBuilder)
+      Some(Fetch(fetchData.sessionPartitions(), requestBuilder))
     }
 
     ResultWithPartitions(fetchRequestOpt, partitionsWithError)

core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala

@@ -25,6 +25,7 @@ import com.yammer.metrics.Metrics
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogAppendInfo
 import kafka.message.NoCompressionCodec
+import kafka.server.AbstractFetcherThread.Fetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.utils.TestUtils
 import org.apache.kafka.common.KafkaException
@@ -38,7 +39,7 @@ import org.junit.Assert._
 import org.junit.{Before, Test}
 
 import scala.collection.JavaConverters._
-import scala.collection.{Map, Set, mutable}
+import scala.collection.{mutable, Map, Set}
 import scala.util.Random
 import org.scalatest.Assertions.assertThrows
@@ -901,7 +902,7 @@
       state.highWatermark = offset
     }
 
-    override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+    override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]] = {
       val fetchData = mutable.Map.empty[TopicPartition, FetchRequest.PartitionData]
       partitionMap.foreach { case (partition, state) =>
         if (state.isReadyForFetch) {
@@ -911,7 +912,7 @@
         }
       }
       val fetchRequest = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 1, fetchData.asJava)
-      ResultWithPartitions(Some(fetchRequest), Set.empty)
+      ResultWithPartitions(Some(Fetch(fetchData.asJava, fetchRequest)), Set.empty)
     }
 
     override def latestEpoch(topicPartition: TopicPartition): Option[Int] = {

core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala

@@ -20,6 +20,7 @@ import java.util.Optional
 import kafka.cluster.{BrokerEndPoint, Partition}
 import kafka.log.{Log, LogManager}
+import kafka.server.AbstractFetcherThread.Fetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.utils.{DelayedItem, TestUtils}
 import org.apache.kafka.common.TopicPartition
@@ -524,7 +525,7 @@
       t1p1 -> PartitionFetchState(160, leaderEpoch, state = Fetching)))
 
     assertTrue(fetchRequestOpt.isDefined)
-    val fetchRequest = fetchRequestOpt.get
+    val fetchRequest = fetchRequestOpt.get.fetchRequest
     assertFalse(fetchRequest.fetchData.isEmpty)
     assertFalse(partitionsWithError.nonEmpty)
 
     val request = fetchRequest.build()
@@ -577,9 +578,9 @@
     assertTrue(fetchRequestOpt.isDefined)
     val fetchRequest = fetchRequestOpt.get
-    assertFalse(fetchRequest.fetchData.isEmpty)
+    assertFalse(fetchRequest.partitionData.isEmpty)
     assertFalse(partitionsWithError.nonEmpty)
 
-    val fetchInfos = fetchRequest.build().fetchData.asScala.toSeq
+    val fetchInfos = fetchRequest.fetchRequest.build().fetchData.asScala.toSeq
     assertEquals(1, fetchInfos.length)
     assertEquals("Expected fetch request for non-truncating partition", t1p0, fetchInfos.head._1)
     assertEquals(150, fetchInfos.head._2.fetchOffset)
@@ -591,9 +592,9 @@
     assertTrue(fetchRequest2Opt.isDefined)
     val fetchRequest2 = fetchRequest2Opt.get
-    assertFalse(fetchRequest2.fetchData.isEmpty)
+    assertFalse(fetchRequest2.partitionData.isEmpty)
     assertFalse(partitionsWithError2.nonEmpty)
 
-    val fetchInfos2 = fetchRequest2.build().fetchData.asScala.toSeq
+    val fetchInfos2 = fetchRequest2.fetchRequest.build().fetchData.asScala.toSeq
     assertEquals(1, fetchInfos2.length)
     assertEquals("Expected fetch request for non-delayed partition", t1p0, fetchInfos2.head._1)
     assertEquals(140, fetchInfos2.head._2.fetchOffset)