KAFKA-15695: Update the local log start offset of a log after rebuilding the auxiliary state (#14649)

Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Alexandre Dupriez <alexandre.dupriez@gmail.com>
This commit is contained in:
Nikhil Ramakrishnan 2023-12-12 16:13:42 +00:00 committed by GitHub
parent c5ee82cab4
commit be531c681c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 2 deletions

View File

@@ -229,8 +229,11 @@ public class ReplicaFetcherTierStateMachine implements TierStateMachine {
         Partition partition = replicaMgr.getPartitionOrException(topicPartition);
         partition.truncateFullyAndStartAt(nextOffset, false, Option.apply(leaderLogStartOffset));
-        // Build leader epoch cache.
+        // Increment start offsets
         unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented);
+        unifiedLog.maybeIncrementLocalLogStartOffset(nextOffset, LeaderOffsetIncremented);
+
+        // Build leader epoch cache.
         List<EpochEntry> epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata);
         if (unifiedLog.leaderEpochCache().isDefined()) {
             unifiedLog.leaderEpochCache().get().assign(epochs);

View File

@@ -972,7 +972,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     }
   }

-  private def maybeIncrementLocalLogStartOffset(newLocalLogStartOffset: Long, reason: LogStartOffsetIncrementReason): Unit = {
+  def maybeIncrementLocalLogStartOffset(newLocalLogStartOffset: Long, reason: LogStartOffsetIncrementReason): Unit = {
     lock synchronized {
       if (newLocalLogStartOffset > localLogStartOffset()) {
         _localLogStartOffset = newLocalLogStartOffset
@@ -1815,6 +1815,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       leaderEpochCache.foreach(_.clearAndFlush())
       producerStateManager.truncateFullyAndStartAt(newOffset)
       logStartOffset = logStartOffsetOpt.getOrElse(newOffset)
+      if (remoteLogEnabled()) _localLogStartOffset = newOffset
       rebuildProducerState(newOffset, producerStateManager)
       updateHighWatermark(localLog.logEndOffsetMetadata)
     }

View File

@@ -3646,6 +3646,12 @@ class UnifiedLogTest {
     log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
     assertEquals(newLogStartOffset, log.logStartOffset)
     assertEquals(log.logStartOffset, log.localLogStartOffset())
+
+    // Truncate the local log and verify that the offsets are updated to expected values
+    val newLocalLogStartOffset = 60L;
+    log.truncateFullyAndStartAt(newLocalLogStartOffset, Option.apply(newLogStartOffset))
+    assertEquals(newLogStartOffset, log.logStartOffset)
+    assertEquals(newLocalLogStartOffset, log.localLogStartOffset())
   }

   private class MockLogOffsetsListener extends LogOffsetsListener {