KAFKA-7400: Compacted topic segments that precede the log start offset are not cleaned up (#5646)

* KAFKA-7400: Compacted topic segments that precede the log start offset are not cleaned up

Currently we don't delete any log segments if the cleanup policy doesn't include delete. This patch changes that behavior: log segments that fully precede the log start offset are now deleted even when deletion is not enabled. Tested with unit tests verifying that LogManager.cleanupLogs now cleans logs with cleanup.policy=compact and that Log.deleteOldSegments deletes segments that precede the start offset regardless of the cleanup policy.

Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Jason Gustafson <jason@confluent.io>, Jun Rao <junrao@gmail.com>
Bob Barrett 2018-09-21 13:31:45 -07:00 committed by Jun Rao
parent 96132e2dbb
commit 0bc7008e75
5 changed files with 57 additions and 12 deletions
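For context on how a compact-only topic ends up with segments entirely before the log start offset: the DeleteRecords API can advance logStartOffset past whole segments. A minimal sketch using the Java AdminClient from Scala (the broker address, topic name, and target offset are hypothetical):

import java.util.Collections
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, RecordsToDelete}
import org.apache.kafka.common.TopicPartition

object AdvanceLogStartOffset extends App {
  val props = new java.util.Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // hypothetical broker
  val admin = AdminClient.create(props)
  try {
    // Advance the log start offset of partition 0 to offset 100. On a topic with
    // cleanup.policy=compact, any segment that ends at or before this offset holds
    // no readable data, but prior to this patch it was never deleted.
    val partition = new TopicPartition("my-compacted-topic", 0) // hypothetical topic
    admin.deleteRecords(Collections.singletonMap(partition, RecordsToDelete.beforeOffset(100L)))
      .all().get()
  } finally admin.close()
}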

core/src/main/scala/kafka/log/Log.scala

@@ -1353,12 +1353,17 @@ class Log(@volatile var dir: File,
   }
 
   /**
-   * Delete any log segments that have either expired due to time based retention
-   * or because the log size is > retentionSize
+   * If topic deletion is enabled, delete any log segments that have either expired due to time based retention
+   * or because the log size is > retentionSize.
+   *
+   * Whether or not deletion is enabled, delete any log segments that are before the log start offset
    */
   def deleteOldSegments(): Int = {
-    if (!config.delete) return 0
-    deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
+    if (config.delete) {
+      deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
+    } else {
+      deleteLogStartOffsetBreachedSegments()
+    }
   }
 
   private def deleteRetentionMsBreachedSegments(): Int = {
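The helper deleteLogStartOffsetBreachedSegments is outside this hunk; its eligibility check amounts to the standalone sketch below (the function name here is illustrative, not the real method): a segment may be deleted only when the next segment's base offset is at or below logStartOffset, which guarantees the candidate segment lies entirely before the start offset.

// Illustrative standalone version of the start-offset check (hypothetical name):
// a segment is eligible only if the *next* segment starts at or before
// logStartOffset, so every offset in the candidate segment precedes the start offset.
def breachesLogStartOffset(nextSegmentBaseOffset: Option[Long], logStartOffset: Long): Boolean =
  nextSegmentBaseOffset.exists(_ <= logStartOffset)

// Matches the LogTest scenario further down: with logStartOffset = 5, the first
// segment (base offset 0) is deletable because the second segment starts at an
// offset <= 5, while the active (last) segment, having no successor, never is.
assert(breachesLogStartOffset(Some(5L), 5L))   // fully-preceding segment: delete
assert(!breachesLogStartOffset(None, 5L))      // active segment: keep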

core/src/main/scala/kafka/log/LogCleanerManager.scala

@@ -171,12 +171,13 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /**
-   * Find any logs that have compact and delete enabled
+   * Find any logs that have compaction enabled. Include logs without delete enabled, as they may have segments
+   * that precede the start offset.
    */
   def deletableLogs(): Iterable[(TopicPartition, Log)] = {
     inLock(lock) {
       val toClean = logs.filter { case (topicPartition, log) =>
-        !inProgress.contains(topicPartition) && isCompactAndDelete(log)
+        !inProgress.contains(topicPartition) && log.config.compact
      }
      toClean.foreach { case (tp, _) => inProgress.put(tp, LogCleaningInProgress) }
      toClean
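For reference, the predicate being dropped required both policies, so compact-only logs never reached the deletion path; filtering on log.config.compact alone now admits them. Roughly, the existing helper (defined elsewhere in LogCleanerManager; shown here as a sketch) is:

// Sketch of the helper the old filter used: true only when the topic's
// cleanup.policy contains both "compact" and "delete".
def isCompactAndDelete(log: Log): Boolean =
  log.config.compact && log.config.delete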

core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala

@@ -77,17 +77,17 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
   }
 
   /**
-   * When looking for logs with segments ready to be deleted we shouldn't consider
-   * logs with cleanup.policy=compact as they shouldn't have segments truncated.
+   * When looking for logs with segments ready to be deleted we should consider
+   * logs with cleanup.policy=compact because they may have segments from before the log start offset
    */
   @Test
-  def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyCompactLogs(): Unit = {
+  def testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactLogs(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
     val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
     val readyToDelete = cleanerManager.deletableLogs().size
-    assertEquals("should have 1 logs ready to be deleted", 0, readyToDelete)
+    assertEquals("should have 1 logs ready to be deleted", 1, readyToDelete)
   }
 
   /**

core/src/test/scala/unit/kafka/log/LogManagerTest.scala

@@ -190,13 +190,26 @@ class LogManagerTest {
   }
 
   /**
-   * Ensures that LogManager only runs on logs with cleanup.policy=delete
+   * Ensures that LogManager doesn't run on logs with cleanup.policy=compact,delete
+   * LogCleaner.CleanerThread handles all logs where compaction is enabled.
    */
   @Test
   def testDoesntCleanLogsWithCompactDeletePolicy() {
+    testDoesntCleanLogs(LogConfig.Compact + "," + LogConfig.Delete)
+  }
+
+  /**
+   * Ensures that LogManager doesn't run on logs with cleanup.policy=compact
+   * LogCleaner.CleanerThread handles all logs where compaction is enabled.
+   */
+  @Test
+  def testDoesntCleanLogsWithCompactPolicy() {
+    testDoesntCleanLogs(LogConfig.Compact)
+  }
+
+  private def testDoesntCleanLogs(policy: String) {
     val logProps = new Properties()
-    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
+    logProps.put(LogConfig.CleanupPolicyProp, policy)
 
     val log = logManager.getOrCreateLog(new TopicPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps))
     var offset = 0L
     for (_ <- 0 until 200) {

core/src/test/scala/unit/kafka/log/LogTest.scala

@@ -2746,6 +2746,32 @@ class LogTest {
     assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments)
   }
 
+  @Test
+  def shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete(): Unit = {
+    def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L)
+    val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact")
+
+    // Create log with start offset ahead of the first log segment
+    val log = createLog(logDir, logConfig, brokerTopicStats, logStartOffset = 5L)
+
+    // append some messages to create some segments
+    for (_ <- 0 until 15)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
+
+    // Three segments should be created, with the first one entirely preceding the log start offset
+    assertEquals(3, log.logSegments.count(_ => true))
+    assertTrue(log.logSegments.slice(1, 2).head.baseOffset <= log.logStartOffset)
+
+    // The first segment, which is entirely before the log start offset, should be deleted
+    // Of the remaining segments, the first can overlap the log start offset and the rest must have a base offset
+    // greater than the start offset
+    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.deleteOldSegments()
+    assertEquals("There should be 2 segments remaining", 2, log.numberOfSegments)
+    assertTrue(log.logSegments.head.baseOffset <= log.logStartOffset)
+    assertTrue(log.logSegments.tail.forall(s => s.baseOffset > log.logStartOffset))
+  }
+
   @Test
   def shouldApplyEpochToMessageOnAppendIfLeader() {
     val records = (0 until 50).toArray.map(id => new SimpleRecord(id.toString.getBytes))