mirror of https://github.com/apache/kafka.git
Avoid copying partition states to maintain fetch offsets
We already have the partition fetch data in the fetch session, so a copy is not required.
parent 854588804d
commit 29fdd6094b
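In outline: the patch pairs each fetch request with the per-partition data already held by the fetch session, so the response path no longer needs a snapshot of the fetcher's partition-state map. A minimal Scala sketch of that shape, using hypothetical stand-ins (string keys, simplified PartitionData and Builder) rather than the real Kafka types:

import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Stand-ins for the real FetchRequest.PartitionData and FetchRequest.Builder.
case class PartitionData(fetchOffset: Long, currentLeaderEpoch: Int)
class Builder(val partitions: JMap[String, PartitionData])

// The new carrier: the request builder travels together with the exact
// per-partition data already held by the fetch session, so no snapshot of
// the fetcher's partition-state map is needed to validate responses later.
case class Fetch(partitionData: JMap[String, PartitionData], fetchRequest: Builder)

object FetchSketch extends App {
  val sessionPartitions = new JLinkedHashMap[String, PartitionData]()
  sessionPartitions.put("topic-0", PartitionData(fetchOffset = 42L, currentLeaderEpoch = 5))
  val fetch = Fetch(sessionPartitions, new Builder(sessionPartitions)) // same map, no copy

  // Response handling consults the session's own view of the requested offset.
  val requested = fetch.partitionData.get("topic-0")
  if (requested != null) println(s"requested offset ${requested.fetchOffset}")
}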
clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
@@ -101,7 +101,7 @@ public class PartitionStates<S> {
     }
 
     public LinkedHashMap<TopicPartition, S> partitionStateMap() {
-        return new LinkedHashMap<>(map);
+        return map;
     }
 
     /**
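With this change partitionStateMap() returns the internal map itself: callers get an O(1) live view instead of an O(n) snapshot, and they now see later updates and must not mutate the result. A hypothetical miniature (not the actual class) illustrating the aliasing difference:

import java.util.{LinkedHashMap => JLinkedHashMap}

// Hypothetical miniature of PartitionStates, only to show the aliasing change.
class MiniPartitionStates[S] {
  private val map = new JLinkedHashMap[String, S]()

  def update(tp: String, state: S): Unit = map.put(tp, state)

  // Before: return new JLinkedHashMap(map) -- an O(n) defensive copy per call.
  // After: return the internal map itself -- O(1), but callers see later
  // updates and must treat the result as read-only.
  def partitionStateMap: JLinkedHashMap[String, S] = map
}

object AliasingDemo extends App {
  val states = new MiniPartitionStates[java.lang.Long]
  states.update("t-0", 100L)
  val view = states.partitionStateMap
  states.update("t-0", 200L)
  println(view.get("t-0")) // prints 200: the view is live, not a snapshot
}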
core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import java.nio.ByteBuffer
+import java.util
 import java.util.Optional
 import java.util.concurrent.locks.ReentrantLock
 
@@ -29,16 +30,17 @@ import kafka.common.ClientIdAndBroker
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.inLock
 import org.apache.kafka.common.protocol.Errors
-import AbstractFetcherThread._
 
 import scala.collection.{mutable, Map, Seq, Set}
 import scala.collection.JavaConverters._
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
-import java.util.function.Consumer
+import java.util.function.BiConsumer
 
 import com.yammer.metrics.core.Gauge
 import kafka.log.LogAppendInfo
+import kafka.server.AbstractFetcherThread.Fetch
+import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
 import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
@@ -79,7 +81,8 @@ abstract class AbstractFetcherThread(name: String,
 
   protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit
 
-  protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]]
+  protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]]
 
 
   protected def latestEpoch(topicPartition: TopicPartition): Option[Int]
@@ -117,9 +120,8 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   private def maybeFetch(): Unit = {
-    val (fetchStates, fetchRequestOpt) = inLock(partitionMapLock) {
-      val fetchStates = partitionStates.partitionStateMap.asScala
-      val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates)
+    val fetchRequestOpt = inLock(partitionMapLock) {
+      val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(partitionStates.partitionStateMap.asScala)
 
       handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
@@ -128,11 +130,11 @@ abstract class AbstractFetcherThread(name: String,
         partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
       }
 
-      (fetchStates, fetchRequestOpt)
+      fetchRequestOpt
     }
 
-    fetchRequestOpt.foreach { fetchRequest =>
-      processFetchRequest(fetchStates, fetchRequest)
+    fetchRequestOpt.foreach { case Fetch(sessionPartitions, fetchRequest) =>
+      processFetchRequest(sessionPartitions, fetchRequest)
     }
   }
@@ -152,13 +154,12 @@ abstract class AbstractFetcherThread(name: String,
     val partitionsWithEpochs = mutable.Map.empty[TopicPartition, EpochData]
     val partitionsWithoutEpochs = mutable.Set.empty[TopicPartition]
 
-    partitionStates.stream().forEach(new Consumer[PartitionStates.PartitionState[PartitionFetchState]] {
-      override def accept(state: PartitionStates.PartitionState[PartitionFetchState]): Unit = {
-        if (state.value.isTruncating) {
-          val tp = state.topicPartition
+    partitionStates.partitionStateMap.forEach(new BiConsumer[TopicPartition, PartitionFetchState] {
+      override def accept(tp: TopicPartition, state: PartitionFetchState): Unit = {
+        if (state.isTruncating) {
           latestEpoch(tp) match {
             case Some(epoch) if isOffsetForLeaderEpochSupported =>
-              partitionsWithEpochs += tp -> new EpochData(Optional.of(state.value.currentLeaderEpoch), epoch)
+              partitionsWithEpochs += tp -> new EpochData(Optional.of(state.currentLeaderEpoch), epoch)
             case _ =>
               partitionsWithoutEpochs += tp
           }
@@ -278,7 +279,7 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
-  private def processFetchRequest(fetchStates: Map[TopicPartition, PartitionFetchState],
+  private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
                                   fetchRequest: FetchRequest.Builder): Unit = {
     val partitionsWithError = mutable.Set[TopicPartition]()
     var responseData: Seq[(TopicPartition, FetchData)] = Seq.empty
@@ -309,8 +310,8 @@ abstract class AbstractFetcherThread(name: String,
           // It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
           // In this case, we only want to process the fetch response if the partition state is ready for fetch and
           // the current offset is the same as the offset requested.
-          val fetchState = fetchStates(topicPartition)
-          if (fetchState.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
+          val fetchPartitionData = sessionPartitions.get(topicPartition)
+          if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
             partitionData.error match {
               case Errors.NONE =>
                 try {
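One behavioral nuance worth noting: the old fetchStates(topicPartition) apply-lookup on a Scala Map would throw if the partition had disappeared, whereas java.util.Map.get returns null, hence the added null guard before comparing offsets. A tiny sketch of that guard pattern (hypothetical string keys):

import java.util.{HashMap => JHashMap}

object NullGuardSketch extends App {
  val sessionPartitions = new JHashMap[String, java.lang.Long]()
  sessionPartitions.put("t-0", 42L)

  // A partition removed mid-flight is simply absent from the java.util.Map;
  // get returns null and the stale response is skipped instead of throwing,
  // as the old Scala-Map apply-lookup would have.
  val requested = sessionPartitions.get("t-1")
  if (requested != null) println(s"process response at offset $requested")
  else println("partition no longer in session; skip response")
}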
@@ -326,8 +327,7 @@ abstract class AbstractFetcherThread(name: String,
                   // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
                   if (validBytes > 0 && partitionStates.contains(topicPartition)) {
                     // Update partitionStates only if there is no exception during processPartitionData
-                    val newFetchState = PartitionFetchState(nextOffset, fetchState.currentLeaderEpoch,
-                      state = Fetching)
+                    val newFetchState = PartitionFetchState(nextOffset, fetchPartitionData.currentLeaderEpoch.get(), state = Fetching)
                     partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
                     fetcherStats.byteRate.mark(validBytes)
                   }
@@ -358,7 +358,7 @@ abstract class AbstractFetcherThread(name: String,
 
               case Errors.UNKNOWN_LEADER_EPOCH =>
                 debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " +
-                  s"this replica's current leader epoch of ${fetchState.currentLeaderEpoch}.")
+                  s"this replica's current leader epoch of ${fetchPartitionData.currentLeaderEpoch}.")
                 partitionsWithError += topicPartition
 
               case Errors.FENCED_LEADER_EPOCH =>
@@ -667,6 +667,7 @@ abstract class AbstractFetcherThread(name: String,
 
 object AbstractFetcherThread {
 
+  case class Fetch(partitionData: util.Map[TopicPartition, FetchRequest.PartitionData], fetchRequest: FetchRequest.Builder)
   case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition])
 
 }
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -23,6 +23,7 @@ import java.util.Optional
 import kafka.api.Request
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogAppendInfo
+import kafka.server.AbstractFetcherThread.Fetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.QuotaFactory.UnboundedQuota
 import org.apache.kafka.common.TopicPartition
@@ -34,7 +35,7 @@ import org.apache.kafka.common.requests.FetchResponse.PartitionData
 import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
 
 import scala.collection.JavaConverters._
-import scala.collection.{Map, Seq, Set, mutable}
+import scala.collection.{mutable, Map, Seq, Set}
 
 class ReplicaAlterLogDirsThread(name: String,
                                 sourceBroker: BrokerEndPoint,
@@ -222,7 +223,7 @@ class ReplicaAlterLogDirsThread(name: String,
     nextPartitionOpt
   }
 
-  private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+  private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[Fetch]] = {
     val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
     val partitionsWithError = mutable.Set[TopicPartition]()
 
@@ -241,14 +242,14 @@ class ReplicaAlterLogDirsThread(name: String,
     } else {
       // Set maxWait and minBytes to 0 because the response should return immediately if
       // the future log has caught up with the current log of the partition
-      Some(FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap)
-        .setMaxBytes(maxBytes))
+      val requestBuilder = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap).setMaxBytes(maxBytes)
+      Some(Fetch(requestMap, requestBuilder))
     }
 
     ResultWithPartitions(fetchRequestOpt, partitionsWithError)
   }
 
-  def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+  def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]] = {
     // Only include replica in the fetch request if it is not throttled.
     if (quota.isQuotaExceeded) {
       ResultWithPartitions(None, Set.empty)
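Note that the same requestMap instance becomes both the builder's fetch data and the Fetch partition data, so responses are validated against exactly the offsets that were requested. A toy illustration with assumed, simplified types:

import java.util.{LinkedHashMap => JLinkedHashMap}

object SharedMapSketch extends App {
  case class PartitionData(fetchOffset: Long)
  case class Fetch(partitionData: java.util.Map[String, PartitionData], request: String)

  val requestMap = new JLinkedHashMap[String, PartitionData]()
  requestMap.put("futureLog-0", PartitionData(fetchOffset = 150L))

  // One map serves both roles: it parameterizes the request and is kept
  // alongside it so the response can be validated against the offsets that
  // were actually requested.
  val fetch = Fetch(requestMap, request = s"FetchRequest(${requestMap.size} partitions)")
  assert(fetch.partitionData eq requestMap) // same instance, not a copy
  println(fetch)
}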
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -22,6 +22,7 @@ import java.util.Optional
 import kafka.api._
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogAppendInfo
+import kafka.server.AbstractFetcherThread.Fetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.TopicPartition
@@ -34,7 +35,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.{LogContext, Time}
 
 import scala.collection.JavaConverters._
-import scala.collection.{Map, mutable}
+import scala.collection.{mutable, Map}
 
 class ReplicaFetcherThread(name: String,
                            fetcherId: Int,
@@ -240,7 +241,7 @@ class ReplicaFetcherThread(name: String,
     }
   }
 
-  override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+  override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]] = {
    val partitionsWithError = mutable.Set[TopicPartition]()
 
    val builder = fetchSessionHandler.newBuilder()
@@ -269,7 +270,7 @@ class ReplicaFetcherThread(name: String,
         .setMaxBytes(maxBytes)
         .toForget(fetchData.toForget)
         .metadata(fetchData.metadata)
-      Some(requestBuilder)
+      Some(Fetch(fetchData.sessionPartitions(), requestBuilder))
     }
 
     ResultWithPartitions(fetchRequestOpt, partitionsWithError)
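Here the Fetch carries fetchData.sessionPartitions(), the session handler's cumulative view of all partitions in the fetch session, which on an incremental fetch can be wider than the toSend delta; this is what lets responses for session-cached partitions still be checked against their requested offsets. A rough sketch of that idea with simplified stand-in types (not the real FetchSessionHandler API):

import java.util.{LinkedHashMap => JLinkedHashMap}

object SessionViewSketch extends App {
  case class PartitionData(fetchOffset: Long)

  // Simplified stand-in for a fetch session: it accumulates partitions
  // across requests, while an incremental request sends only the delta.
  val sessionPartitions = new JLinkedHashMap[String, PartitionData]()
  sessionPartitions.put("t-0", PartitionData(10L)) // carried over from an earlier request
  sessionPartitions.put("t-1", PartitionData(20L)) // newly added this round

  val toSend = new JLinkedHashMap[String, PartitionData]()
  toSend.put("t-1", PartitionData(20L))

  // Pairing the full session view (not just the delta) with the builder means
  // a response for t-0 can still be checked against its requested offset.
  println(s"session=${sessionPartitions.keySet()} toSend=${toSend.keySet()}")
}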
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -25,6 +25,7 @@ import com.yammer.metrics.Metrics
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogAppendInfo
 import kafka.message.NoCompressionCodec
+import kafka.server.AbstractFetcherThread.Fetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.utils.TestUtils
 import org.apache.kafka.common.KafkaException
@@ -38,7 +39,7 @@ import org.junit.Assert._
 import org.junit.{Before, Test}
 
 import scala.collection.JavaConverters._
-import scala.collection.{Map, Set, mutable}
+import scala.collection.{mutable, Map, Set}
 import scala.util.Random
 import org.scalatest.Assertions.assertThrows
@@ -901,7 +902,7 @@ class AbstractFetcherThreadTest {
       state.highWatermark = offset
     }
 
-    override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+    override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]] = {
       val fetchData = mutable.Map.empty[TopicPartition, FetchRequest.PartitionData]
       partitionMap.foreach { case (partition, state) =>
         if (state.isReadyForFetch) {
@@ -911,7 +912,7 @@ class AbstractFetcherThreadTest {
         }
       }
       val fetchRequest = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 1, fetchData.asJava)
-      ResultWithPartitions(Some(fetchRequest), Set.empty)
+      ResultWithPartitions(Some(Fetch(fetchData.asJava, fetchRequest)), Set.empty)
     }
 
     override def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -20,6 +20,7 @@ import java.util.Optional
 
 import kafka.cluster.{BrokerEndPoint, Partition}
 import kafka.log.{Log, LogManager}
+import kafka.server.AbstractFetcherThread.Fetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.utils.{DelayedItem, TestUtils}
 import org.apache.kafka.common.TopicPartition
@@ -524,7 +525,7 @@ class ReplicaAlterLogDirsThreadTest {
       t1p1 -> PartitionFetchState(160, leaderEpoch, state = Fetching)))
 
     assertTrue(fetchRequestOpt.isDefined)
-    val fetchRequest = fetchRequestOpt.get
+    val fetchRequest = fetchRequestOpt.get.fetchRequest
     assertFalse(fetchRequest.fetchData.isEmpty)
     assertFalse(partitionsWithError.nonEmpty)
     val request = fetchRequest.build()
@@ -577,9 +578,9 @@ class ReplicaAlterLogDirsThreadTest {
 
     assertTrue(fetchRequestOpt.isDefined)
     val fetchRequest = fetchRequestOpt.get
-    assertFalse(fetchRequest.fetchData.isEmpty)
+    assertFalse(fetchRequest.partitionData.isEmpty)
     assertFalse(partitionsWithError.nonEmpty)
-    val fetchInfos = fetchRequest.build().fetchData.asScala.toSeq
+    val fetchInfos = fetchRequest.fetchRequest.build().fetchData.asScala.toSeq
     assertEquals(1, fetchInfos.length)
     assertEquals("Expected fetch request for non-truncating partition", t1p0, fetchInfos.head._1)
     assertEquals(150, fetchInfos.head._2.fetchOffset)
@@ -591,9 +592,9 @@ class ReplicaAlterLogDirsThreadTest {
 
     assertTrue(fetchRequest2Opt.isDefined)
     val fetchRequest2 = fetchRequest2Opt.get
-    assertFalse(fetchRequest2.fetchData.isEmpty)
+    assertFalse(fetchRequest2.partitionData.isEmpty)
     assertFalse(partitionsWithError2.nonEmpty)
-    val fetchInfos2 = fetchRequest2.build().fetchData.asScala.toSeq
+    val fetchInfos2 = fetchRequest2.fetchRequest.build().fetchData.asScala.toSeq
     assertEquals(1, fetchInfos2.length)
     assertEquals("Expected fetch request for non-delayed partition", t1p0, fetchInfos2.head._1)
     assertEquals(140, fetchInfos2.head._2.fetchOffset)