mirror of https://github.com/apache/kafka.git
KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table (#15631)
Co-authored-by: hzh0425 <642256541@qq.com> Reviewers: Luke Chen <showuon@gmail.com>, Jun Rao <junrao@gmail.com>
This commit is contained in:
parent
f6bf85edbf
commit
d092787487
|
|
@ -1511,10 +1511,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
|
|||
}
|
||||
}
|
||||
localLog.checkIfMemoryMappedBufferClosed()
|
||||
// remove the segments for lookups
|
||||
localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
|
||||
if (segmentsToDelete.nonEmpty) {
|
||||
// increment the local-log-start-offset or log-start-offset before removing the segment for lookups
|
||||
val newLocalLogStartOffset = localLog.segments.higherSegment(segmentsToDelete.last.baseOffset()).get.baseOffset()
|
||||
incrementStartOffset(newLocalLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
|
||||
// remove the segments for lookups
|
||||
localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
|
||||
}
|
||||
deleteProducerSnapshots(deletable, asyncDelete = true)
|
||||
incrementStartOffset(localLog.segments.firstSegmentBaseOffset.getAsLong, LogStartOffsetIncrementReason.SegmentDeletion)
|
||||
}
|
||||
numToDelete
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEn
|
|||
import org.junit.jupiter.api.Assertions._
|
||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
|
||||
import org.junit.jupiter.params.ParameterizedTest
|
||||
import org.junit.jupiter.params.provider.EnumSource
|
||||
import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
|
||||
import org.mockito.ArgumentMatchers
|
||||
import org.mockito.ArgumentMatchers.{any, anyLong}
|
||||
import org.mockito.Mockito.{doThrow, mock, spy, when}
|
||||
|
|
@ -951,8 +951,9 @@ class UnifiedLogTest {
|
|||
assertEquals(0, lastSeq)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testRetentionDeletesProducerStateSnapshots(): Unit = {
|
||||
@ParameterizedTest(name = "testRetentionDeletesProducerStateSnapshots with createEmptyActiveSegment: {0}")
|
||||
@ValueSource(booleans = Array(true, false))
|
||||
def testRetentionDeletesProducerStateSnapshots(createEmptyActiveSegment: Boolean): Unit = {
|
||||
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0)
|
||||
val log = createLog(logDir, logConfig)
|
||||
val pid1 = 1L
|
||||
|
|
@ -966,10 +967,14 @@ class UnifiedLogTest {
|
|||
log.roll()
|
||||
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), producerId = pid1,
|
||||
producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
|
||||
if (createEmptyActiveSegment) {
|
||||
log.roll()
|
||||
}
|
||||
|
||||
log.updateHighWatermark(log.logEndOffset)
|
||||
|
||||
assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
|
||||
val numProducerSnapshots = if (createEmptyActiveSegment) 3 else 2
|
||||
assertEquals(numProducerSnapshots, ProducerStateManager.listSnapshotFiles(logDir).size)
|
||||
// Sleep to breach the retention period
|
||||
mockTime.sleep(1000 * 60 + 1)
|
||||
log.deleteOldSegments()
|
||||
|
|
@ -977,6 +982,7 @@ class UnifiedLogTest {
|
|||
mockTime.sleep(1)
|
||||
assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size,
|
||||
"expect a single producer state snapshot remaining")
|
||||
assertEquals(3, log.logStartOffset)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -4091,6 +4097,30 @@ class UnifiedLogTest {
|
|||
assertEquals(1, log.logSegments.size)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testIncrementLocalLogStartOffsetAfterLocalLogDeletion(): Unit = {
|
||||
val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
|
||||
val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
|
||||
|
||||
var offset = 0L
|
||||
for(_ <- 0 until 50) {
|
||||
val records = TestUtils.singletonRecords("test".getBytes())
|
||||
val info = log.appendAsLeader(records, leaderEpoch = 0)
|
||||
offset = info.lastOffset
|
||||
if (offset != 0 && offset % 10 == 0)
|
||||
log.roll()
|
||||
}
|
||||
assertEquals(5, log.logSegments.size)
|
||||
log.updateHighWatermark(log.logEndOffset)
|
||||
// simulate calls to upload 3 segments to remote storage
|
||||
log.updateHighestOffsetInRemoteStorage(30)
|
||||
|
||||
log.deleteOldSegments()
|
||||
assertEquals(2, log.logSegments.size())
|
||||
assertEquals(0, log.logStartOffset)
|
||||
assertEquals(31, log.localLogStartOffset())
|
||||
}
|
||||
|
||||
private def appendTransactionalToBuffer(buffer: ByteBuffer,
|
||||
producerId: Long,
|
||||
producerEpoch: Short,
|
||||
|
|
|
|||
Loading…
Reference in New Issue