mirror of https://github.com/apache/kafka.git
Avoid copying partition states to maintain fetch offsets
We already have the partition fetch data in the fetch session, so a copy is not required.
parent 854588804d
commit 29fdd6094b
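In short: `buildFetch` now returns a small carrier that pairs the fetch request builder with the session's own per-partition data, so `processFetchRequest` validates responses against that map instead of a copied snapshot of `partitionStates`. The carrier, as introduced in this commit (imports added here for context):

```scala
import java.util
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.FetchRequest

// partitionData is the session's view of what was requested; fetchRequest is
// the builder for the wire request. Both travel together from buildFetch to
// processFetchRequest, making the per-iteration partition-state copy redundant.
case class Fetch(partitionData: util.Map[TopicPartition, FetchRequest.PartitionData],
                 fetchRequest: FetchRequest.Builder)
```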
clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java

@@ -101,7 +101,7 @@ public class PartitionStates<S> {
     }
 
     public LinkedHashMap<TopicPartition, S> partitionStateMap() {
-        return new LinkedHashMap<>(map);
+        return map;
     }
 
     /**
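For context, `partitionStateMap` sits on the fetcher's hot path (it is called under the lock on every `maybeFetch` pass), so the defensive copy was pure overhead once no caller mutated the result. A minimal sketch of the trade-off, with illustrative names rather than Kafka's API:

```scala
import java.util.LinkedHashMap

// Illustrative only -- not Kafka's PartitionStates.
class States[K, V] {
  private val map = new LinkedHashMap[K, V]()

  // Before: O(n) time plus an allocation on every call; callers may mutate freely.
  def snapshot(): LinkedHashMap[K, V] = new LinkedHashMap[K, V](map)

  // After: O(1) and allocation-free; callers must treat the result as read-only.
  def view(): LinkedHashMap[K, V] = map
}
```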
core/src/main/scala/kafka/server/AbstractFetcherThread.scala

@@ -18,6 +18,7 @@
 package kafka.server
 
 import java.nio.ByteBuffer
+import java.util
 import java.util.Optional
 import java.util.concurrent.locks.ReentrantLock
 

@@ -29,16 +30,17 @@ import kafka.common.ClientIdAndBroker
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.inLock
 import org.apache.kafka.common.protocol.Errors
 import AbstractFetcherThread._
 
 import scala.collection.{mutable, Map, Seq, Set}
 import scala.collection.JavaConverters._
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
-import java.util.function.Consumer
+import java.util.function.BiConsumer
 
 import com.yammer.metrics.core.Gauge
 import kafka.log.LogAppendInfo
+import kafka.server.AbstractFetcherThread.Fetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
 import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}

@@ -79,7 +81,8 @@ abstract class AbstractFetcherThread(name: String,
 
   protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit
 
-  protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]]
+  protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]]
+
 
   protected def latestEpoch(topicPartition: TopicPartition): Option[Int]
 

@@ -117,9 +120,8 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   private def maybeFetch(): Unit = {
-    val (fetchStates, fetchRequestOpt) = inLock(partitionMapLock) {
-      val fetchStates = partitionStates.partitionStateMap.asScala
-      val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates)
+    val fetchRequestOpt = inLock(partitionMapLock) {
+      val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(partitionStates.partitionStateMap.asScala)
 
       handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
 

@@ -128,11 +130,11 @@ abstract class AbstractFetcherThread(name: String,
         partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
       }
 
-      (fetchStates, fetchRequestOpt)
+      fetchRequestOpt
     }
 
-    fetchRequestOpt.foreach { fetchRequest =>
-      processFetchRequest(fetchStates, fetchRequest)
+    fetchRequestOpt.foreach { case Fetch(sessionPartitions, fetchRequest) =>
+      processFetchRequest(sessionPartitions, fetchRequest)
     }
   }
 

@@ -152,13 +154,12 @@ abstract class AbstractFetcherThread(name: String,
     val partitionsWithEpochs = mutable.Map.empty[TopicPartition, EpochData]
     val partitionsWithoutEpochs = mutable.Set.empty[TopicPartition]
 
-    partitionStates.stream().forEach(new Consumer[PartitionStates.PartitionState[PartitionFetchState]] {
-      override def accept(state: PartitionStates.PartitionState[PartitionFetchState]): Unit = {
-        if (state.value.isTruncating) {
-          val tp = state.topicPartition
+    partitionStates.partitionStateMap.forEach(new BiConsumer[TopicPartition, PartitionFetchState] {
+      override def accept(tp: TopicPartition, state: PartitionFetchState): Unit = {
+        if (state.isTruncating) {
           latestEpoch(tp) match {
             case Some(epoch) if isOffsetForLeaderEpochSupported =>
-              partitionsWithEpochs += tp -> new EpochData(Optional.of(state.value.currentLeaderEpoch), epoch)
+              partitionsWithEpochs += tp -> new EpochData(Optional.of(state.currentLeaderEpoch), epoch)
             case _ =>
               partitionsWithoutEpochs += tp
           }

@@ -278,7 +279,7 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
-  private def processFetchRequest(fetchStates: Map[TopicPartition, PartitionFetchState],
+  private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
                                   fetchRequest: FetchRequest.Builder): Unit = {
     val partitionsWithError = mutable.Set[TopicPartition]()
     var responseData: Seq[(TopicPartition, FetchData)] = Seq.empty

@@ -309,8 +310,8 @@ abstract class AbstractFetcherThread(name: String,
           // It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
           // In this case, we only want to process the fetch response if the partition state is ready for fetch and
           // the current offset is the same as the offset requested.
-          val fetchState = fetchStates(topicPartition)
-          if (fetchState.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
+          val fetchPartitionData = sessionPartitions.get(topicPartition)
+          if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
             partitionData.error match {
               case Errors.NONE =>
                 try {

@@ -326,8 +327,7 @@ abstract class AbstractFetcherThread(name: String,
                   // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
                   if (validBytes > 0 && partitionStates.contains(topicPartition)) {
                     // Update partitionStates only if there is no exception during processPartitionData
-                    val newFetchState = PartitionFetchState(nextOffset, fetchState.currentLeaderEpoch,
-                      state = Fetching)
+                    val newFetchState = PartitionFetchState(nextOffset, fetchPartitionData.currentLeaderEpoch.get(), state = Fetching)
                     partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
                     fetcherStats.byteRate.mark(validBytes)
                   }

@@ -358,7 +358,7 @@ abstract class AbstractFetcherThread(name: String,
 
               case Errors.UNKNOWN_LEADER_EPOCH =>
                 debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " +
-                  s"this replica's current leader epoch of ${fetchState.currentLeaderEpoch}.")
+                  s"this replica's current leader epoch of ${fetchPartitionData.currentLeaderEpoch}.")
                 partitionsWithError += topicPartition
 
               case Errors.FENCED_LEADER_EPOCH =>

@@ -667,6 +667,7 @@ abstract class AbstractFetcherThread(name: String,
 
 object AbstractFetcherThread {
 
+  case class Fetch(partitionData: util.Map[TopicPartition, FetchRequest.PartitionData], fetchRequest: FetchRequest.Builder)
   case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition])
 
 }
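The key correctness point in `processFetchRequest`: a response is applied only when the offset recorded in the session's partition data still matches the live fetch state. A hedged sketch of that guard, assuming Kafka's `TopicPartition` and `FetchRequest.PartitionData` types (`PartitionFetchState` below is a simplified stand-in for the real class in this file):

```scala
import java.util
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.FetchRequest

object StalenessGuardSketch {
  // Simplified stand-in for kafka.server.PartitionFetchState.
  case class PartitionFetchState(fetchOffset: Long, isReadyForFetch: Boolean)

  // A partition's response is applied only if it is still in the session, the
  // offset that was requested still matches the live state (i.e. the partition
  // was not truncated or removed and re-added while the request was in flight),
  // and fetching is not paused or delayed.
  def shouldProcessResponse(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
                            tp: TopicPartition,
                            current: PartitionFetchState): Boolean = {
    val requested = sessionPartitions.get(tp)
    requested != null &&
      requested.fetchOffset == current.fetchOffset &&
      current.isReadyForFetch
  }
}
```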
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala

@@ -23,6 +23,7 @@ import java.util.Optional
 import kafka.api.Request
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogAppendInfo
+import kafka.server.AbstractFetcherThread.Fetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.QuotaFactory.UnboundedQuota
 import org.apache.kafka.common.TopicPartition

@@ -34,7 +35,7 @@ import org.apache.kafka.common.requests.FetchResponse.PartitionData
 import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
 
 import scala.collection.JavaConverters._
-import scala.collection.{Map, Seq, Set, mutable}
+import scala.collection.{mutable, Map, Seq, Set}
 
 class ReplicaAlterLogDirsThread(name: String,
                                 sourceBroker: BrokerEndPoint,

@@ -222,7 +223,7 @@ class ReplicaAlterLogDirsThread(name: String,
     nextPartitionOpt
   }
 
-  private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+  private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[Fetch]] = {
     val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
     val partitionsWithError = mutable.Set[TopicPartition]()
 

@@ -241,14 +242,14 @@ class ReplicaAlterLogDirsThread(name: String,
     } else {
       // Set maxWait and minBytes to 0 because the response should return immediately if
       // the future log has caught up with the current log of the partition
-      Some(FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap)
-        .setMaxBytes(maxBytes))
+      val requestBuilder = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap).setMaxBytes(maxBytes)
+      Some(Fetch(requestMap, requestBuilder))
     }
 
     ResultWithPartitions(fetchRequestOpt, partitionsWithError)
   }
 
-  def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+  def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]] = {
     // Only include replica in the fetch request if it is not throttled.
     if (quota.isQuotaExceeded) {
       ResultWithPartitions(None, Set.empty)
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala

@@ -22,6 +22,7 @@ import java.util.Optional
 import kafka.api._
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogAppendInfo
+import kafka.server.AbstractFetcherThread.Fetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.TopicPartition

@@ -34,7 +35,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.{LogContext, Time}
 
 import scala.collection.JavaConverters._
-import scala.collection.{Map, mutable}
+import scala.collection.{mutable, Map}
 
 class ReplicaFetcherThread(name: String,
                            fetcherId: Int,

@@ -240,7 +241,7 @@ class ReplicaFetcherThread(name: String,
     }
   }
 
-  override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+  override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]] = {
     val partitionsWithError = mutable.Set[TopicPartition]()
 
     val builder = fetchSessionHandler.newBuilder()

@@ -269,7 +270,7 @@ class ReplicaFetcherThread(name: String,
         .setMaxBytes(maxBytes)
         .toForget(fetchData.toForget)
         .metadata(fetchData.metadata)
-      Some(requestBuilder)
+      Some(Fetch(fetchData.sessionPartitions(), requestBuilder))
     }
 
     ResultWithPartitions(fetchRequestOpt, partitionsWithError)
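This is where the copy actually disappears for replica fetching: `FetchSessionHandler.FetchRequestData` already carries a `sessionPartitions()` map with the fetch state of every partition in the session, so it can ride along in `Fetch` instead of being re-derived from `partitionStates`. A hedged sketch of the handler interaction (Kafka's client API as of this era; the broker id and partition values are arbitrary):

```scala
import java.util.Optional
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.utils.LogContext

object FetchSessionSketch extends App {
  // One session handler per source broker; node id 1 is arbitrary here.
  val handler = new FetchSessionHandler(new LogContext(), 1)

  val builder = handler.newBuilder()
  builder.add(new TopicPartition("topic", 0),
    new FetchRequest.PartitionData(0L, 0L, 1024 * 1024, Optional.empty[Integer]()))
  val fetchData = builder.build()

  // toSend is the (possibly incremental) payload for the wire request, while
  // sessionPartitions() reflects every partition tracked by the session --
  // exactly what processFetchRequest needs in order to validate responses.
  println(fetchData.toSend.size())
  println(fetchData.sessionPartitions().size())
}
```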
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala

@@ -25,6 +25,7 @@ import com.yammer.metrics.Metrics
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogAppendInfo
 import kafka.message.NoCompressionCodec
+import kafka.server.AbstractFetcherThread.Fetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.utils.TestUtils
 import org.apache.kafka.common.KafkaException

@@ -38,7 +39,7 @@ import org.junit.Assert._
 import org.junit.{Before, Test}
 
 import scala.collection.JavaConverters._
-import scala.collection.{Map, Set, mutable}
+import scala.collection.{mutable, Map, Set}
 import scala.util.Random
 import org.scalatest.Assertions.assertThrows
 

@@ -901,7 +902,7 @@ class AbstractFetcherThreadTest {
       state.highWatermark = offset
     }
 
-    override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+    override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]] = {
      val fetchData = mutable.Map.empty[TopicPartition, FetchRequest.PartitionData]
      partitionMap.foreach { case (partition, state) =>
        if (state.isReadyForFetch) {

@@ -911,7 +912,7 @@ class AbstractFetcherThreadTest {
        }
      }
      val fetchRequest = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 1, fetchData.asJava)
-      ResultWithPartitions(Some(fetchRequest), Set.empty)
+      ResultWithPartitions(Some(Fetch(fetchData.asJava, fetchRequest)), Set.empty)
     }
 
     override def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala

@@ -20,6 +20,7 @@ import java.util.Optional
 
 import kafka.cluster.{BrokerEndPoint, Partition}
 import kafka.log.{Log, LogManager}
+import kafka.server.AbstractFetcherThread.Fetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.utils.{DelayedItem, TestUtils}
 import org.apache.kafka.common.TopicPartition

@@ -524,7 +525,7 @@ class ReplicaAlterLogDirsThreadTest {
       t1p1 -> PartitionFetchState(160, leaderEpoch, state = Fetching)))
 
     assertTrue(fetchRequestOpt.isDefined)
-    val fetchRequest = fetchRequestOpt.get
+    val fetchRequest = fetchRequestOpt.get.fetchRequest
     assertFalse(fetchRequest.fetchData.isEmpty)
     assertFalse(partitionsWithError.nonEmpty)
     val request = fetchRequest.build()

@@ -577,9 +578,9 @@ class ReplicaAlterLogDirsThreadTest {
 
     assertTrue(fetchRequestOpt.isDefined)
     val fetchRequest = fetchRequestOpt.get
-    assertFalse(fetchRequest.fetchData.isEmpty)
+    assertFalse(fetchRequest.partitionData.isEmpty)
     assertFalse(partitionsWithError.nonEmpty)
-    val fetchInfos = fetchRequest.build().fetchData.asScala.toSeq
+    val fetchInfos = fetchRequest.fetchRequest.build().fetchData.asScala.toSeq
     assertEquals(1, fetchInfos.length)
     assertEquals("Expected fetch request for non-truncating partition", t1p0, fetchInfos.head._1)
     assertEquals(150, fetchInfos.head._2.fetchOffset)

@@ -591,9 +592,9 @@ class ReplicaAlterLogDirsThreadTest {
 
     assertTrue(fetchRequest2Opt.isDefined)
     val fetchRequest2 = fetchRequest2Opt.get
-    assertFalse(fetchRequest2.fetchData.isEmpty)
+    assertFalse(fetchRequest2.partitionData.isEmpty)
     assertFalse(partitionsWithError2.nonEmpty)
-    val fetchInfos2 = fetchRequest2.build().fetchData.asScala.toSeq
+    val fetchInfos2 = fetchRequest2.fetchRequest.build().fetchData.asScala.toSeq
     assertEquals(1, fetchInfos2.length)
     assertEquals("Expected fetch request for non-delayed partition", t1p0, fetchInfos2.head._1)
     assertEquals(140, fetchInfos2.head._2.fetchOffset)