diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 85afe1102cd..7c452b0923d 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -562,9 +562,8 @@ class Partition(val topicPartition: TopicPartition,
           followerFetchOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata,
           followerStartOffset = Log.UnknownOffset,
           followerFetchTimeMs = 0L,
-          leaderEndOffset = Log.UnknownOffset
-        )
-        replica.updateLastSentHighWatermark(0L)
+          leaderEndOffset = Log.UnknownOffset,
+          lastSentHighwatermark = 0L)
       }
     }
     // we may need to increment high watermark since ISR could be down to 1
@@ -624,7 +623,8 @@ class Partition(val topicPartition: TopicPartition,
                                followerFetchOffsetMetadata: LogOffsetMetadata,
                                followerStartOffset: Long,
                                followerFetchTimeMs: Long,
-                               leaderEndOffset: Long): Boolean = {
+                               leaderEndOffset: Long,
+                               lastSentHighwatermark: Long): Boolean = {
     getReplica(followerId) match {
       case Some(followerReplica) =>
         // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
@@ -634,7 +634,8 @@ class Partition(val topicPartition: TopicPartition,
           followerFetchOffsetMetadata,
           followerStartOffset,
           followerFetchTimeMs,
-          leaderEndOffset)
+          leaderEndOffset,
+          lastSentHighwatermark)
         val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
         // check if the LW of the partition has incremented
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 5504db58694..f9de7ba784c 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -71,7 +71,8 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
   def updateFetchState(followerFetchOffsetMetadata: LogOffsetMetadata,
                        followerStartOffset: Long,
                        followerFetchTimeMs: Long,
-                       leaderEndOffset: Long): Unit = {
+                       leaderEndOffset: Long,
+                       lastSentHighwatermark: Long): Unit = {
     if (followerFetchOffsetMetadata.messageOffset >= leaderEndOffset)
       _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, followerFetchTimeMs)
     else if (followerFetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
@@ -81,6 +82,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
     _logEndOffsetMetadata = followerFetchOffsetMetadata
     lastFetchLeaderLogEndOffset = leaderEndOffset
     lastFetchTimeMs = followerFetchTimeMs
+    updateLastSentHighWatermark(lastSentHighwatermark)
     trace(s"Updated state of replica to $this")
   }

@@ -92,7 +94,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
    * When handling fetches, the last sent high watermark for a replica is checked to see if we should return immediately
    * in order to propagate the HW more expeditiously. See KIP-392
    */
-  def updateLastSentHighWatermark(highWatermark: Long): Unit = {
+  private def updateLastSentHighWatermark(highWatermark: Long): Unit = {
     _lastSentHighWatermark = highWatermark
     trace(s"Updated HW of replica to $highWatermark")
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 68b64a53558..7ccf9ba3a9e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -907,16 +907,6 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }

-    // Wrap the given callback function with another function that will update the HW for the remote follower
-    def maybeUpdateHwAndSendResponse(fetchPartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
-      if (isFromFollower) {
-        fetchPartitionData.foreach {
-          case (tp, partitionData) => updateFollowerHighWatermark(tp, replicaId, partitionData.highWatermark)
-        }
-      }
-      responseCallback(fetchPartitionData)
-    }
-
    // respond immediately if 1) fetch request does not want to wait
    //                        2) fetch request does not require any data
    //                        3) has enough data to respond
@@ -927,7 +917,7 @@ class ReplicaManager(val config: KafkaConfig,
         tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
           result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica, isFromFollower && isAddingReplica(tp, replicaId))
       }
-      maybeUpdateHwAndSendResponse(fetchPartitionData)
+      responseCallback(fetchPartitionData)
     } else {
       // construct the fetch results from the read results
       val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
@@ -940,7 +930,7 @@ class ReplicaManager(val config: KafkaConfig,
       val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit,
         fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
       val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
-        maybeUpdateHwAndSendResponse)
+        responseCallback)

       // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
       val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
@@ -1577,12 +1567,13 @@ class ReplicaManager(val config: KafkaConfig,
             followerFetchOffsetMetadata = readResult.info.fetchOffsetMetadata,
             followerStartOffset = readResult.followerLogStartOffset,
             followerFetchTimeMs = readResult.fetchTimeMs,
-            leaderEndOffset = readResult.leaderLogEndOffset)) {
+            leaderEndOffset = readResult.leaderLogEndOffset,
+            lastSentHighwatermark = readResult.highWatermark)) {
             readResult
           } else {
             warn(s"Leader $localBrokerId failed to record follower $followerId's position " +
-              s"${readResult.info.fetchOffsetMetadata.messageOffset} since the replica is not recognized to be " +
-              s"one of the assigned replicas ${partition.assignmentState.replicas.mkString(",")} " +
+              s"${readResult.info.fetchOffsetMetadata.messageOffset}, and last sent HW since the replica " +
+              s"is not recognized to be one of the assigned replicas ${partition.assignmentState.replicas.mkString(",")} " +
               s"for partition $topicPartition. Empty records will be returned for this partition.")
             readResult.withEmptyFetchInfo
           }
@@ -1595,15 +1586,6 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
   }

-  private def updateFollowerHighWatermark(topicPartition: TopicPartition, followerId: Int, highWatermark: Long): Unit = {
-    nonOfflinePartition(topicPartition).flatMap(_.getReplica(followerId)) match {
-      case Some(replica) => replica.updateLastSentHighWatermark(highWatermark)
-      case None =>
-        warn(s"While updating the HW for follower $followerId for partition $topicPartition, " +
-          s"the replica could not be found.")
-    }
-  }
-
   private def leaderPartitionsIterator: Iterator[Partition] =
     nonOfflinePartitionsIterator.filter(_.leaderLogIfLocal.isDefined)
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index 01167422191..a9b671de0b3 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -164,8 +164,7 @@ class DelayedFetchTest extends EasyMockSupport {

     val follower = new Replica(replicaId, topicPartition)
     followerHW.foreach(hw => {
-      follower.updateFetchState(LogOffsetMetadata.UnknownOffsetMetadata, 0L, 0L, 0L)
-      follower.updateLastSentHighWatermark(hw)
+      follower.updateFetchState(LogOffsetMetadata.UnknownOffsetMetadata, 0L, 0L, 0L, hw)
     })
     EasyMock.expect(partition.getReplica(replicaId))
       .andReturn(Some(follower))
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 4c527d2538c..14457715104 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -451,7 +451,8 @@ class PartitionTest extends AbstractPartitionTest {
         followerFetchOffsetMetadata = fetchOffsetMetadata,
         followerStartOffset = 0L,
         followerFetchTimeMs = time.milliseconds(),
-        leaderEndOffset = partition.localLogOrException.logEndOffset)
+        leaderEndOffset = partition.localLogOrException.logEndOffset,
+        lastSentHighwatermark = partition.localLogOrException.highWatermark)
     }

     def fetchOffsetsForTimestamp(timestamp: Long, isolation: Option[IsolationLevel]): Either[ApiException, Option[TimestampAndOffset]] = {
@@ -826,7 +827,8 @@ class PartitionTest extends AbstractPartitionTest {
         followerFetchOffsetMetadata = fetchOffsetMetadata,
         followerStartOffset = 0L,
         followerFetchTimeMs = time.milliseconds(),
-        leaderEndOffset = partition.localLogOrException.logEndOffset)
+        leaderEndOffset = partition.localLogOrException.logEndOffset,
+        lastSentHighwatermark = partition.localLogOrException.highWatermark)
     }

     updateFollowerFetchState(follower2, LogOffsetMetadata(0))
@@ -1061,7 +1063,8 @@ class PartitionTest extends AbstractPartitionTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(3),
       followerStartOffset = 0L,
       followerFetchTimeMs = time.milliseconds(),
-      leaderEndOffset = 6L)
+      leaderEndOffset = 6L,
+      lastSentHighwatermark = partition.localLogOrException.highWatermark)

     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
     assertEquals(3L, remoteReplica.logEndOffset)
@@ -1073,7 +1076,8 @@ class PartitionTest extends AbstractPartitionTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(6L),
       followerStartOffset = 0L,
       followerFetchTimeMs = time.milliseconds(),
-      leaderEndOffset = 6L)
+      leaderEndOffset = 6L,
+      lastSentHighwatermark = partition.localLogOrException.highWatermark)

     assertEquals(time.milliseconds(), remoteReplica.lastCaughtUpTimeMs)
     assertEquals(6L, remoteReplica.logEndOffset)
@@ -1121,7 +1125,8 @@ class PartitionTest extends AbstractPartitionTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(3),
       followerStartOffset = 0L,
       followerFetchTimeMs = time.milliseconds(),
-      leaderEndOffset = 6L)
+      leaderEndOffset = 6L,
+      lastSentHighwatermark = partition.localLogOrException.highWatermark)

     assertEquals(Set(brokerId), partition.inSyncReplicaIds)
     assertEquals(3L, remoteReplica.logEndOffset)
@@ -1139,7 +1144,8 @@ class PartitionTest extends AbstractPartitionTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(10),
       followerStartOffset = 0L,
       followerFetchTimeMs = time.milliseconds(),
-      leaderEndOffset = 6L)
+      leaderEndOffset = 6L,
+      lastSentHighwatermark = partition.localLogOrException.highWatermark)

     assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
     assertEquals(10L, remoteReplica.logEndOffset)
@@ -1192,7 +1198,8 @@ class PartitionTest extends AbstractPartitionTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(10),
       followerStartOffset = 0L,
       followerFetchTimeMs = time.milliseconds(),
-      leaderEndOffset = 10L)
+      leaderEndOffset = 10L,
+      lastSentHighwatermark = partition.localLogOrException.highWatermark)

     // Follower state is updated, but the ISR has not expanded
     assertEquals(Set(brokerId), partition.inSyncReplicaIds)
@@ -1303,7 +1310,8 @@ class PartitionTest extends AbstractPartitionTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(5),
       followerStartOffset = 0L,
       followerFetchTimeMs = firstFetchTimeMs,
-      leaderEndOffset = 10L)
+      leaderEndOffset = 10L,
+      lastSentHighwatermark = partition.localLogOrException.highWatermark)
     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
     assertEquals(5L, partition.localLogOrException.highWatermark)
     assertEquals(5L, remoteReplica.logEndOffset)
@@ -1317,7 +1325,8 @@ class PartitionTest extends AbstractPartitionTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(10),
       followerStartOffset = 0L,
       followerFetchTimeMs = time.milliseconds(),
-      leaderEndOffset = 15L)
+      leaderEndOffset = 15L,
+      lastSentHighwatermark = partition.localLogOrException.highWatermark)
     assertEquals(firstFetchTimeMs, remoteReplica.lastCaughtUpTimeMs)
     assertEquals(10L, partition.localLogOrException.highWatermark)
     assertEquals(10L, remoteReplica.logEndOffset)
@@ -1373,7 +1382,8 @@ class PartitionTest extends AbstractPartitionTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(10),
       followerStartOffset = 0L,
       followerFetchTimeMs = time.milliseconds(),
-      leaderEndOffset = 10L)
+      leaderEndOffset = 10L,
+      lastSentHighwatermark = partition.localLogOrException.highWatermark)
     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
     assertEquals(10L, partition.localLogOrException.highWatermark)
     assertEquals(10L, remoteReplica.logEndOffset)
diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index fea50b8e257..bcf8c02393a 100644
--- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -39,6 +39,7 @@ class IsrExpirationTest {
   val replicaLagTimeMaxMs = 100L
   val replicaFetchWaitMaxMs = 100
   val leaderLogEndOffset = 20
+  val leaderLogHighWatermark = 20L

   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
@@ -85,7 +86,8 @@ class IsrExpirationTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 1),
       followerStartOffset = 0L,
       followerFetchTimeMs= time.milliseconds,
-      leaderEndOffset = leaderLogEndOffset)
+      leaderEndOffset = leaderLogEndOffset,
+      lastSentHighwatermark = partition0.localLogOrException.highWatermark)

     var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
@@ -134,7 +136,8 @@ class IsrExpirationTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 2),
       followerStartOffset = 0L,
       followerFetchTimeMs= time.milliseconds,
-      leaderEndOffset = leaderLogEndOffset)
+      leaderEndOffset = leaderLogEndOffset,
+      lastSentHighwatermark = partition0.localLogOrException.highWatermark)

     // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log.
     // The replicas will no longer be in ISR. We do 2 fetches because we want to simulate the case where the replica is lagging but is not stuck
@@ -148,7 +151,8 @@ class IsrExpirationTest {
         followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 1),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
-        leaderEndOffset = leaderLogEndOffset)
+        leaderEndOffset = leaderLogEndOffset,
+        lastSentHighwatermark = partition0.localLogOrException.highWatermark)
     }
     partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
@@ -165,7 +169,8 @@ class IsrExpirationTest {
         followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
-        leaderEndOffset = leaderLogEndOffset)
+        leaderEndOffset = leaderLogEndOffset,
+        lastSentHighwatermark = partition0.localLogOrException.highWatermark)
     }
     partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
@@ -190,7 +195,8 @@ class IsrExpirationTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset),
       followerStartOffset = 0L,
       followerFetchTimeMs= time.milliseconds,
-      leaderEndOffset = leaderLogEndOffset)
+      leaderEndOffset = leaderLogEndOffset,
+      lastSentHighwatermark = partition0.localLogOrException.highWatermark)

     var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
@@ -224,7 +230,8 @@ class IsrExpirationTest {
         followerFetchOffsetMetadata = LogOffsetMetadata(0L),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
-        leaderEndOffset = 0L)
+        leaderEndOffset = 0L,
+        lastSentHighwatermark = partition.localLogOrException.highWatermark)

     // set the leader and its hw and the hw update time
     partition.leaderReplicaIdOpt = Some(leaderId)
@@ -236,6 +243,7 @@ class IsrExpirationTest {
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(leaderLogEndOffset)).anyTimes()
     EasyMock.expect(log.logEndOffset).andReturn(leaderLogEndOffset).anyTimes()
+    EasyMock.expect(log.highWatermark).andReturn(leaderLogHighWatermark).anyTimes()
     EasyMock.replay(log)
     log
   }
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 898bc3f7339..b32054667d5 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -141,7 +141,8 @@ class SimpleFetchTest {
       followerFetchOffsetMetadata = leo,
       followerStartOffset = 0L,
       followerFetchTimeMs= time.milliseconds,
-      leaderEndOffset = leo.messageOffset)
+      leaderEndOffset = leo.messageOffset,
+      partition.localLogOrException.highWatermark)
   }

   @After
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index 35375a4b4f1..c3e074685b5 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -161,9 +161,9 @@ public class UpdateFollowerFetchStateBenchmark {
     public void updateFollowerFetchStateBench() {
         // measure the impact of two follower fetches on the leader
         partition.updateFollowerFetchState(1, new LogOffsetMetadata(nextOffset, nextOffset, 0),
-                0, 1, nextOffset);
+                0, 1, nextOffset, nextOffset);
         partition.updateFollowerFetchState(2, new LogOffsetMetadata(nextOffset, nextOffset, 0),
-                0, 1, nextOffset);
+                0, 1, nextOffset, nextOffset);
         nextOffset++;
     }

@@ -173,8 +173,8 @@ public class UpdateFollowerFetchStateBenchmark {
         // measure the impact of two follower fetches on the leader when the follower didn't
        // end up fetching anything
         partition.updateFollowerFetchState(1, new LogOffsetMetadata(nextOffset, nextOffset, 0),
-                0, 1, 100);
+                0, 1, 100, nextOffset);
         partition.updateFollowerFetchState(2, new LogOffsetMetadata(nextOffset, nextOffset, 0),
-                0, 1, 100);
+                0, 1, 100, nextOffset);
     }
 }
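
Below is a minimal sketch (illustrative only, not part of the diff) of the consolidated call shape this patch introduces on the leader side: the last sent high watermark now travels with the rest of the follower fetch state instead of being set through a separate updateLastSentHighWatermark call. It mirrors the call site in ReplicaManager.updateFollowerFetchState above; `partition`, `followerId` and `readResult` are assumed to be in scope.

    // Sketch only: one call now records the follower's fetch position and the
    // last HW sent to it; Replica.updateFetchState updates the HW internally.
    val followerStateUpdated: Boolean = partition.updateFollowerFetchState(followerId,
      followerFetchOffsetMetadata = readResult.info.fetchOffsetMetadata,
      followerStartOffset = readResult.followerLogStartOffset,
      followerFetchTimeMs = readResult.fetchTimeMs,
      leaderEndOffset = readResult.leaderLogEndOffset,
      lastSentHighwatermark = readResult.highWatermark)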