KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table (#15631)

Co-authored-by: hzh0425 <642256541@qq.com>

Reviewers: Luke Chen <showuon@gmail.com>, Jun Rao <junrao@gmail.com>
This commit is contained in:
Kamal Chandraprakash 2024-04-17 22:11:29 +05:30
parent f6bf85edbf
commit d092787487
2 changed files with 41 additions and 7 deletions

View File

@@ -1511,10 +1511,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
}
localLog.checkIfMemoryMappedBufferClosed()
// remove the segments for lookups
localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
if (segmentsToDelete.nonEmpty) {
// increment the local-log-start-offset or log-start-offset before removing the segment for lookups
val newLocalLogStartOffset = localLog.segments.higherSegment(segmentsToDelete.last.baseOffset()).get.baseOffset()
incrementStartOffset(newLocalLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
// remove the segments for lookups
localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
}
deleteProducerSnapshots(deletable, asyncDelete = true)
incrementStartOffset(localLog.segments.firstSegmentBaseOffset.getAsLong, LogStartOffsetIncrementReason.SegmentDeletion)
}
numToDelete
}

View File

@@ -40,7 +40,7 @@ import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEn
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyLong}
import org.mockito.Mockito.{doThrow, mock, spy, when}
@@ -951,8 +951,9 @@ class UnifiedLogTest {
assertEquals(0, lastSeq)
}
@Test
def testRetentionDeletesProducerStateSnapshots(): Unit = {
@ParameterizedTest(name = "testRetentionDeletesProducerStateSnapshots with createEmptyActiveSegment: {0}")
@ValueSource(booleans = Array(true, false))
def testRetentionDeletesProducerStateSnapshots(createEmptyActiveSegment: Boolean): Unit = {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0)
val log = createLog(logDir, logConfig)
val pid1 = 1L
@@ -966,10 +967,14 @@
log.roll()
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), producerId = pid1,
producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
if (createEmptyActiveSegment) {
log.roll()
}
log.updateHighWatermark(log.logEndOffset)
assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
val numProducerSnapshots = if (createEmptyActiveSegment) 3 else 2
assertEquals(numProducerSnapshots, ProducerStateManager.listSnapshotFiles(logDir).size)
// Sleep to breach the retention period
mockTime.sleep(1000 * 60 + 1)
log.deleteOldSegments()
@@ -977,6 +982,7 @@
mockTime.sleep(1)
assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size,
"expect a single producer state snapshot remaining")
assertEquals(3, log.logStartOffset)
}
@Test
@@ -4091,6 +4097,30 @@
assertEquals(1, log.logSegments.size)
}
@Test
def testIncrementLocalLogStartOffsetAfterLocalLogDeletion(): Unit = {
// Verifies that deleting local segments (while the data is retained in remote
// storage) advances the local-log-start-offset but leaves log-start-offset at 0.
// localRetentionBytes = 1 makes every uploaded segment immediately eligible for
// local deletion; remoteLogStorageEnable = true activates tiered-storage semantics.
val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
var offset = 0L
// Append 50 records (offsets 0..49), rolling a new segment every 10 offsets
// (rolls after offsets 10, 20, 30 and 40), producing segments with base
// offsets 0, 11, 21, 31 and 41.
for(_ <- 0 until 50) {
val records = TestUtils.singletonRecords("test".getBytes())
val info = log.appendAsLeader(records, leaderEpoch = 0)
offset = info.lastOffset
if (offset != 0 && offset % 10 == 0)
log.roll()
}
assertEquals(5, log.logSegments.size)
// Advance the high watermark so the older segments become eligible for deletion.
log.updateHighWatermark(log.logEndOffset)
// simulate calls to upload 3 segments to remote storage
log.updateHighestOffsetInRemoteStorage(30)
log.deleteOldSegments()
// The three segments covered by remote storage (bases 0, 11, 21) are deleted
// locally, leaving the segments with base offsets 31 and 41.
assertEquals(2, log.logSegments.size())
// log-start-offset is unchanged (data still readable from remote storage);
// local-log-start-offset moves to 31, the base of the first remaining segment.
assertEquals(0, log.logStartOffset)
assertEquals(31, log.localLogStartOffset())
}
private def appendTransactionalToBuffer(buffer: ByteBuffer,
producerId: Long,
producerEpoch: Short,