mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table (#15631)
Co-authored-by: hzh0425 <642256541@qq.com> Reviewers: Luke Chen <showuon@gmail.com>, Jun Rao <junrao@gmail.com>
This commit is contained in:
		
							parent
							
								
									f6bf85edbf
								
							
						
					
					
						commit
						d092787487
					
				|  | @ -1511,10 +1511,14 @@ class UnifiedLog(@volatile var logStartOffset: Long, | |||
|           } | ||||
|         } | ||||
|         localLog.checkIfMemoryMappedBufferClosed() | ||||
|         // remove the segments for lookups | ||||
|         localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason) | ||||
|         if (segmentsToDelete.nonEmpty) { | ||||
|           // increment the local-log-start-offset or log-start-offset before removing the segment for lookups | ||||
|           val newLocalLogStartOffset = localLog.segments.higherSegment(segmentsToDelete.last.baseOffset()).get.baseOffset() | ||||
|           incrementStartOffset(newLocalLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion) | ||||
|           // remove the segments for lookups | ||||
|           localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason) | ||||
|         } | ||||
|         deleteProducerSnapshots(deletable, asyncDelete = true) | ||||
|         incrementStartOffset(localLog.segments.firstSegmentBaseOffset.getAsLong, LogStartOffsetIncrementReason.SegmentDeletion) | ||||
|       } | ||||
|       numToDelete | ||||
|     } | ||||
|  |  | |||
|  | @ -40,7 +40,7 @@ import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEn | |||
| import org.junit.jupiter.api.Assertions._ | ||||
| import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} | ||||
| import org.junit.jupiter.params.ParameterizedTest | ||||
| import org.junit.jupiter.params.provider.EnumSource | ||||
| import org.junit.jupiter.params.provider.{EnumSource, ValueSource} | ||||
| import org.mockito.ArgumentMatchers | ||||
| import org.mockito.ArgumentMatchers.{any, anyLong} | ||||
| import org.mockito.Mockito.{doThrow, mock, spy, when} | ||||
|  | @ -951,8 +951,9 @@ class UnifiedLogTest { | |||
|     assertEquals(0, lastSeq) | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   def testRetentionDeletesProducerStateSnapshots(): Unit = { | ||||
|   @ParameterizedTest(name = "testRetentionDeletesProducerStateSnapshots with createEmptyActiveSegment: {0}") | ||||
|   @ValueSource(booleans = Array(true, false)) | ||||
|   def testRetentionDeletesProducerStateSnapshots(createEmptyActiveSegment: Boolean): Unit = { | ||||
|     val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0) | ||||
|     val log = createLog(logDir, logConfig) | ||||
|     val pid1 = 1L | ||||
|  | @ -966,10 +967,14 @@ class UnifiedLogTest { | |||
|     log.roll() | ||||
|     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), producerId = pid1, | ||||
|       producerEpoch = epoch, sequence = 2), leaderEpoch = 0) | ||||
|     if (createEmptyActiveSegment) { | ||||
|       log.roll() | ||||
|     } | ||||
| 
 | ||||
|     log.updateHighWatermark(log.logEndOffset) | ||||
| 
 | ||||
|     assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size) | ||||
|     val numProducerSnapshots = if (createEmptyActiveSegment) 3 else 2 | ||||
|     assertEquals(numProducerSnapshots, ProducerStateManager.listSnapshotFiles(logDir).size) | ||||
|     // Sleep to breach the retention period | ||||
|     mockTime.sleep(1000 * 60 + 1) | ||||
|     log.deleteOldSegments() | ||||
|  | @ -977,6 +982,7 @@ class UnifiedLogTest { | |||
|     mockTime.sleep(1) | ||||
|     assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size, | ||||
|       "expect a single producer state snapshot remaining") | ||||
|     assertEquals(3, log.logStartOffset) | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|  | @ -4091,6 +4097,30 @@ class UnifiedLogTest { | |||
|     assertEquals(1, log.logSegments.size) | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   def testIncrementLocalLogStartOffsetAfterLocalLogDeletion(): Unit = { | ||||
|     val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, fileDeleteDelayMs = 0, remoteLogStorageEnable = true) | ||||
|     val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true) | ||||
| 
 | ||||
|     var offset = 0L | ||||
|     for(_ <- 0 until 50) { | ||||
|       val records = TestUtils.singletonRecords("test".getBytes()) | ||||
|       val info = log.appendAsLeader(records, leaderEpoch = 0) | ||||
|       offset = info.lastOffset | ||||
|       if (offset != 0 && offset % 10 == 0) | ||||
|         log.roll() | ||||
|     } | ||||
|     assertEquals(5, log.logSegments.size) | ||||
|     log.updateHighWatermark(log.logEndOffset) | ||||
|     // simulate calls to upload 3 segments to remote storage | ||||
|     log.updateHighestOffsetInRemoteStorage(30) | ||||
| 
 | ||||
|     log.deleteOldSegments() | ||||
|     assertEquals(2, log.logSegments.size()) | ||||
|     assertEquals(0, log.logStartOffset) | ||||
|     assertEquals(31, log.localLogStartOffset()) | ||||
|   } | ||||
| 
 | ||||
|   private def appendTransactionalToBuffer(buffer: ByteBuffer, | ||||
|                                           producerId: Long, | ||||
|                                           producerEpoch: Short, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue