MINOR: refactor replica last sent HW updates due to performance regression (#7671)

This change fixes a performance regression in follower last sent high watermark
handling introduced in 23beeea. maybeUpdateHwAndSendResponse is expensive for
brokers with high partition counts, as it requires a partition and a replica lookup for every
partition being fetched. This refactor moves the last sent high watermark update into the
follower fetch state update, where the partition and replica have already been looked up.
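
As a rough sketch (illustrative only; parameter names are taken from the diff below), the fetch path now records the last sent HW as part of the same follower state update:

```scala
// Illustrative only: a single call now both updates the follower's fetch state and
// records the high watermark that will be sent, avoiding a second
// partition/replica lookup per fetched partition on the response path.
partition.updateFollowerFetchState(followerId,
  followerFetchOffsetMetadata = readResult.info.fetchOffsetMetadata,
  followerStartOffset = readResult.followerLogStartOffset,
  followerFetchTimeMs = readResult.fetchTimeMs,
  leaderEndOffset = readResult.leaderLogEndOffset,
  lastSentHighwatermark = readResult.highWatermark)
```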

I've seen cases where maybeUpdateHwAndSendResponse is responsible for 8% of CPU usage, not including the responseCallback call made from within it.

I have benchmarked this change with `UpdateFollowerFetchStateBenchmark` and it adds 5ns
of overhead to Partition.updateFollowerFetchState, which is a rounding error compared to the
current overhead of maybeUpdateHwAndSendResponse.

Reviewers: David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Authored by Lucas Bradstreet on 2019-11-12 21:21:18 -08:00; committed by Ismael Juma
parent 9a125a72a2
commit 1675115ec1
8 changed files with 57 additions and 54 deletions

Partition.scala

@@ -562,9 +562,8 @@ class Partition(val topicPartition: TopicPartition,
followerFetchOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata,
followerStartOffset = Log.UnknownOffset,
followerFetchTimeMs = 0L,
-leaderEndOffset = Log.UnknownOffset
-)
-replica.updateLastSentHighWatermark(0L)
+leaderEndOffset = Log.UnknownOffset,
+lastSentHighwatermark = 0L)
}
}
// we may need to increment high watermark since ISR could be down to 1
@@ -624,7 +623,8 @@ class Partition(val topicPartition: TopicPartition,
followerFetchOffsetMetadata: LogOffsetMetadata,
followerStartOffset: Long,
followerFetchTimeMs: Long,
-leaderEndOffset: Long): Boolean = {
+leaderEndOffset: Long,
+lastSentHighwatermark: Long): Boolean = {
getReplica(followerId) match {
case Some(followerReplica) =>
// No need to calculate low watermark if there is no delayed DeleteRecordsRequest
@@ -634,7 +634,8 @@ class Partition(val topicPartition: TopicPartition,
followerFetchOffsetMetadata,
followerStartOffset,
followerFetchTimeMs,
-leaderEndOffset)
+leaderEndOffset,
+lastSentHighwatermark)
val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
// check if the LW of the partition has incremented

Replica.scala

@@ -71,7 +71,8 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
def updateFetchState(followerFetchOffsetMetadata: LogOffsetMetadata,
followerStartOffset: Long,
followerFetchTimeMs: Long,
-leaderEndOffset: Long): Unit = {
+leaderEndOffset: Long,
+lastSentHighwatermark: Long): Unit = {
if (followerFetchOffsetMetadata.messageOffset >= leaderEndOffset)
_lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, followerFetchTimeMs)
else if (followerFetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
@@ -81,6 +82,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
_logEndOffsetMetadata = followerFetchOffsetMetadata
lastFetchLeaderLogEndOffset = leaderEndOffset
lastFetchTimeMs = followerFetchTimeMs
+updateLastSentHighWatermark(lastSentHighwatermark)
trace(s"Updated state of replica to $this")
}
@@ -92,7 +94,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
* When handling fetches, the last sent high watermark for a replica is checked to see if we should return immediately
* in order to propagate the HW more expeditiously. See KIP-392
*/
-def updateLastSentHighWatermark(highWatermark: Long): Unit = {
+private def updateLastSentHighWatermark(highWatermark: Long): Unit = {
_lastSentHighWatermark = highWatermark
trace(s"Updated HW of replica to $highWatermark")
}

ReplicaManager.scala

@@ -907,16 +907,6 @@ class ReplicaManager(val config: KafkaConfig,
}
}
-// Wrap the given callback function with another function that will update the HW for the remote follower
-def maybeUpdateHwAndSendResponse(fetchPartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
-if (isFromFollower) {
-fetchPartitionData.foreach {
-case (tp, partitionData) => updateFollowerHighWatermark(tp, replicaId, partitionData.highWatermark)
-}
-}
-responseCallback(fetchPartitionData)
-}
// respond immediately if 1) fetch request does not want to wait
// 2) fetch request does not require any data
// 3) has enough data to respond
@@ -927,7 +917,7 @@ class ReplicaManager(val config: KafkaConfig,
tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica, isFromFollower && isAddingReplica(tp, replicaId))
}
-maybeUpdateHwAndSendResponse(fetchPartitionData)
+responseCallback(fetchPartitionData)
} else {
// construct the fetch results from the read results
val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
@@ -940,7 +930,7 @@ class ReplicaManager(val config: KafkaConfig,
val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit,
fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
-maybeUpdateHwAndSendResponse)
+responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
@@ -1577,12 +1567,13 @@ class ReplicaManager(val config: KafkaConfig,
followerFetchOffsetMetadata = readResult.info.fetchOffsetMetadata,
followerStartOffset = readResult.followerLogStartOffset,
followerFetchTimeMs = readResult.fetchTimeMs,
-leaderEndOffset = readResult.leaderLogEndOffset)) {
+leaderEndOffset = readResult.leaderLogEndOffset,
+lastSentHighwatermark = readResult.highWatermark)) {
readResult
} else {
warn(s"Leader $localBrokerId failed to record follower $followerId's position " +
-s"${readResult.info.fetchOffsetMetadata.messageOffset} since the replica is not recognized to be " +
-s"one of the assigned replicas ${partition.assignmentState.replicas.mkString(",")} " +
+s"${readResult.info.fetchOffsetMetadata.messageOffset}, and last sent HW since the replica " +
+s"is not recognized to be one of the assigned replicas ${partition.assignmentState.replicas.mkString(",")} " +
s"for partition $topicPartition. Empty records will be returned for this partition.")
readResult.withEmptyFetchInfo
}
@@ -1595,15 +1586,6 @@ class ReplicaManager(val config: KafkaConfig,
}
}
-private def updateFollowerHighWatermark(topicPartition: TopicPartition, followerId: Int, highWatermark: Long): Unit = {
-nonOfflinePartition(topicPartition).flatMap(_.getReplica(followerId)) match {
-case Some(replica) => replica.updateLastSentHighWatermark(highWatermark)
-case None =>
-warn(s"While updating the HW for follower $followerId for partition $topicPartition, " +
-s"the replica could not be found.")
-}
-}
private def leaderPartitionsIterator: Iterator[Partition] =
nonOfflinePartitionsIterator.filter(_.leaderLogIfLocal.isDefined)

DelayedFetchTest.scala

@@ -164,8 +164,7 @@ class DelayedFetchTest extends EasyMockSupport {
val follower = new Replica(replicaId, topicPartition)
followerHW.foreach(hw => {
-follower.updateFetchState(LogOffsetMetadata.UnknownOffsetMetadata, 0L, 0L, 0L)
-follower.updateLastSentHighWatermark(hw)
+follower.updateFetchState(LogOffsetMetadata.UnknownOffsetMetadata, 0L, 0L, 0L, hw)
})
EasyMock.expect(partition.getReplica(replicaId))
.andReturn(Some(follower))

PartitionTest.scala

@@ -451,7 +451,8 @@ class PartitionTest extends AbstractPartitionTest {
followerFetchOffsetMetadata = fetchOffsetMetadata,
followerStartOffset = 0L,
followerFetchTimeMs = time.milliseconds(),
-leaderEndOffset = partition.localLogOrException.logEndOffset)
+leaderEndOffset = partition.localLogOrException.logEndOffset,
+lastSentHighwatermark = partition.localLogOrException.highWatermark)
}
def fetchOffsetsForTimestamp(timestamp: Long, isolation: Option[IsolationLevel]): Either[ApiException, Option[TimestampAndOffset]] = {
@@ -826,7 +827,8 @@ class PartitionTest extends AbstractPartitionTest {
followerFetchOffsetMetadata = fetchOffsetMetadata,
followerStartOffset = 0L,
followerFetchTimeMs = time.milliseconds(),
-leaderEndOffset = partition.localLogOrException.logEndOffset)
+leaderEndOffset = partition.localLogOrException.logEndOffset,
+lastSentHighwatermark = partition.localLogOrException.highWatermark)
}
updateFollowerFetchState(follower2, LogOffsetMetadata(0))
@@ -1061,7 +1063,8 @@ class PartitionTest extends AbstractPartitionTest {
followerFetchOffsetMetadata = LogOffsetMetadata(3),
followerStartOffset = 0L,
followerFetchTimeMs = time.milliseconds(),
-leaderEndOffset = 6L)
+leaderEndOffset = 6L,
+lastSentHighwatermark = partition.localLogOrException.highWatermark)
assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
assertEquals(3L, remoteReplica.logEndOffset)
@@ -1073,7 +1076,8 @@
followerFetchOffsetMetadata = LogOffsetMetadata(6L),
followerStartOffset = 0L,
followerFetchTimeMs = time.milliseconds(),
-leaderEndOffset = 6L)
+leaderEndOffset = 6L,
+lastSentHighwatermark = partition.localLogOrException.highWatermark)
assertEquals(time.milliseconds(), remoteReplica.lastCaughtUpTimeMs)
assertEquals(6L, remoteReplica.logEndOffset)
@@ -1121,7 +1125,8 @@
followerFetchOffsetMetadata = LogOffsetMetadata(3),
followerStartOffset = 0L,
followerFetchTimeMs = time.milliseconds(),
-leaderEndOffset = 6L)
+leaderEndOffset = 6L,
+lastSentHighwatermark = partition.localLogOrException.highWatermark)
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
assertEquals(3L, remoteReplica.logEndOffset)
@@ -1139,7 +1144,8 @@
followerFetchOffsetMetadata = LogOffsetMetadata(10),
followerStartOffset = 0L,
followerFetchTimeMs = time.milliseconds(),
-leaderEndOffset = 6L)
+leaderEndOffset = 6L,
+lastSentHighwatermark = partition.localLogOrException.highWatermark)
assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
assertEquals(10L, remoteReplica.logEndOffset)
@@ -1192,7 +1198,8 @@
followerFetchOffsetMetadata = LogOffsetMetadata(10),
followerStartOffset = 0L,
followerFetchTimeMs = time.milliseconds(),
-leaderEndOffset = 10L)
+leaderEndOffset = 10L,
+lastSentHighwatermark = partition.localLogOrException.highWatermark)
// Follower state is updated, but the ISR has not expanded
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
@@ -1303,7 +1310,8 @@
followerFetchOffsetMetadata = LogOffsetMetadata(5),
followerStartOffset = 0L,
followerFetchTimeMs = firstFetchTimeMs,
-leaderEndOffset = 10L)
+leaderEndOffset = 10L,
+lastSentHighwatermark = partition.localLogOrException.highWatermark)
assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
assertEquals(5L, partition.localLogOrException.highWatermark)
assertEquals(5L, remoteReplica.logEndOffset)
@@ -1317,7 +1325,8 @@
followerFetchOffsetMetadata = LogOffsetMetadata(10),
followerStartOffset = 0L,
followerFetchTimeMs = time.milliseconds(),
-leaderEndOffset = 15L)
+leaderEndOffset = 15L,
+lastSentHighwatermark = partition.localLogOrException.highWatermark)
assertEquals(firstFetchTimeMs, remoteReplica.lastCaughtUpTimeMs)
assertEquals(10L, partition.localLogOrException.highWatermark)
assertEquals(10L, remoteReplica.logEndOffset)
@@ -1373,7 +1382,8 @@
followerFetchOffsetMetadata = LogOffsetMetadata(10),
followerStartOffset = 0L,
followerFetchTimeMs = time.milliseconds(),
-leaderEndOffset = 10L)
+leaderEndOffset = 10L,
+lastSentHighwatermark = partition.localLogOrException.highWatermark)
assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
assertEquals(10L, partition.localLogOrException.highWatermark)
assertEquals(10L, remoteReplica.logEndOffset)

IsrExpirationTest.scala

@@ -39,6 +39,7 @@ class IsrExpirationTest {
val replicaLagTimeMaxMs = 100L
val replicaFetchWaitMaxMs = 100
val leaderLogEndOffset = 20
+val leaderLogHighWatermark = 20L
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
@@ -85,7 +86,8 @@ class IsrExpirationTest {
followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 1),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
-leaderEndOffset = leaderLogEndOffset)
+leaderEndOffset = leaderLogEndOffset,
+lastSentHighwatermark = partition0.localLogOrException.highWatermark)
var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
@@ -134,7 +136,8 @@
followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 2),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
-leaderEndOffset = leaderLogEndOffset)
+leaderEndOffset = leaderLogEndOffset,
+lastSentHighwatermark = partition0.localLogOrException.highWatermark)
// Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log.
// The replicas will no longer be in ISR. We do 2 fetches because we want to simulate the case where the replica is lagging but is not stuck
@@ -148,7 +151,8 @@
followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 1),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
-leaderEndOffset = leaderLogEndOffset)
+leaderEndOffset = leaderLogEndOffset,
+lastSentHighwatermark = partition0.localLogOrException.highWatermark)
}
partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
@@ -165,7 +169,8 @@
followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
-leaderEndOffset = leaderLogEndOffset)
+leaderEndOffset = leaderLogEndOffset,
+lastSentHighwatermark = partition0.localLogOrException.highWatermark)
}
partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
@@ -190,7 +195,8 @@
followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
-leaderEndOffset = leaderLogEndOffset)
+leaderEndOffset = leaderLogEndOffset,
+lastSentHighwatermark = partition0.localLogOrException.highWatermark)
var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
@@ -224,7 +230,8 @@
followerFetchOffsetMetadata = LogOffsetMetadata(0L),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
-leaderEndOffset = 0L)
+leaderEndOffset = 0L,
+lastSentHighwatermark = partition.localLogOrException.highWatermark)
// set the leader and its hw and the hw update time
partition.leaderReplicaIdOpt = Some(leaderId)
@@ -236,6 +243,7 @@
EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
EasyMock.expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(leaderLogEndOffset)).anyTimes()
EasyMock.expect(log.logEndOffset).andReturn(leaderLogEndOffset).anyTimes()
+EasyMock.expect(log.highWatermark).andReturn(leaderLogHighWatermark).anyTimes()
EasyMock.replay(log)
log
}

SimpleFetchTest.scala

@@ -141,7 +141,8 @@ class SimpleFetchTest {
followerFetchOffsetMetadata = leo,
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
-leaderEndOffset = leo.messageOffset)
+leaderEndOffset = leo.messageOffset,
+partition.localLogOrException.highWatermark)
}
@After

UpdateFollowerFetchStateBenchmark.java

@@ -161,9 +161,9 @@ public class UpdateFollowerFetchStateBenchmark {
public void updateFollowerFetchStateBench() {
// measure the impact of two follower fetches on the leader
partition.updateFollowerFetchState(1, new LogOffsetMetadata(nextOffset, nextOffset, 0),
-0, 1, nextOffset);
+0, 1, nextOffset, nextOffset);
partition.updateFollowerFetchState(2, new LogOffsetMetadata(nextOffset, nextOffset, 0),
-0, 1, nextOffset);
+0, 1, nextOffset, nextOffset);
nextOffset++;
}
@@ -173,8 +173,8 @@ public class UpdateFollowerFetchStateBenchmark {
// measure the impact of two follower fetches on the leader when the follower didn't
// end up fetching anything
partition.updateFollowerFetchState(1, new LogOffsetMetadata(nextOffset, nextOffset, 0),
-0, 1, 100);
+0, 1, 100, nextOffset);
partition.updateFollowerFetchState(2, new LogOffsetMetadata(nextOffset, nextOffset, 0),
-0, 1, 100);
+0, 1, 100, nextOffset);
}
}