KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path (#11170)

Previously, we used the metadata cache to determine whether or not to use topic IDs. Unfortunately, with ZK controllers, metadata cache updates arrive in a separate request and may be too slow for the fetcher thread. This results in switching between topic names and topic IDs for topics that could simply use IDs.

This patch adds topic IDs to the fetcher state (InitialFetchState and PartitionFetchState) created when handling LeaderAndIsr requests. It also supports updating this state for follower threads as soon as a LeaderAndIsr request provides a topic ID.

We've opted to update only the replica fetcher threads. ReplicaAlterLogDirs threads will use either the topic name or the topic ID, depending on which was present when they were created.
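For illustration, here is a condensed sketch of the state shapes this patch introduces, with fields trimmed to the essentials (the real classes also carry the leader endpoint, lag, delay, and replica state):

import org.apache.kafka.common.Uuid

// Condensed sketch only; the production classes live in kafka.server and carry more fields.
case class InitialFetchState(topicId: Option[Uuid], currentLeaderEpoch: Int, initOffset: Long)

case class PartitionFetchState(topicId: Option[Uuid], fetchOffset: Long, currentLeaderEpoch: Int) {
  // A later LeaderAndIsr request can fill in the topic ID without disturbing the rest of the state.
  def updateTopicId(topicId: Option[Uuid]): PartitionFetchState = this.copy(topicId = topicId)
}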

Reviewers: David Jacot <djacot@confluent.io>
Author: Justine Olshan
Date: 2021-09-24 01:51:08 -07:00 (committed by GitHub)
Parent: d08e3ad7d5
Commit: b76bcaf3a8
13 changed files with 398 additions and 101 deletions

PartitionStates.java

@ -64,6 +64,11 @@ public class PartitionStates<S> {
updateSize();
}
public void update(TopicPartition topicPartition, S state) {
map.put(topicPartition, state);
updateSize();
}
public void remove(TopicPartition topicPartition) {
map.remove(topicPartition);
updateSize();

AbstractFetcherManager.scala

@ -21,7 +21,7 @@ import kafka.cluster.BrokerEndPoint
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.utils.Utils
import scala.collection.{Map, Set, mutable}
@ -67,7 +67,7 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
if (id.fetcherId >= newSize)
thread.shutdown()
val fetchStates = partitionStates.map { case (topicPartition, currentFetchState) =>
val initialFetchState = InitialFetchState(thread.sourceBroker,
val initialFetchState = InitialFetchState(currentFetchState.topicId, thread.sourceBroker,
currentLeaderEpoch = currentFetchState.currentLeaderEpoch,
initOffset = currentFetchState.fetchOffset)
topicPartition -> initialFetchState
@ -163,6 +163,26 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for partitions $initialOffsetAndEpochs")
}
/**
* If the fetcher and partition state exist, update them to include the topic ID
*
* @param partitionsToUpdate a map from each partition to be updated to the ID of its current leader
* @param topicIds a function from a topic name to the topic's ID, or None if no ID exists
*/
def maybeUpdateTopicIds(partitionsToUpdate: Map[TopicPartition, Int], topicIds: String => Option[Uuid]): Unit = {
lock synchronized {
val partitionsPerFetcher = partitionsToUpdate.groupBy { case (topicPartition, leaderId) =>
BrokerIdAndFetcherId(leaderId, getFetcherId(topicPartition))
}.map { case (brokerAndFetcherId, partitionsToUpdate) =>
(brokerAndFetcherId, partitionsToUpdate.keySet)
}
for ((brokerIdAndFetcherId, partitions) <- partitionsPerFetcher) {
fetcherThreadMap.get(brokerIdAndFetcherId).foreach(_.maybeUpdateTopicIds(partitions, topicIds))
}
}
}
def removeFetcherForPartitions(partitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
val fetchStates = mutable.Map.empty[TopicPartition, PartitionFetchState]
lock synchronized {
@ -235,6 +255,6 @@ class FailedPartitions {
case class BrokerAndFetcherId(broker: BrokerEndPoint, fetcherId: Int)
case class InitialFetchState(leader: BrokerEndPoint, currentLeaderEpoch: Int, initOffset: Long)
case class InitialFetchState(topicId: Option[Uuid], leader: BrokerEndPoint, currentLeaderEpoch: Int, initOffset: Long)
case class BrokerIdAndFetcherId(brokerId: Int, fetcherId: Int)
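As a usage note for the new maybeUpdateTopicIds API above: a caller that has just learned topic IDs (for example from a LeaderAndIsr request) hands the manager the affected partitions keyed by their current leader ID, plus a name-to-ID lookup, and the manager routes the update to the fetcher thread that owns each partition. A minimal, hypothetical call-site sketch (the helper name and the example maps are illustrative, not part of this patch):

import kafka.server.{AbstractFetcherManager, AbstractFetcherThread}
import org.apache.kafka.common.{TopicPartition, Uuid}

object TopicIdUpdateExample {
  // Hypothetical helper: push newly learned topic IDs into an existing fetcher manager.
  def pushTopicIds(manager: AbstractFetcherManager[_ <: AbstractFetcherThread],
                   partitionsWithLeader: Map[TopicPartition, Int],
                   topicIds: Map[String, Uuid]): Unit =
    manager.maybeUpdateTopicIds(partitionsWithLeader, topicName => topicIds.get(topicName))

  // e.g. pushTopicIds(replicaFetcherManager, Map(new TopicPartition("foo", 0) -> 1), Map("foo" -> Uuid.randomUuid()))
}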

AbstractFetcherThread.scala

@ -33,7 +33,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid}
import java.nio.ByteBuffer
import java.util
@ -357,7 +357,7 @@ abstract class AbstractFetcherThread(name: String,
// ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
if (validBytes > 0 && partitionStates.contains(topicPartition)) {
// Update partitionStates only if there is no exception during processPartitionData
val newFetchState = PartitionFetchState(nextOffset, Some(lag),
val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
currentFetchState.currentLeaderEpoch, state = Fetching,
logAppendInfo.lastLeaderEpoch)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
@ -442,7 +442,7 @@ abstract class AbstractFetcherThread(name: String,
partitionMapLock.lockInterruptibly()
try {
Option(partitionStates.stateValue(topicPartition)).foreach { state =>
val newState = PartitionFetchState(math.min(truncationOffset, state.fetchOffset),
val newState = PartitionFetchState(state.topicId, math.min(truncationOffset, state.fetchOffset),
state.lag, state.currentLeaderEpoch, state.delay, state = Truncating,
lastFetchedEpoch = None)
partitionStates.updateAndMoveToEnd(topicPartition, newState)
@ -469,16 +469,16 @@ abstract class AbstractFetcherThread(name: String,
if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) {
currentState
} else if (initialFetchState.initOffset < 0) {
fetchOffsetAndTruncate(tp, initialFetchState.currentLeaderEpoch)
fetchOffsetAndTruncate(tp, initialFetchState.topicId, initialFetchState.currentLeaderEpoch)
} else if (isTruncationOnFetchSupported) {
// With old message format, `latestEpoch` will be empty and we use Truncating state
// to truncate to high watermark.
val lastFetchedEpoch = latestEpoch(tp)
val state = if (lastFetchedEpoch.nonEmpty) Fetching else Truncating
PartitionFetchState(initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
PartitionFetchState(initialFetchState.topicId, initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
state, lastFetchedEpoch)
} else {
PartitionFetchState(initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
PartitionFetchState(initialFetchState.topicId, initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
state = Truncating, lastFetchedEpoch = None)
}
}
@ -499,6 +499,20 @@ abstract class AbstractFetcherThread(name: String,
} finally partitionMapLock.unlock()
}
def maybeUpdateTopicIds(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]): Unit = {
partitionMapLock.lockInterruptibly()
try {
partitions.foreach { tp =>
val currentState = partitionStates.stateValue(tp)
if (currentState != null) {
val updatedState = currentState.updateTopicId(topicIds(tp.topic))
partitionStates.update(tp, updatedState)
}
}
partitionMapCond.signalAll()
} finally partitionMapLock.unlock()
}
/**
* Loop through all partitions, updating their fetch offset and maybe marking them as
* truncation completed if their offsetTruncationState indicates truncation completed
@ -515,7 +529,7 @@ abstract class AbstractFetcherThread(name: String,
Fetching
else
Truncating
PartitionFetchState(offsetTruncationState.offset, currentFetchState.lag,
PartitionFetchState(currentFetchState.topicId, offsetTruncationState.offset, currentFetchState.lag,
currentFetchState.currentLeaderEpoch, currentFetchState.delay, state, lastFetchedEpoch)
case None => currentFetchState
}
@ -606,7 +620,7 @@ abstract class AbstractFetcherThread(name: String,
fetchState: PartitionFetchState,
requestEpoch: Optional[Integer]): Boolean = {
try {
val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch)
val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
s"out of range, which typically implies a leader change. Reset fetch offset to ${newFetchState.fetchOffset}")
@ -630,7 +644,7 @@ abstract class AbstractFetcherThread(name: String,
/**
* Handle a partition whose offset is out of range and return a new fetch offset.
*/
protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, currentLeaderEpoch: Int): PartitionFetchState = {
protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
val replicaEndOffset = logEndOffset(topicPartition)
/**
@ -650,7 +664,7 @@ abstract class AbstractFetcherThread(name: String,
truncate(topicPartition, OffsetTruncationState(leaderEndOffset, truncationCompleted = true))
fetcherLagStats.getAndMaybePut(topicPartition).lag = 0
PartitionFetchState(leaderEndOffset, Some(0), currentLeaderEpoch,
PartitionFetchState(topicId, leaderEndOffset, Some(0), currentLeaderEpoch,
state = Fetching, lastFetchedEpoch = latestEpoch(topicPartition))
} else {
/**
@ -684,7 +698,7 @@ abstract class AbstractFetcherThread(name: String,
val initialLag = leaderEndOffset - offsetToFetch
fetcherLagStats.getAndMaybePut(topicPartition).lag = initialLag
PartitionFetchState(offsetToFetch, Some(initialLag), currentLeaderEpoch,
PartitionFetchState(topicId, offsetToFetch, Some(initialLag), currentLeaderEpoch,
state = Fetching, lastFetchedEpoch = latestEpoch(topicPartition))
}
}
@ -695,7 +709,7 @@ abstract class AbstractFetcherThread(name: String,
for (partition <- partitions) {
Option(partitionStates.stateValue(partition)).foreach { currentFetchState =>
if (!currentFetchState.isDelayed) {
partitionStates.updateAndMoveToEnd(partition, PartitionFetchState(currentFetchState.fetchOffset,
partitionStates.updateAndMoveToEnd(partition, PartitionFetchState(currentFetchState.topicId, currentFetchState.fetchOffset,
currentFetchState.lag, currentFetchState.currentLeaderEpoch, Some(new DelayedItem(delay)),
currentFetchState.state, currentFetchState.lastFetchedEpoch))
}
@ -824,9 +838,9 @@ case object Truncating extends ReplicaState
case object Fetching extends ReplicaState
object PartitionFetchState {
def apply(offset: Long, lag: Option[Long], currentLeaderEpoch: Int, state: ReplicaState,
def apply(topicId: Option[Uuid], offset: Long, lag: Option[Long], currentLeaderEpoch: Int, state: ReplicaState,
lastFetchedEpoch: Option[Int]): PartitionFetchState = {
PartitionFetchState(offset, lag, currentLeaderEpoch, None, state, lastFetchedEpoch)
PartitionFetchState(topicId, offset, lag, currentLeaderEpoch, None, state, lastFetchedEpoch)
}
}
@ -838,7 +852,8 @@ object PartitionFetchState {
* (2) Delayed, for example due to an error, where we subsequently back off a bit
* (3) ReadyForFetch, this is the active state where the thread is actively fetching data.
*/
case class PartitionFetchState(fetchOffset: Long,
case class PartitionFetchState(topicId: Option[Uuid],
fetchOffset: Long,
lag: Option[Long],
currentLeaderEpoch: Int,
delay: Option[DelayedItem],
@ -854,7 +869,8 @@ case class PartitionFetchState(fetchOffset: Long,
def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
override def toString: String = {
s"FetchState(fetchOffset=$fetchOffset" +
s"FetchState(topicId=$topicId" +
s", fetchOffset=$fetchOffset" +
s", currentLeaderEpoch=$currentLeaderEpoch" +
s", lastFetchedEpoch=$lastFetchedEpoch" +
s", state=$state" +
@ -862,6 +878,10 @@ case class PartitionFetchState(fetchOffset: Long,
s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" +
s")"
}
def updateTopicId(topicId: Option[Uuid]): PartitionFetchState = {
this.copy(topicId = topicId)
}
}
case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean) {

ReplicaAlterLogDirsThread.scala

@ -22,7 +22,7 @@ import kafka.cluster.BrokerEndPoint
import kafka.log.{LeaderOffsetIncremented, LogAppendInfo}
import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
import kafka.server.QuotaFactory.UnboundedQuota
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
@ -31,7 +31,7 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, RequestUtils}
import java.util
import java.util.Optional
import java.util.{Collections, Optional}
import scala.collection.{Map, Seq, Set, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
@ -76,7 +76,15 @@ class ReplicaAlterLogDirsThread(name: String,
var partitionData: Seq[(TopicPartition, FetchData)] = null
val request = fetchRequest.build()
val (topicIds, topicNames) = replicaMgr.metadataCache.topicIdInfo()
// We can build the map from the request since it contains topic IDs and names.
// Only one ID can be associated with a name and vice versa.
val topicIds = new mutable.HashMap[String, Uuid]()
val topicNames = new mutable.HashMap[Uuid, String]()
request.data.topics.forEach { topic =>
topicIds.put(topic.topic, topic.topicId)
topicNames.put(topic.topicId, topic.topic)
}
def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
partitionData = responsePartitionData.map { case (tp, data) =>
@ -94,7 +102,7 @@ class ReplicaAlterLogDirsThread(name: String,
}
// Will throw UnknownTopicIdException if a topic ID is unknown.
val fetchData = request.fetchData(topicNames)
val fetchData = request.fetchData(topicNames.asJava)
replicaMgr.fetchMessages(
0L, // timeout is 0 so that the callback will be executed immediately
@ -103,7 +111,7 @@ class ReplicaAlterLogDirsThread(name: String,
request.maxBytes,
false,
fetchData.asScala.toSeq,
topicIds,
topicIds.asJava,
UnboundedQuota,
processResponseCallback,
request.isolationLevel,
@ -262,7 +270,6 @@ class ReplicaAlterLogDirsThread(name: String,
private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[ReplicaFetch]] = {
val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
val partitionsWithError = mutable.Set[TopicPartition]()
val topicIds = replicaMgr.metadataCache.topicNamesToIds()
try {
val logStartOffset = replicaMgr.futureLocalLogOrException(tp).logStartOffset
@ -281,14 +288,14 @@ class ReplicaAlterLogDirsThread(name: String,
val fetchRequestOpt = if (requestMap.isEmpty) {
None
} else {
val version: Short = if (topicIds.containsKey(tp.topic()))
val version: Short = if (fetchState.topicId.isEmpty)
12
else
ApiKeys.FETCH.latestVersion
// Set maxWait and minBytes to 0 because the response should return immediately if
// the future log has caught up with the current log of the partition
val requestBuilder = FetchRequest.Builder.forReplica(version, replicaId, 0, 0, requestMap,
topicIds).setMaxBytes(maxBytes)
Collections.singletonMap(tp.topic, fetchState.topicId.getOrElse(Uuid.ZERO_UUID))).setMaxBytes(maxBytes)
Some(ReplicaFetch(requestMap, requestBuilder))
}
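The version selection above encodes the topic-ID cutover in the Fetch protocol: versions 13 and later address partitions by topic ID (KIP-516), while version 12 and earlier use topic names, so the thread falls back to v12 whenever the future log does not yet know its ID. Restated as a small helper (a sketch, not code from this patch):

import org.apache.kafka.common.Uuid
import org.apache.kafka.common.protocol.ApiKeys

object FetchVersionExample {
  // Use the newest Fetch version only when a topic ID is available; otherwise stay on v12 (topic names).
  def fetchVersionFor(topicId: Option[Uuid]): Short =
    if (topicId.isEmpty) 12 else ApiKeys.FETCH.latestVersion
}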

ReplicaFetcherThread.scala

@ -273,7 +273,6 @@ class ReplicaFetcherThread(name: String,
override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
val partitionsWithError = mutable.Set[TopicPartition]()
val topicIds = replicaMgr.metadataCache.topicNamesToIds()
val builder = fetchSessionHandler.newBuilder(partitionMap.size, false)
partitionMap.forKeyValue { (topicPartition, fetchState) =>
@ -285,7 +284,7 @@ class ReplicaFetcherThread(name: String,
fetchState.lastFetchedEpoch.map(_.asInstanceOf[Integer]).asJava
else
Optional.empty[Integer]
builder.add(topicPartition, topicIds.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID), new FetchRequest.PartitionData(
builder.add(topicPartition, fetchState.topicId.getOrElse(Uuid.ZERO_UUID), new FetchRequest.PartitionData(
fetchState.fetchOffset,
logStartOffset,
fetchSize,

ReplicaManager.scala

@ -728,7 +728,8 @@ class ReplicaManager(val config: KafkaConfig,
// throw NotLeaderOrFollowerException if replica does not exist for the given partition
val partition = getPartitionOrException(topicPartition)
partition.localLogOrException
val log = partition.localLogOrException
val topicId = log.topicId
// If the destinationLDir is different from the current log directory of the replica:
// - If there is no offline log directory, create the future log in the destinationDir (if it does not exist) and
@ -740,7 +741,7 @@ class ReplicaManager(val config: KafkaConfig,
val futureLog = futureLocalLogOrException(topicPartition)
logManager.abortAndPauseCleaning(topicPartition)
val initialFetchState = InitialFetchState(BrokerEndPoint(config.brokerId, "localhost", -1),
val initialFetchState = InitialFetchState(topicId, BrokerEndPoint(config.brokerId, "localhost", -1),
partition.getLeaderEpoch, futureLog.highWatermark)
replicaAlterLogDirsManager.addFetcherForPartitions(Map(topicPartition -> initialFetchState))
}
@ -1329,6 +1330,7 @@ class ReplicaManager(val config: KafkaConfig,
val partitions = new mutable.HashSet[Partition]()
val partitionsToBeLeader = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
val partitionsToBeFollower = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
val topicIdUpdateFollowerPartitions = new mutable.HashSet[Partition]()
// First create the partition if it doesn't exist already
requestPartitionStates.foreach { partitionState =>
@ -1396,6 +1398,8 @@ class ReplicaManager(val config: KafkaConfig,
stateChangeLogger.info(s"Updating log for $topicPartition to assign topic ID " +
s"$topicId from LeaderAndIsr request from controller $controllerId with correlation " +
s"id $correlationId epoch $controllerEpoch")
if (partitionState.leader != localBrokerId)
topicIdUpdateFollowerPartitions.add(partition)
Errors.NONE
case _ =>
stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
@ -1424,6 +1428,9 @@ class ReplicaManager(val config: KafkaConfig,
val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
updateLeaderAndFollowerMetrics(followerTopicSet)
if (topicIdUpdateFollowerPartitions.nonEmpty)
updateTopicIdForFollowers(controllerId, controllerEpoch, topicIdUpdateFollowerPartitions, correlationId, topicIdFromRequest)
// We initialize highwatermark thread after the first LeaderAndIsr request. This ensures that all the partitions
// have been completely populated before starting the checkpointing, thereby avoiding weird race conditions
startHighWatermarkCheckPointThread()
@ -1521,7 +1528,7 @@ class ReplicaManager(val config: KafkaConfig,
// replica from source dir to destination dir
logManager.abortAndPauseCleaning(topicPartition)
futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(leader,
futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
partition.getLeaderEpoch, log.highWatermark))
}
}
@ -1716,7 +1723,7 @@ class ReplicaManager(val config: KafkaConfig,
val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port())
val log = partition.localLogOrException
val fetchOffset = initialFetchOffset(log)
partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
partition.topicPartition -> InitialFetchState(topicIds(partition.topic), leader, partition.getLeaderEpoch, fetchOffset)
}.toMap
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
@ -1739,6 +1746,42 @@ class ReplicaManager(val config: KafkaConfig,
partitionsToMakeFollower
}
private def updateTopicIdForFollowers(controllerId: Int,
controllerEpoch: Int,
partitions: Set[Partition],
correlationId: Int,
topicIds: String => Option[Uuid]): Unit = {
val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
try {
if (isShuttingDown.get()) {
if (traceLoggingEnabled) {
partitions.foreach { partition =>
stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " +
s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
s"partition ${partition.topicPartition} since it is shutting down")
}
}
} else {
val partitionsToUpdateFollowerWithLeader = mutable.Map.empty[TopicPartition, Int]
partitions.foreach { partition =>
partition.leaderReplicaIdOpt.foreach { leader =>
if (metadataCache.hasAliveBroker(leader)) {
partitionsToUpdateFollowerWithLeader += partition.topicPartition -> leader
}
}
}
replicaFetcherManager.maybeUpdateTopicIds(partitionsToUpdateFollowerWithLeader, topicIds)
}
} catch {
case e: Throwable =>
stateChangeLogger.error(s"Error while processing LeaderAndIsr request with correlationId $correlationId " +
s"received from controller $controllerId epoch $controllerEpoch when trying to update topic IDs in the fetchers", e)
// Re-throw the exception for it to be caught in KafkaApis
throw e
}
}
/**
* From IBP 2.7 onwards, we send latest fetch epoch in the request and truncate if a
* diverging epoch is returned in the response, avoiding the need for a separate
@ -2211,6 +2254,7 @@ class ReplicaManager(val config: KafkaConfig,
.getOrElse(Node.noNode)
val log = partition.localLogOrException
partitionAndOffsets.put(topicPartition, InitialFetchState(
log.topicId,
new BrokerEndPoint(node.id, node.host, node.port),
partition.getLeaderEpoch,
initialFetchOffset(log)
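Taken together, updateTopicIdForFollowers above boils down to: for each follower partition whose existing log was just assigned a topic ID, look up its current leader, keep it only if that leader is alive in the metadata cache, and push the resulting partition-to-leader map into the replica fetcher manager. Below is a self-contained sketch of that filtering step, using a simplified stand-in for the broker's Partition object (hypothetical type, not the production class):

import org.apache.kafka.common.TopicPartition

object FollowerTopicIdUpdateExample {
  // Hypothetical stand-in for kafka.cluster.Partition, reduced to the two fields used here.
  case class FollowerPartition(topicPartition: TopicPartition, leaderReplicaIdOpt: Option[Int])

  // Keep only partitions whose current leader is alive; the result is what gets passed to
  // replicaFetcherManager.maybeUpdateTopicIds(partitionsToUpdate, topicIds).
  def partitionsToUpdateWithLeader(partitions: Set[FollowerPartition],
                                   isAliveBroker: Int => Boolean): Map[TopicPartition, Int] =
    partitions.flatMap { p =>
      p.leaderReplicaIdOpt.filter(isAliveBroker).map(leader => p.topicPartition -> leader)
    }.toMap
}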

AbstractFetcherManagerTest.scala

@ -20,7 +20,7 @@ import com.yammer.metrics.core.Gauge
import kafka.cluster.BrokerEndPoint
import kafka.metrics.KafkaYammerMetrics
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.easymock.EasyMock
import org.junit.jupiter.api.{BeforeEach, Test}
import org.junit.jupiter.api.Assertions._
@ -51,7 +51,9 @@ class AbstractFetcherManagerTest {
val fetchOffset = 10L
val leaderEpoch = 15
val tp = new TopicPartition("topic", 0)
val topicId = Some(Uuid.randomUuid())
val initialFetchState = InitialFetchState(
topicId = topicId,
leader = new BrokerEndPoint(0, "localhost", 9092),
currentLeaderEpoch = leaderEpoch,
initOffset = fetchOffset)
@ -60,7 +62,7 @@ class AbstractFetcherManagerTest {
EasyMock.expect(fetcher.addPartitions(Map(tp -> initialFetchState)))
.andReturn(Set(tp))
EasyMock.expect(fetcher.fetchState(tp))
.andReturn(Some(PartitionFetchState(fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None)))
.andReturn(Some(PartitionFetchState(topicId, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None)))
EasyMock.expect(fetcher.removePartitions(Set(tp))).andReturn(Map.empty)
EasyMock.expect(fetcher.fetchState(tp)).andReturn(None)
EasyMock.replay(fetcher)
@ -110,7 +112,9 @@ class AbstractFetcherManagerTest {
val fetchOffset = 10L
val leaderEpoch = 15
val tp = new TopicPartition("topic", 0)
val topicId = Some(Uuid.randomUuid())
val initialFetchState = InitialFetchState(
topicId = topicId,
leader = new BrokerEndPoint(0, "localhost", 9092),
currentLeaderEpoch = leaderEpoch,
initOffset = fetchOffset)
@ -133,4 +137,87 @@ class AbstractFetcherManagerTest {
assertEquals(0, fetcherManager.deadThreadCount)
EasyMock.verify(fetcher)
}
@Test
def testMaybeUpdateTopicIds(): Unit = {
val fetcher: AbstractFetcherThread = EasyMock.mock(classOf[AbstractFetcherThread])
val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", 2) {
override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
fetcher
}
}
val fetchOffset = 10L
val leaderEpoch = 15
val tp1 = new TopicPartition("topic1", 0)
val tp2 = new TopicPartition("topic2", 0)
val unknownTp = new TopicPartition("topic2", 1)
val topicId1 = Some(Uuid.randomUuid())
val topicId2 = Some(Uuid.randomUuid())
// Start out with no topic ID.
val initialFetchState1 = InitialFetchState(
topicId = None,
leader = new BrokerEndPoint(0, "localhost", 9092),
currentLeaderEpoch = leaderEpoch,
initOffset = fetchOffset)
// Include a partition on a different leader
val initialFetchState2 = InitialFetchState(
topicId = None,
leader = new BrokerEndPoint(1, "localhost", 9092),
currentLeaderEpoch = leaderEpoch,
initOffset = fetchOffset)
// Simulate calls to different fetchers due to different leaders
EasyMock.expect(fetcher.start())
EasyMock.expect(fetcher.start())
EasyMock.expect(fetcher.addPartitions(Map(tp1 -> initialFetchState1)))
.andReturn(Set(tp1))
EasyMock.expect(fetcher.addPartitions(Map(tp2 -> initialFetchState2)))
.andReturn(Set(tp2))
EasyMock.expect(fetcher.fetchState(tp1))
.andReturn(Some(PartitionFetchState(None, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None)))
EasyMock.expect(fetcher.fetchState(tp2))
.andReturn(Some(PartitionFetchState(None, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None)))
val topicIds = Map(tp1.topic -> topicId1, tp2.topic -> topicId2)
EasyMock.expect(fetcher.maybeUpdateTopicIds(Set(tp1), topicIds))
EasyMock.expect(fetcher.maybeUpdateTopicIds(Set(tp2), topicIds))
EasyMock.expect(fetcher.fetchState(tp1))
.andReturn(Some(PartitionFetchState(topicId1, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None)))
EasyMock.expect(fetcher.fetchState(tp2))
.andReturn(Some(PartitionFetchState(topicId2, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None)))
// When targeting a fetcher that doesn't exist, fetcher.maybeUpdateTopicIds will not be called.
// It will still be called for a topic partition that does not exist, since that partition's fetcher does exist.
EasyMock.expect(fetcher.maybeUpdateTopicIds(Set(unknownTp), topicIds))
EasyMock.expect(fetcher.fetchState(unknownTp))
.andReturn(None)
EasyMock.replay(fetcher)
def verifyFetchState(fetchState: Option[PartitionFetchState], expectedTopicId: Option[Uuid]): Unit = {
assertTrue(fetchState.isDefined)
assertEquals(expectedTopicId, fetchState.get.topicId)
}
fetcherManager.addFetcherForPartitions(Map(tp1 -> initialFetchState1, tp2 -> initialFetchState2))
verifyFetchState(fetcher.fetchState(tp1), None)
verifyFetchState(fetcher.fetchState(tp2), None)
val partitionsToUpdate = Map(tp1 -> initialFetchState1.leader.id, tp2 -> initialFetchState2.leader.id)
fetcherManager.maybeUpdateTopicIds(partitionsToUpdate, topicIds)
verifyFetchState(fetcher.fetchState(tp1), topicId1)
verifyFetchState(fetcher.fetchState(tp2), topicId2)
// Try an invalid fetcher and an invalid topic partition
val invalidPartitionsToUpdate = Map(tp1 -> 2, unknownTp -> initialFetchState1.leader.id)
fetcherManager.maybeUpdateTopicIds(invalidPartitionsToUpdate, topicIds)
assertTrue(fetcher.fetchState(unknownTp).isEmpty)
EasyMock.verify(fetcher)
}
}

AbstractFetcherThreadTest.scala

@ -53,7 +53,7 @@ import scala.compat.java8.OptionConverters._
class AbstractFetcherThreadTest {
val truncateOnFetch = true
val topicIds = Map("topic1" -> Uuid.randomUuid(), "topic2" -> Uuid.randomUuid()).asJava
val topicIds = Map("topic1" -> Uuid.randomUuid(), "topic2" -> Uuid.randomUuid())
val version = ApiKeys.FETCH.latestVersion()
private val partition1 = new TopicPartition("topic1", 0)
private val partition2 = new TopicPartition("topic2", 0)
@ -71,8 +71,8 @@ class AbstractFetcherThreadTest {
.batches.asScala.head
}
private def initialFetchState(fetchOffset: Long, leaderEpoch: Int): InitialFetchState = {
InitialFetchState(leader = new BrokerEndPoint(0, "localhost", 9092),
private def initialFetchState(topicId: Option[Uuid], fetchOffset: Long, leaderEpoch: Int): InitialFetchState = {
InitialFetchState(topicId = topicId, leader = new BrokerEndPoint(0, "localhost", 9092),
initOffset = fetchOffset, currentLeaderEpoch = leaderEpoch)
}
@ -83,7 +83,7 @@ class AbstractFetcherThreadTest {
// add one partition to create the consumer lag metric
fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 0L, leaderEpoch = 0)))
fetcher.setLeaderState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
fetcher.start()
@ -110,7 +110,7 @@ class AbstractFetcherThreadTest {
// add one partition to create the consumer lag metric
fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 0L, leaderEpoch = 0)))
fetcher.setLeaderState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
fetcher.doWork()
@ -131,7 +131,7 @@ class AbstractFetcherThreadTest {
val fetcher = new MockFetcherThread
fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 0L, leaderEpoch = 0)))
val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0,
new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))
@ -160,7 +160,7 @@ class AbstractFetcherThreadTest {
val fetcher = new ErrorMockFetcherThread(fetchBackOffMs = fetchBackOffMs)
fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0)))
fetcher.addPartitions(Map(partition -> initialFetchState(Some(Uuid.randomUuid()), 0L, leaderEpoch = 0)))
val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0,
new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))
@ -191,7 +191,7 @@ class AbstractFetcherThreadTest {
val fetcher = new MockFetcherThread
fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 0L, leaderEpoch = 0)))
val batch = mkBatch(baseOffset = 0L, leaderEpoch = 1,
new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))
@ -217,7 +217,7 @@ class AbstractFetcherThreadTest {
val replicaState = MockFetcherThread.PartitionState(leaderEpoch = 0)
fetcher.setReplicaState(partition, replicaState)
fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 0L, leaderEpoch = 0)))
val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0,
new SimpleRecord("a".getBytes),
@ -248,7 +248,7 @@ class AbstractFetcherThreadTest {
// The replica's leader epoch is ahead of the leader
val replicaState = MockFetcherThread.PartitionState(leaderEpoch = 1)
fetcher.setReplicaState(partition, replicaState)
fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 1)), forceTruncation = true)
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 0L, leaderEpoch = 1)), forceTruncation = true)
val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0, new SimpleRecord("a".getBytes))
val leaderState = MockFetcherThread.PartitionState(Seq(batch), leaderEpoch = 0, highWatermark = 2L)
@ -281,7 +281,7 @@ class AbstractFetcherThreadTest {
val replicaState = MockFetcherThread.PartitionState(leaderEpoch = 1)
fetcher.setReplicaState(partition, replicaState)
fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 1)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 0L, leaderEpoch = 1)))
val leaderState = MockFetcherThread.PartitionState(Seq(
mkBatch(baseOffset = 0L, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
@ -322,7 +322,7 @@ class AbstractFetcherThreadTest {
val replicaState = MockFetcherThread.PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 0L)
fetcher.setReplicaState(partition, replicaState)
fetcher.addPartitions(Map(partition -> initialFetchState(3L, leaderEpoch = 5)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 5)))
val leaderLog = Seq(
mkBatch(baseOffset = 0, leaderEpoch = 1, new SimpleRecord("a".getBytes)),
@ -368,7 +368,7 @@ class AbstractFetcherThreadTest {
val replicaState = MockFetcherThread.PartitionState(replicaLog, leaderEpoch = 5, highWatermark)
fetcher.setReplicaState(partition, replicaState)
fetcher.addPartitions(Map(partition -> initialFetchState(highWatermark, leaderEpoch = 5)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), highWatermark, leaderEpoch = 5)))
fetcher.doWork()
@ -401,7 +401,7 @@ class AbstractFetcherThreadTest {
val replicaState = MockFetcherThread.PartitionState(replicaLog, leaderEpoch = 5, highWatermark)
fetcher.setReplicaState(partition, replicaState)
fetcher.addPartitions(Map(partition -> initialFetchState(highWatermark, leaderEpoch = 5)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), highWatermark, leaderEpoch = 5)))
fetcher.doWork()
@ -430,7 +430,7 @@ class AbstractFetcherThreadTest {
val replicaState = MockFetcherThread.PartitionState(replicaLog, leaderEpoch = 5, highWatermark)
fetcher.setReplicaState(partition, replicaState)
fetcher.addPartitions(Map(partition -> initialFetchState(highWatermark, leaderEpoch = 5)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), highWatermark, leaderEpoch = 5)))
fetcher.doWork()
@ -452,7 +452,7 @@ class AbstractFetcherThreadTest {
val replicaState = MockFetcherThread.PartitionState(leaderEpoch = 5)
fetcher.setReplicaState(partition, replicaState)
fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 5)), forceTruncation = true)
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 0L, leaderEpoch = 5)), forceTruncation = true)
val leaderLog = Seq(
mkBatch(baseOffset = 0, leaderEpoch = 1, new SimpleRecord("a".getBytes)),
@ -470,7 +470,7 @@ class AbstractFetcherThreadTest {
assertEquals(1, truncations)
// Add partitions again with the same epoch
fetcher.addPartitions(Map(partition -> initialFetchState(3L, leaderEpoch = 5)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 5)))
// Verify we did not truncate
fetcher.doWork()
@ -500,7 +500,7 @@ class AbstractFetcherThreadTest {
fetcher.setReplicaState(partition, replicaState)
// Verify that truncation based on fetch response is performed if partition is owned by fetcher thread
fetcher.addPartitions(Map(partition -> initialFetchState(6L, leaderEpoch = 4)))
fetcher.addPartitions(Map(partition -> initialFetchState(Some(Uuid.randomUuid()), 6L, leaderEpoch = 4)))
val endOffset = new EpochEndOffset()
.setPartition(partition.partition)
.setErrorCode(Errors.NONE.code)
@ -534,7 +534,7 @@ class AbstractFetcherThreadTest {
val replicaState = MockFetcherThread.PartitionState(replicaLog, leaderEpoch = 4, highWatermark = 0L)
fetcher.setReplicaState(partition, replicaState)
fetcher.addPartitions(Map(partition -> initialFetchState(3L, leaderEpoch = 4)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 4)))
val leaderLog = Seq(
mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
@ -576,7 +576,7 @@ class AbstractFetcherThreadTest {
val replicaLog = Seq()
val replicaState = MockFetcherThread.PartitionState(replicaLog, leaderEpoch = 4, highWatermark = 0L)
fetcher.setReplicaState(partition, replicaState)
fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 4)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 0L, leaderEpoch = 4)))
val leaderLog = Seq(
mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
@ -603,7 +603,7 @@ class AbstractFetcherThreadTest {
val replicaState = MockFetcherThread.PartitionState(replicaLog, leaderEpoch = 0, highWatermark = 0L)
fetcher.setReplicaState(partition, replicaState)
fetcher.addPartitions(Map(partition -> initialFetchState(3L, leaderEpoch = 0)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 0)))
val leaderLog = Seq(
mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
@ -650,7 +650,7 @@ class AbstractFetcherThreadTest {
val replicaState = MockFetcherThread.PartitionState(replicaLog, leaderEpoch = 0, highWatermark = 0L)
fetcher.setReplicaState(partition, replicaState)
fetcher.addPartitions(Map(partition -> initialFetchState(3L, leaderEpoch = 0)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 0)))
val leaderLog = Seq(
mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
@ -692,7 +692,7 @@ class AbstractFetcherThreadTest {
}
fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 0L, leaderEpoch = 0)))
val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0,
new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))
@ -737,7 +737,7 @@ class AbstractFetcherThreadTest {
// leader epoch changes while fetching epochs from leader
removePartitions(Set(partition))
setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = nextLeaderEpochOnFollower))
addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = nextLeaderEpochOnFollower)), forceTruncation = true)
addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 0L, leaderEpoch = nextLeaderEpochOnFollower)), forceTruncation = true)
fetchEpochsFromLeaderOnce = true
}
fetchedEpochs
@ -745,7 +745,7 @@ class AbstractFetcherThreadTest {
}
fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = initialLeaderEpochOnFollower))
fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = initialLeaderEpochOnFollower)), forceTruncation = true)
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 0L, leaderEpoch = initialLeaderEpochOnFollower)), forceTruncation = true)
val leaderLog = Seq(
mkBatch(baseOffset = 0, leaderEpoch = initialLeaderEpochOnFollower, new SimpleRecord("c".getBytes)))
@ -789,7 +789,7 @@ class AbstractFetcherThreadTest {
}
fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = initialLeaderEpochOnFollower))
fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = initialLeaderEpochOnFollower)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 0L, leaderEpoch = initialLeaderEpochOnFollower)))
val leaderLog = Seq(
mkBatch(baseOffset = 0, leaderEpoch = initialLeaderEpochOnFollower, new SimpleRecord("c".getBytes)))
@ -827,7 +827,7 @@ class AbstractFetcherThreadTest {
}
fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0)), forceTruncation = true)
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 0L, leaderEpoch = 0)), forceTruncation = true)
fetcher.setLeaderState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
// first round of truncation should throw an exception
@ -865,11 +865,11 @@ class AbstractFetcherThreadTest {
private def verifyFetcherThreadHandlingPartitionFailure(fetcher: MockFetcherThread): Unit = {
fetcher.setReplicaState(partition1, MockFetcherThread.PartitionState(leaderEpoch = 0))
fetcher.addPartitions(Map(partition1 -> initialFetchState(0L, leaderEpoch = 0)), forceTruncation = true)
fetcher.addPartitions(Map(partition1 -> initialFetchState(topicIds.get(partition1.topic), 0L, leaderEpoch = 0)), forceTruncation = true)
fetcher.setLeaderState(partition1, MockFetcherThread.PartitionState(leaderEpoch = 0))
fetcher.setReplicaState(partition2, MockFetcherThread.PartitionState(leaderEpoch = 0))
fetcher.addPartitions(Map(partition2 -> initialFetchState(0L, leaderEpoch = 0)), forceTruncation = true)
fetcher.addPartitions(Map(partition2 -> initialFetchState(topicIds.get(partition2.topic), 0L, leaderEpoch = 0)), forceTruncation = true)
fetcher.setLeaderState(partition2, MockFetcherThread.PartitionState(leaderEpoch = 0))
// processing data fails for partition1
@ -887,7 +887,7 @@ class AbstractFetcherThreadTest {
// simulate a leader change
fetcher.removePartitions(Set(partition1))
failedPartitions.removeAll(Set(partition1))
fetcher.addPartitions(Map(partition1 -> initialFetchState(0L, leaderEpoch = 1)), forceTruncation = true)
fetcher.addPartitions(Map(partition1 -> initialFetchState(topicIds.get(partition1.topic), 0L, leaderEpoch = 1)), forceTruncation = true)
// partition1 added back
assertEquals(Some(Truncating), fetcher.fetchState(partition1).map(_.state))
@ -907,7 +907,7 @@ class AbstractFetcherThreadTest {
val replicaState = MockFetcherThread.PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 0L)
fetcher.setReplicaState(partition, replicaState)
fetcher.addPartitions(Map(partition -> initialFetchState(3L, leaderEpoch = 5)))
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 5)))
assertEquals(3L, replicaState.logEndOffset)
fetcher.verifyLastFetchedEpoch(partition, expectedEpoch = Some(4))
@ -929,6 +929,32 @@ class AbstractFetcherThreadTest {
fetcher.verifyLastFetchedEpoch(partition, Some(5))
}
@Test
def testMaybeUpdateTopicIds(): Unit = {
val partition = new TopicPartition("topic1", 0)
val fetcher = new MockFetcherThread
// Start with no topic IDs
fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, leaderEpoch = 0)))
def verifyFetchState(fetchState: Option[PartitionFetchState], expectedTopicId: Option[Uuid]): Unit = {
assertTrue(fetchState.isDefined)
assertEquals(expectedTopicId, fetchState.get.topicId)
}
verifyFetchState(fetcher.fetchState(partition), None)
// Add topic ID
fetcher.maybeUpdateTopicIds(Set(partition), topicName => topicIds.get(topicName))
verifyFetchState(fetcher.fetchState(partition), topicIds.get(partition.topic))
// Try to update topic ID for non-existent topic partition
val unknownPartition = new TopicPartition("unknown", 0)
fetcher.maybeUpdateTopicIds(Set(unknownPartition), topicName => topicIds.get(topicName))
assertTrue(fetcher.fetchState(unknownPartition).isEmpty)
}
object MockFetcherThread {
class PartitionState(var log: mutable.Buffer[RecordBatch],
var leaderEpoch: Int,
@ -1076,7 +1102,7 @@ class AbstractFetcherThreadTest {
1024 * 1024, Optional.of[Integer](state.currentLeaderEpoch), lastFetchedEpoch))
}
}
val fetchRequest = FetchRequest.Builder.forReplica(version, replicaId, 0, 1, fetchData.asJava, topicIds)
val fetchRequest = FetchRequest.Builder.forReplica(version, replicaId, 0, 1, fetchData.asJava, topicIds.asJava)
val fetchRequestOpt =
if (fetchData.isEmpty)
None

AbstractFetcherThreadWithIbp26Test.scala

@ -17,10 +17,12 @@
package kafka.server
import org.apache.kafka.common.Uuid
class AbstractFetcherThreadWithIbp26Test extends AbstractFetcherThreadTest {
override val truncateOnFetch = false
override val version = 11
override val topicIds = java.util.Collections.emptyMap()
override val topicIds = Map.empty[String, Uuid]
}

ReplicaAlterLogDirsThreadTest.scala

@ -65,7 +65,7 @@ class ReplicaAlterLogDirsThreadTest {
metadataCache.updateMetadata(0, updateMetadataRequest)
private def initialFetchState(fetchOffset: Long, leaderEpoch: Int = 1): InitialFetchState = {
InitialFetchState(leader = new BrokerEndPoint(0, "localhost", 9092),
InitialFetchState(topicId = Some(topicId), leader = new BrokerEndPoint(0, "localhost", 9092),
initOffset = fetchOffset, currentLeaderEpoch = leaderEpoch)
}
@ -853,8 +853,8 @@ class ReplicaAlterLogDirsThreadTest {
t1p1 -> initialFetchState(0L, leaderEpoch)))
val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = thread.buildFetch(Map(
t1p0 -> PartitionFetchState(150, None, leaderEpoch, None, state = Fetching, lastFetchedEpoch = None),
t1p1 -> PartitionFetchState(160, None, leaderEpoch, None, state = Fetching, lastFetchedEpoch = None)))
t1p0 -> PartitionFetchState(Some(topicId), 150, None, leaderEpoch, None, state = Fetching, lastFetchedEpoch = None),
t1p1 -> PartitionFetchState(Some(topicId), 160, None, leaderEpoch, None, state = Fetching, lastFetchedEpoch = None)))
assertTrue(fetchRequestOpt.isDefined)
val fetchRequest = fetchRequestOpt.get.fetchRequest
@ -906,8 +906,8 @@ class ReplicaAlterLogDirsThreadTest {
// one partition is ready and one is truncating
val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = thread.buildFetch(Map(
t1p0 -> PartitionFetchState(150, None, leaderEpoch, state = Fetching, lastFetchedEpoch = None),
t1p1 -> PartitionFetchState(160, None, leaderEpoch, state = Truncating, lastFetchedEpoch = None)))
t1p0 -> PartitionFetchState(Some(topicId), 150, None, leaderEpoch, state = Fetching, lastFetchedEpoch = None),
t1p1 -> PartitionFetchState(Some(topicId), 160, None, leaderEpoch, state = Truncating, lastFetchedEpoch = None)))
assertTrue(fetchRequestOpt.isDefined)
val fetchRequest = fetchRequestOpt.get
@ -920,8 +920,8 @@ class ReplicaAlterLogDirsThreadTest {
// one partition is ready and one is delayed
val ResultWithPartitions(fetchRequest2Opt, partitionsWithError2) = thread.buildFetch(Map(
t1p0 -> PartitionFetchState(140, None, leaderEpoch, state = Fetching, lastFetchedEpoch = None),
t1p1 -> PartitionFetchState(160, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching, lastFetchedEpoch = None)))
t1p0 -> PartitionFetchState(Some(topicId), 140, None, leaderEpoch, state = Fetching, lastFetchedEpoch = None),
t1p1 -> PartitionFetchState(Some(topicId), 160, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching, lastFetchedEpoch = None)))
assertTrue(fetchRequest2Opt.isDefined)
val fetchRequest2 = fetchRequest2Opt.get
@ -934,8 +934,8 @@ class ReplicaAlterLogDirsThreadTest {
// both partitions are delayed
val ResultWithPartitions(fetchRequest3Opt, partitionsWithError3) = thread.buildFetch(Map(
t1p0 -> PartitionFetchState(140, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching, lastFetchedEpoch = None),
t1p1 -> PartitionFetchState(160, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching, lastFetchedEpoch = None)))
t1p0 -> PartitionFetchState(Some(topicId), 140, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching, lastFetchedEpoch = None),
t1p1 -> PartitionFetchState(Some(topicId), 160, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching, lastFetchedEpoch = None)))
assertTrue(fetchRequest3Opt.isEmpty, "Expected no fetch requests since all partitions are delayed")
assertFalse(partitionsWithError3.nonEmpty)
}

ReplicaFetcherThreadTest.scala

@ -78,8 +78,8 @@ class ReplicaFetcherThreadTest {
private val metadataCache = new ZkMetadataCache(0)
metadataCache.updateMetadata(0, updateMetadataRequest)
private def initialFetchState(fetchOffset: Long, leaderEpoch: Int = 1): InitialFetchState = {
InitialFetchState(leader = new BrokerEndPoint(0, "localhost", 9092),
private def initialFetchState(topicId: Option[Uuid], fetchOffset: Long, leaderEpoch: Int = 1): InitialFetchState = {
InitialFetchState(topicId = topicId, leader = new BrokerEndPoint(0, "localhost", 9092),
initOffset = fetchOffset, currentLeaderEpoch = leaderEpoch)
}
@ -157,9 +157,9 @@ class ReplicaFetcherThreadTest {
// topic 1 supports epoch, t2 doesn't.
thread.addPartitions(Map(
t1p0 -> initialFetchState(0L),
t1p1 -> initialFetchState(0L),
t2p1 -> initialFetchState(0L)))
t1p0 -> initialFetchState(Some(topicId1), 0L),
t1p1 -> initialFetchState(Some(topicId2), 0L),
t2p1 -> initialFetchState(Some(topicId2), 0L)))
assertPartitionStates(thread, shouldBeReadyForFetch = false, shouldBeTruncatingLog = true, shouldBeDelayed = false)
//Loop 1
@ -300,7 +300,7 @@ class ReplicaFetcherThreadTest {
val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, brokerEndPoint, new SystemTime())
val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, failedPartitions, replicaManager,
new Metrics, new SystemTime, UnboundedQuota, Some(mockNetwork))
thread.addPartitions(Map(t1p0 -> initialFetchState(0L), t1p1 -> initialFetchState(0L)))
thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), 0L), t1p1 -> initialFetchState(Some(topicId1), 0L)))
//Loop 1
thread.doWork()
@ -365,7 +365,7 @@ class ReplicaFetcherThreadTest {
val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, failedPartitions, replicaManager,
new Metrics(), new SystemTime(), quota, Some(mockNetwork))
thread.addPartitions(Map(t1p0 -> initialFetchState(0L), t2p1 -> initialFetchState(0L)))
thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), 0L), t2p1 -> initialFetchState(Some(topicId2), 0L)))
//Run it
thread.doWork()
@ -420,7 +420,7 @@ class ReplicaFetcherThreadTest {
val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, failedPartitions,
replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
thread.addPartitions(Map(t1p0 -> initialFetchState(0L), t2p1 -> initialFetchState(0L)))
thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), 0L), t2p1 -> initialFetchState(Some(topicId2), 0L)))
//Run it
thread.doWork()
@ -477,7 +477,7 @@ class ReplicaFetcherThreadTest {
// Create the fetcher thread
val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, brokerEndPoint, new SystemTime())
val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, failedPartitions, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
thread.addPartitions(Map(t1p0 -> initialFetchState(0L), t1p1 -> initialFetchState(0L)))
thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), 0L), t1p1 -> initialFetchState(Some(topicId1), 0L)))
// Loop 1 -- both topic partitions will need to fetch another leader epoch
thread.doWork()
@ -552,7 +552,7 @@ class ReplicaFetcherThreadTest {
val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, failedPartitions, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) {
override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = None
}
thread.addPartitions(Map(t1p0 -> initialFetchState(initialLEO), t1p1 -> initialFetchState(initialLEO)))
thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), initialLEO), t1p1 -> initialFetchState(Some(topicId1), initialLEO)))
val partitions = Set(t1p0, t1p1)
// Loop 1 -- both topic partitions skip epoch fetch and send fetch request since we can truncate
@ -665,7 +665,7 @@ class ReplicaFetcherThreadTest {
// Create the fetcher thread
val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, brokerEndPoint, new SystemTime())
val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, failedPartitions, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
thread.addPartitions(Map(t1p0 -> initialFetchState(0L), t1p1 -> initialFetchState(0L)))
thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), 0L), t1p1 -> initialFetchState(Some(topicId1), 0L)))
// Loop 1 -- both topic partitions will truncate to leader offset even though they don't know
// about leader epoch
@ -722,7 +722,7 @@ class ReplicaFetcherThreadTest {
//Create the thread
val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, failedPartitions, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
thread.addPartitions(Map(t1p0 -> initialFetchState(initialFetchOffset)))
thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), initialFetchOffset)))
//Run it
thread.doWork()
@ -776,7 +776,7 @@ class ReplicaFetcherThreadTest {
//Create the thread
val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, failedPartitions, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
thread.addPartitions(Map(t1p0 -> initialFetchState(0L), t1p1 -> initialFetchState(0L)))
thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), 0L), t1p1 -> initialFetchState(Some(topicId1), 0L)))
//Run thread 3 times
(0 to 3).foreach { _ =>
@ -834,7 +834,7 @@ class ReplicaFetcherThreadTest {
val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, failedPartitions, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
//When
thread.addPartitions(Map(t1p0 -> initialFetchState(0L), t1p1 -> initialFetchState(0L)))
thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), 0L), t1p1 -> initialFetchState(Some(topicId1), 0L)))
//Then all partitions should start in a TruncatingLog state
assertEquals(Option(Truncating), thread.fetchState(t1p0).map(_.state))
@ -889,7 +889,7 @@ class ReplicaFetcherThreadTest {
new SystemTime(), quota, Some(mockNetwork))
//When
thread.addPartitions(Map(t1p0 -> initialFetchState(0L), t1p1 -> initialFetchState(0L)))
thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), 0L), t1p1 -> initialFetchState(Some(topicId1), 0L)))
//When the epoch request is outstanding, remove one of the partitions to simulate a leader change. We do this via a callback passed to the mock thread
val partitionThatBecameLeader = t1p0

ReplicaManagerTest.scala

@ -241,7 +241,7 @@ class ReplicaManagerTest {
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
rm.getPartitionOrException(new TopicPartition(topic, 0))
.localLogOrException
.localLogOrException
val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes()))
val appendResult = appendRecords(rm, new TopicPartition(topic, 0), records).onFire { response =>
@ -609,7 +609,7 @@ class ReplicaManagerTest {
.localLogOrException
// Append a couple of messages.
for(i <- 1 to 2) {
for (i <- 1 to 2) {
val records = TestUtils.singletonRecords(s"message $i".getBytes)
appendRecords(rm, new TopicPartition(topic, 0), records).onFire { response =>
assertEquals(Errors.NONE, response.error)
@ -1031,7 +1031,7 @@ class ReplicaManagerTest {
val controllerEpoch = 0
var leaderEpoch = 1
val leaderEpochIncrement = 2
val aliveBrokerIds = Seq[Integer] (followerBrokerId, leaderBrokerId)
val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
@ -1079,7 +1079,7 @@ class ReplicaManagerTest {
val leaderBrokerId = 1
val leaderEpoch = 1
val leaderEpochIncrement = 2
val aliveBrokerIds = Seq[Integer] (followerBrokerId, leaderBrokerId)
val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
@ -1724,7 +1724,7 @@ class ReplicaManagerTest {
offsetFromLeader: Long = 5,
leaderEpochFromLeader: Int = 3,
extraProps: Properties = new Properties(),
topicId: Option[Uuid] = None) : (ReplicaManager, LogManager) = {
topicId: Option[Uuid] = None): (ReplicaManager, LogManager) = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
props.asScala ++= extraProps.asScala
@ -1844,9 +1844,9 @@ class ReplicaManagerTest {
threadNamePrefix = Option(this.getClass.getName)) {
override protected def createReplicaFetcherManager(metrics: Metrics,
time: Time,
threadNamePrefix: Option[String],
replicationQuotaManager: ReplicationQuotaManager): ReplicaFetcherManager = {
time: Time,
threadNamePrefix: Option[String],
replicationQuotaManager: ReplicationQuotaManager): ReplicaFetcherManager = {
new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, replicationQuotaManager) {
override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
@ -1857,6 +1857,7 @@ class ReplicaManagerTest {
// In case the thread starts before the partition is added by AbstractFetcherManager,
// add it here (it's a no-op if already added)
val initialOffset = InitialFetchState(
topicId = topicId,
leader = new BrokerEndPoint(0, "localhost", 9092),
initOffset = 0L, currentLeaderEpoch = leaderEpochInLeaderAndIsr)
addPartitions(Map(new TopicPartition(topic, topicPartition) -> initialOffset))
@ -3368,6 +3369,7 @@ class ReplicaManagerTest {
Mockito.verify(mockReplicaFetcherManager, Mockito.times(1))
.addFetcherForPartitions(
Map(topicPartition -> InitialFetchState(
topicId = Some(FOO_UUID),
leader = BrokerEndPoint(otherId, "localhost", 9093),
currentLeaderEpoch = 0,
initOffset = 0
@ -3407,6 +3409,7 @@ class ReplicaManagerTest {
Mockito.verify(mockReplicaFetcherManager, Mockito.times(1))
.addFetcherForPartitions(
Map(topicPartition -> InitialFetchState(
topicId = Some(FOO_UUID),
leader = BrokerEndPoint(otherId, "localhost", 9093),
currentLeaderEpoch = 1,
initOffset = 1
@ -3469,4 +3472,88 @@ class ReplicaManagerTest {
ClientQuotasImage.EMPTY
)
}
def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T],
tp: TopicPartition,
expectedTopicId: Option[Uuid]): Unit = {
val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
assertTrue(fetchState.isDefined)
assertEquals(expectedTopicId, fetchState.get.topicId)
}
@Test
def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
val aliveBrokersIds = Seq(0, 1)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
brokerId = 0, aliveBrokersIds)
try {
val tp = new TopicPartition(topic, 0)
val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0)
val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, aliveBrokersIds, leaderAndIsr)
val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
assertEquals(Errors.NONE, leaderAndIsrResponse1.error)
assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None)
val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, aliveBrokersIds, leaderAndIsr)
val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ())
assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, Some(topicId))
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = {
val version = if (usesTopicIds) LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort
val topicId = if (usesTopicIds) this.topicId else Uuid.ZERO_UUID
val topicIdOpt = if (usesTopicIds) Some(topicId) else None
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
try {
val topicPartition = new TopicPartition(topic, 0)
val aliveBrokersIds = Seq(0, 1)
replicaManager.createPartition(topicPartition)
.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None)
val tp = new TopicPartition(topic, 0)
val leaderAndIsr = new LeaderAndIsr(0, 0, aliveBrokersIds.toList, 0)
val leaderAndIsrRequest1 = leaderAndIsrRequest(topicId, tp, aliveBrokersIds, leaderAndIsr, version = version)
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
val partition = replicaManager.getPartitionOrException(tp)
assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == partition.log.get.dir.getParentFile).size)
// Append a couple of messages.
for (i <- 1 to 40) {
val records = TestUtils.singletonRecords(s"message $i".getBytes)
appendRecords(replicaManager, tp, records).onFire { response =>
assertEquals(Errors.NONE, response.error)
}
}
// Find a live log directory that is different from the partition's current one.
val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ == partition.log.get.dir.getParentFile).head
assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath))
assertFetcherHasTopicId(replicaManager.replicaAlterLogDirsManager, partition.topicPartition, topicIdOpt)
// Make sure the future log is created.
replicaManager.futureLocalLogOrException(topicPartition)
assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
// Wait for the ReplicaAlterLogDirsThread to complete.
TestUtils.waitUntilTrue(() => {
replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.isEmpty
}, s"ReplicaAlterLogDirsThread should be gone")
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
}

ReplicaFetcherThreadBenchmark.java

@ -179,7 +179,7 @@ public class ReplicaFetcherThreadBenchmark {
partition.makeFollower(partitionState, offsetCheckpoints, topicId);
pool.put(tp, partition);
initialFetchStates.put(tp, new InitialFetchState(new BrokerEndPoint(3, "host", 3000), 0, 0));
initialFetchStates.put(tp, new InitialFetchState(topicId, new BrokerEndPoint(3, "host", 3000), 0, 0));
BaseRecords fetched = new BaseRecords() {
@Override
public int sizeInBytes() {