KAFKA-14704; Follower should truncate before incrementing high watermark (#13230)

When a leader becomes a follower, it is likely that it has uncommitted records in its log. When it reaches out to the leader, the leader will detect that they have diverged and it will return the diverging epoch and offset. The follower truncates it log based on this.

There is a small caveat in this process. When the leader return the diverging epoch and offset, it also includes its high watermark, low watermark, start offset and end offset. The current code in the `AbstractFetcherThread` works as follow. First it process the partition data and then it checks whether there is a diverging epoch/offset. The former may accidentally expose uncommitted records as this step updates the local watermark to whatever is received from the leader. As the follower, or the former leader, may have uncommitted records, it will be able to updated the high watermark to a larger offset if the leader has a higher watermark than the current local one. This result in exposing uncommitted records until the log is finally truncated. The time window is short but a fetch requests coming at the right time to the follower could read those records. This is especially true for clients out there which uses recent versions of the fetch request but without implementing KIP-320.

When this happens, the follower logs the following messages: 
* `Truncating XXX to offset 21434 below high watermark 21437`
* `Non-monotonic update of high watermark from (offset=21437 segment=[20998:98390]) to (offset=21434 segment=[20998:97843])`.

This patch proposes to mitigate the issue by starting by checking on whether a diverging epoch/offset is provided by the leader and skip processing the partition data if it is. This basically means that the first fetch request will result in truncating the log and a subsequent fetch request will update the low/high watermarks.

Reviewers: Ritika Reddy <rreddy@confluent.io>, Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
David Jacot 2023-02-14 09:54:32 +01:00
parent 6a86e54c76
commit b8e01e2406
3 changed files with 185 additions and 32 deletions

View File

@ -331,33 +331,39 @@ abstract class AbstractFetcherThread(name: String,
Errors.forCode(partitionData.errorCode) match {
case Errors.NONE =>
try {
// Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
partitionData)
if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
// If a diverging epoch is present, we truncate the log of the replica
// but we don't process the partition data in order to not update the
// low/high watermarks until the truncation is actually done. Those will
// be updated by the next fetch.
divergingEndOffsets += topicPartition -> new EpochEndOffset()
.setPartition(topicPartition.partition)
.setErrorCode(Errors.NONE.code)
.setLeaderEpoch(partitionData.divergingEpoch.epoch)
.setEndOffset(partitionData.divergingEpoch.endOffset)
} else {
// Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
val logAppendInfoOpt = processPartitionData(
topicPartition,
currentFetchState.fetchOffset,
partitionData
)
logAppendInfoOpt.foreach { logAppendInfo =>
val validBytes = logAppendInfo.validBytes
val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
logAppendInfoOpt.foreach { logAppendInfo =>
val validBytes = logAppendInfo.validBytes
val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
// ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
if (validBytes > 0 && partitionStates.contains(topicPartition)) {
// Update partitionStates only if there is no exception during processPartitionData
val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
currentFetchState.currentLeaderEpoch, state = Fetching,
logAppendInfo.lastLeaderEpoch)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
fetcherStats.byteRate.mark(validBytes)
}
}
if (leader.isTruncationOnFetchSupported) {
FetchResponse.divergingEpoch(partitionData).ifPresent { divergingEpoch =>
divergingEndOffsets += topicPartition -> new EpochEndOffset()
.setPartition(topicPartition.partition)
.setErrorCode(Errors.NONE.code)
.setLeaderEpoch(divergingEpoch.epoch)
.setEndOffset(divergingEpoch.endOffset)
// ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
if (validBytes > 0 && partitionStates.contains(topicPartition)) {
// Update partitionStates only if there is no exception during processPartitionData
val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
currentFetchState.currentLeaderEpoch, state = Fetching,
logAppendInfo.lastLeaderEpoch)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
fetcherStats.byteRate.mark(validBytes)
}
}
}
} catch {

View File

@ -999,6 +999,77 @@ class AbstractFetcherThreadTest {
fetcher.verifyLastFetchedEpoch(partition, Some(5))
}
@Test
def testTruncateOnFetchDoesNotProcessPartitionData(): Unit = {
assumeTrue(truncateOnFetch)
val partition = new TopicPartition("topic", 0)
var truncateCalls = 0
var processPartitionDataCalls = 0
val fetcher = new MockFetcherThread(new MockLeaderEndPoint) {
override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = {
processPartitionDataCalls += 1
super.processPartitionData(topicPartition, fetchOffset, partitionData)
}
override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = {
truncateCalls += 1
super.truncate(topicPartition, truncationState)
}
}
val replicaLog = Seq(
mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
mkBatch(baseOffset = 1, leaderEpoch = 0, new SimpleRecord("b".getBytes)),
mkBatch(baseOffset = 2, leaderEpoch = 2, new SimpleRecord("c".getBytes)),
mkBatch(baseOffset = 3, leaderEpoch = 4, new SimpleRecord("d".getBytes)),
mkBatch(baseOffset = 4, leaderEpoch = 4, new SimpleRecord("e".getBytes)),
mkBatch(baseOffset = 5, leaderEpoch = 4, new SimpleRecord("f".getBytes)),
)
val replicaState = PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 1L)
fetcher.setReplicaState(partition, replicaState)
fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 5)))
assertEquals(6L, replicaState.logEndOffset)
fetcher.verifyLastFetchedEpoch(partition, expectedEpoch = Some(4))
val leaderLog = Seq(
mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
mkBatch(baseOffset = 1, leaderEpoch = 0, new SimpleRecord("b".getBytes)),
mkBatch(baseOffset = 2, leaderEpoch = 2, new SimpleRecord("c".getBytes)),
mkBatch(baseOffset = 3, leaderEpoch = 5, new SimpleRecord("g".getBytes)),
mkBatch(baseOffset = 4, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
)
val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 4L)
fetcher.mockLeader.setLeaderState(partition, leaderState)
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
// The first fetch should result in truncating the follower's log and
// it should not process the data hence not update the high watermarks.
fetcher.doWork()
assertEquals(1, truncateCalls)
assertEquals(0, processPartitionDataCalls)
assertEquals(3L, replicaState.logEndOffset)
assertEquals(1L, replicaState.highWatermark)
// Truncate should have been called only once and process partition data
// should have been called at least once. The log end offset and the high
// watermark are updated.
TestUtils.waitUntilTrue(() => {
fetcher.doWork()
fetcher.replicaPartitionState(partition).log == fetcher.mockLeader.leaderPartitionState(partition).log
}, "Failed to reconcile leader and follower logs")
fetcher.verifyLastFetchedEpoch(partition, Some(5))
assertEquals(1, truncateCalls)
assertTrue(processPartitionDataCalls >= 1)
assertEquals(5L, replicaState.logEndOffset)
assertEquals(4L, replicaState.highWatermark)
}
@Test
def testMaybeUpdateTopicIds(): Unit = {
val partition = new TopicPartition("topic1", 0)
@ -1287,13 +1358,8 @@ class AbstractFetcherThreadTest {
val state = replicaPartitionState(topicPartition)
if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
val divergingEpoch = partitionData.divergingEpoch
truncateOnFetchResponse(Map(topicPartition -> new EpochEndOffset()
.setPartition(topicPartition.partition)
.setErrorCode(Errors.NONE.code)
.setLeaderEpoch(divergingEpoch.epoch)
.setEndOffset(divergingEpoch.endOffset)))
return None
throw new IllegalStateException("processPartitionData should not be called for a partition with " +
"a diverging epoch.")
}
// Throw exception if the fetchOffset does not match the fetcherThread partition state

View File

@ -659,6 +659,87 @@ class ReplicaFetcherThreadTest {
partitions.foreach { tp => assertEquals(Fetching, thread.fetchState(tp).get.state) }
}
@Test
def testTruncateOnFetchDoesNotUpdateHighWatermark(): Unit = {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
val quota: ReplicationQuotaManager = mock(classOf[ReplicationQuotaManager])
val logManager: LogManager = mock(classOf[LogManager])
val log: UnifiedLog = mock(classOf[UnifiedLog])
val partition: Partition = mock(classOf[Partition])
val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
val logEndOffset = 150
val highWatermark = 130
when(log.highWatermark).thenReturn(highWatermark)
when(log.latestEpoch).thenReturn(Some(5))
when(log.endOffsetForEpoch(4)).thenReturn(Some(OffsetAndEpoch(149, 4)))
when(log.logEndOffset).thenReturn(logEndOffset)
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.logManager).thenReturn(logManager)
when(replicaManager.localLogOrException(t1p0)).thenReturn(log)
when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
when(partition.localLogOrException).thenReturn(log)
when(partition.appendRecordsToFollowerOrFutureReplica(any(), any())).thenReturn(None)
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")
val mockNetwork = new MockBlockingSender(
Collections.emptyMap(),
brokerEndPoint,
new SystemTime()
)
val leader = new RemoteLeaderEndPoint(
logContext.logPrefix,
mockNetwork,
new FetchSessionHandler(logContext, brokerEndPoint.id),
config,
replicaManager,
quota,
() => config.interBrokerProtocolVersion
)
val thread = new ReplicaFetcherThread(
"fetcher-thread",
leader,
config,
failedPartitions,
replicaManager,
quota,
logContext.logPrefix,
() => config.interBrokerProtocolVersion
)
thread.addPartitions(Map(
t1p0 -> initialFetchState(Some(topicId1), logEndOffset))
)
// Prepare the fetch response data.
mockNetwork.setFetchPartitionDataForNextResponse(Map(
t1p0 -> new FetchResponseData.PartitionData()
.setPartitionIndex(t1p0.partition)
.setLastStableOffset(0)
.setLogStartOffset(0)
.setHighWatermark(160) // HWM is higher on the leader.
.setDivergingEpoch(new FetchResponseData.EpochEndOffset()
.setEpoch(4)
.setEndOffset(140))
))
mockNetwork.setIdsForNextResponse(topicIds)
// Sends the fetch request and processes the response. This should truncate the
// log but it should not update the high watermark.
thread.doWork()
assertEquals(1, mockNetwork.fetchCount)
verify(partition, times(1)).truncateTo(140, false)
verify(log, times(0)).maybeUpdateHighWatermark(anyLong())
}
@Test
def shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20(): Unit = {