mirror of https://github.com/apache/kafka.git
KAFKA-9133; Cleaner should handle log start offset larger than active segment base offset (#7662)
This was a regression in 2.3.1. In the case of a DeleteRecords call, the log start offset may be higher than the active segment base offset. The cleaner should handle this case gracefully.

Reviewers: Jun Rao <junrao@gmail.com>
Co-Authored-By: Tim Van Laer <timvlaer@users.noreply.github.com>

parent 4deb80676e
commit 92dd337688
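
To make the failure mode concrete, here is a minimal sketch with illustrative values (not the real kafka.log API). A DeleteRecords request can advance the log start offset anywhere up to the high watermark, including past the base offset of a lone active segment, and the cleaner's first dirty offset is floored at the log start offset, so it inherits the same property:

// Sketch: one active segment holds every record.
val activeSegmentBaseOffset = 0L
val logEndOffset = 9L

// DeleteRecords moves the log start offset past the active segment's base offset.
val logStartOffset = 2L
assert(logStartOffset > activeSegmentBaseOffset)

// The cleaner floors its first dirty offset at the log start offset.
val firstDirtyOffset = math.max(0L /* checkpointed offset */, logStartOffset)
assert(firstDirtyOffset == 2L)

Before this patch, asking the log for non-active segments starting at such an offset threw an IllegalArgumentException and aborted the cleaning pass; the hunks below make both the log and the cleaner tolerate it.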
core/src/main/scala/kafka/log/Log.scala
@@ -2122,8 +2122,8 @@ class Log(@volatile var dir: File,
     lock synchronized {
       val view = Option(segments.floorKey(from)).map { floor =>
         if (to < floor)
-          throw new IllegalArgumentException(s"Invalid log segment range: requested segments from offset $from " +
-            s"mapping to segment with base offset $floor, which is greater than limit offset $to")
+          throw new IllegalArgumentException(s"Invalid log segment range: requested segments in $topicPartition " +
+            s"from offset $from mapping to segment with base offset $floor, which is greater than limit offset $to")
         segments.subMap(floor, to)
       }.getOrElse(segments.headMap(to))
       view.values.asScala
@@ -2133,9 +2133,9 @@ class Log(@volatile var dir: File,
   def nonActiveLogSegmentsFrom(from: Long): Iterable[LogSegment] = {
     lock synchronized {
       if (from > activeSegment.baseOffset)
-        throw new IllegalArgumentException("Illegal request for non-active segments beginning at " +
-          s"offset $from, which is larger than the active segment's base offset ${activeSegment.baseOffset}")
-      logSegments(from, activeSegment.baseOffset)
+        Seq.empty
+      else
+        logSegments(from, activeSegment.baseOffset)
     }
   }
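
The contract of nonActiveLogSegmentsFrom changes here from throwing on an out-of-range from to returning an empty collection. A self-contained sketch of the new behavior, assuming a simplified segment index (the real method walks the log's concurrent skip-list of segments under its lock):

import java.util.concurrent.ConcurrentSkipListMap
import scala.jdk.CollectionConverters._

final case class Segment(baseOffset: Long)

// Simplified stand-in for the log's segment index: base offset -> segment.
val segments = new ConcurrentSkipListMap[java.lang.Long, Segment]()
Seq(0L, 3L, 6L).foreach(o => segments.put(o, Segment(o)))
val activeBaseOffset = 6L // the last segment is the active one

def nonActiveLogSegmentsFrom(from: Long): Iterable[Segment] =
  if (from > activeBaseOffset) Seq.empty // the fix: graceful, no exception
  else segments.subMap(from, activeBaseOffset).values.asScala // [from, active)

assert(nonActiveLogSegmentsFrom(0L).map(_.baseOffset).toSeq == Seq(0L, 3L))
assert(nonActiveLogSegmentsFrom(7L).isEmpty) // pre-patch: IllegalArgumentException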
core/src/main/scala/kafka/log/LogCleaner.scala
@@ -447,17 +447,6 @@ object LogCleaner {
       fileSuffix = Log.CleanedFileSuffix, initFileSize = log.initFileSize, preallocate = log.config.preallocate)
   }
 
-  /**
-   * Given the first dirty offset and an uncleanable offset, calculates the total cleanable bytes for this log
-   * @return the biggest uncleanable offset and the total amount of cleanable bytes
-   */
-  def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = {
-    val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)
-    val firstUncleanableOffset = firstUncleanableSegment.baseOffset
-    val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
-
-    (firstUncleanableOffset, cleanableBytes)
-  }
 }
 
 /**
@@ -1061,7 +1050,7 @@ private case class LogToClean(topicPartition: TopicPartition,
                               uncleanableOffset: Long,
                               needCompactionNow: Boolean = false) extends Ordered[LogToClean] {
   val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size.toLong).sum
-  val (firstUncleanableOffset, cleanableBytes) = LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset)
+  val (firstUncleanableOffset, cleanableBytes) = LogCleanerManager.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset)
   val totalBytes = cleanBytes + cleanableBytes
   val cleanableRatio = cleanableBytes / totalBytes.toDouble
   override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.errors.KafkaStorageException
 
-import scala.collection.{Iterable, Seq, immutable, mutable}
+import scala.collection.{Iterable, Seq, mutable}
 
 private[log] sealed trait LogCleaningState
 private[log] case object LogCleaningInProgress extends LogCleaningState
@@ -109,8 +109,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         val now = Time.SYSTEM.milliseconds
         partitions.map { tp =>
           val log = logs.get(tp)
-          val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, tp, lastClean, now)
-          val (_, uncleanableBytes) = LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, firstUncleanableDirtyOffset)
+          val lastCleanOffset = lastClean.get(tp)
+          val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, lastCleanOffset, now)
+          val (_, uncleanableBytes) = calculateCleanableBytes(log, firstDirtyOffset, firstUncleanableDirtyOffset)
           uncleanableBytes
         }.sum
       }
@@ -185,7 +186,8 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       }.map {
         case (topicPartition, log) => // create a LogToClean instance for each
           try {
-            val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, topicPartition, lastClean, now)
+            val lastCleanOffset = lastClean.get(topicPartition)
+            val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, lastCleanOffset, now)
             val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now)
             preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
@@ -519,15 +521,11 @@ private[log] object LogCleanerManager extends Logging {
    * Returns the range of dirty offsets that can be cleaned.
    *
    * @param log the log
-   * @param lastClean the map of checkpointed offsets
+   * @param lastCleanOffset the last checkpointed offset
    * @param now the current time in milliseconds of the cleaning operation
    * @return the lower (inclusive) and upper (exclusive) offsets
    */
-  def cleanableOffsets(log: Log, topicPartition: TopicPartition, lastClean: immutable.Map[TopicPartition, Long], now: Long): (Long, Long) = {
-
-    // the checkpointed offset, ie., the first offset of the next dirty segment
-    val lastCleanOffset: Option[Long] = lastClean.get(topicPartition)
-
+  def cleanableOffsets(log: Log, lastCleanOffset: Option[Long], now: Long): (Long, Long) = {
     // If the log segments are abnormally truncated and hence the checkpointed offset is no longer valid;
     // reset to the log starting offset and log the error
     val firstDirtyOffset = {
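
The signature change in this hunk is a pure narrowing: cleanableOffsets no longer receives the partition and the whole checkpoint map, only the already-resolved checkpoint. A small sketch of the call-site difference, with simplified types standing in for the checkpoint map:

val lastClean: Map[String, Long] = Map("foo-0" -> 10L)

// before: cleanableOffsets(log, tp, lastClean, now) resolved the offset internally
// after:  the caller resolves it, so the method depends only on what it uses
val lastCleanOffset: Option[Long] = lastClean.get("foo-0") // Some(10L)
val missing: Option[Long] = lastClean.get("bar-0")         // None: never cleaned
assert(lastCleanOffset.contains(10L) && missing.isEmpty)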
@@ -543,7 +541,7 @@ private[log] object LogCleanerManager extends Logging {
       } else if (checkpointDirtyOffset > log.logEndOffset) {
         // The dirty offset has gotten ahead of the log end offset. This could happen if there was data
         // corruption at the end of the log. We conservatively assume that the full log needs cleaning.
-        warn(s"The last checkpoint dirty offset for partition $topicPartition is $checkpointDirtyOffset, " +
+        warn(s"The last checkpoint dirty offset for partition ${log.name} is $checkpointDirtyOffset, " +
           s"which is larger than the log end offset ${log.logEndOffset}. Resetting to the log start offset $logStartOffset.")
         logStartOffset
       } else {
@@ -570,14 +568,31 @@ private[log] object LogCleanerManager extends Logging {
         val dirtyNonActiveSegments = log.nonActiveLogSegmentsFrom(firstDirtyOffset)
         dirtyNonActiveSegments.find { s =>
           val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
-          debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; is uncleanable=$isUncleanable")
+          debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} " +
+            s"segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; " +
+            s"is uncleanable=$isUncleanable")
           isUncleanable
         }.map(_.baseOffset)
       } else None
     ).flatten.min
 
-    debug(s"Finding range of cleanable offsets for log=${log.name} topicPartition=$topicPartition. Last clean offset=$lastCleanOffset now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset activeSegment.baseOffset=${log.activeSegment.baseOffset}")
+    debug(s"Finding range of cleanable offsets for log=${log.name}. Last clean offset=$lastCleanOffset " +
+      s"now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset " +
+      s"activeSegment.baseOffset=${log.activeSegment.baseOffset}")
 
-    (firstDirtyOffset, firstUncleanableDirtyOffset)
+    (firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableDirtyOffset))
   }
 
+  /**
+   * Given the first dirty offset and an uncleanable offset, calculates the total cleanable bytes for this log
+   * @return the biggest uncleanable offset and the total amount of cleanable bytes
+   */
+  def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = {
+    val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)
+    val firstUncleanableOffset = firstUncleanableSegment.baseOffset
+    val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
+
+    (firstUncleanableOffset, cleanableBytes)
+  }
+
 }
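
The math.max clamp on the returned upper bound is the other half of the fix: when every candidate uncleanable offset (for example, the active segment's base offset) sits below the first dirty offset, the returned range now degenerates to empty instead of inverting. A worked example with the offsets from the sketch above; note calculateCleanableBytes also moves here verbatim from object LogCleaner:

val firstDirtyOffset = 2L            // floored at the post-DeleteRecords log start offset
val firstUncleanableDirtyOffset = 0L // e.g. the active segment's base offset
// Unclamped, (2, 0) is an inverted range; clamped, (2, 2) is merely empty, so
// callers such as calculateCleanableBytes sum zero bytes over logSegments(2, 2).
assert(math.max(firstDirtyOffset, firstUncleanableDirtyOffset) == 2L)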
core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -78,8 +78,8 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K
     val uncleanableBytesGauge = getGauge[Long]("uncleanable-bytes", uncleanableDirectory)
 
     TestUtils.waitUntilTrue(() => uncleanablePartitionsCountGauge.value() == 2, "There should be 2 uncleanable partitions", 2000L)
-    val expectedTotalUncleanableBytes = LogCleaner.calculateCleanableBytes(log, 0, log.logSegments.last.baseOffset)._2 +
-      LogCleaner.calculateCleanableBytes(log2, 0, log2.logSegments.last.baseOffset)._2
+    val expectedTotalUncleanableBytes = LogCleanerManager.calculateCleanableBytes(log, 0, log.logSegments.last.baseOffset)._2 +
+      LogCleanerManager.calculateCleanableBytes(log2, 0, log2.logSegments.last.baseOffset)._2
     TestUtils.waitUntilTrue(() => uncleanableBytesGauge.value() == expectedTotalUncleanableBytes,
       s"There should be $expectedTotalUncleanableBytes uncleanable bytes", 1000L)
core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -213,6 +213,54 @@ class LogCleanerManagerTest extends Logging {
     assertEquals(10L, filthiestLog.firstDirtyOffset)
   }
 
+  @Test
+  def testLogStartOffsetLargerThanActiveSegmentBaseOffset(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val log = createLog(segmentSize = 2048, LogConfig.Compact, tp)
+
+    val logs = new Pool[TopicPartition, Log]()
+    logs.put(tp, log)
+
+    appendRecords(log, numRecords = 3)
+    appendRecords(log, numRecords = 3)
+    appendRecords(log, numRecords = 3)
+
+    assertEquals(1, log.logSegments.size)
+
+    log.maybeIncrementLogStartOffset(2L)
+
+    val cleanerManager = createCleanerManagerMock(logs)
+    cleanerCheckpoints.put(tp, 0L)
+
+    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
+    assertEquals(2L, filthiestLog.firstDirtyOffset)
+  }
+
+  @Test
+  def testDirtyOffsetLargerThanActiveSegmentBaseOffset(): Unit = {
+    // It is possible in the case of an unclean leader election for the checkpoint
+    // dirty offset to get ahead of the active segment base offset, but still be
+    // within the range of the log.
+    val tp = new TopicPartition("foo", 0)
+
+    val logs = new Pool[TopicPartition, Log]()
+    val log = createLog(2048, LogConfig.Compact, topicPartition = tp)
+    logs.put(tp, log)
+
+    appendRecords(log, numRecords = 3)
+    appendRecords(log, numRecords = 3)
+
+    assertEquals(1, log.logSegments.size)
+    assertEquals(0L, log.activeSegment.baseOffset)
+
+    val cleanerManager = createCleanerManagerMock(logs)
+    cleanerCheckpoints.put(tp, 3L)
+
+    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
+    assertEquals(3L, filthiestLog.firstDirtyOffset)
+  }
+
   /**
     * When checking for logs with segments ready for deletion
     * we shouldn't consider logs where cleanup.policy=delete
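
As a sanity check on the first new test, a worked trace of the expected result, assuming (as the reset logic in cleanableOffsets implies) that a checkpoint behind the log start offset is floored at the log start offset:

val checkpointOffset = 0L // cleanerCheckpoints.put(tp, 0L)
val logStartOffset = 2L   // log.maybeIncrementLogStartOffset(2L)
val firstDirtyOffset = math.max(checkpointOffset, logStartOffset)
assert(firstDirtyOffset == 2L) // matches assertEquals(2L, filthiestLog.firstDirtyOffset)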
@@ -373,8 +421,8 @@ class LogCleanerManagerTest extends Logging {
     while(log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
 
-    val lastClean = Map(topicPartition -> 0L)
-    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds)
+    val lastCleanOffset = Some(0L)
+    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
     assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
     assertEquals("The first uncleanable offset begins with the active segment.", log.activeSegment.baseOffset, cleanableOffsets._2)
   }
@@ -403,8 +451,8 @@
     while (log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t1), leaderEpoch = 0)
 
-    val lastClean = Map(topicPartition -> 0L)
-    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds)
+    val lastCleanOffset = Some(0L)
+    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
     assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
     assertEquals("The first uncleanable offset begins with the second block of log entries.", activeSegAtT0.baseOffset, cleanableOffsets._2)
   }
@@ -428,8 +476,8 @@
 
     time.sleep(compactionLag + 1)
 
-    val lastClean = Map(topicPartition -> 0L)
-    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds)
+    val lastCleanOffset = Some(0L)
+    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
     assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
     assertEquals("The first uncleanable offset begins with active segment.", log.activeSegment.baseOffset, cleanableOffsets._2)
   }
@@ -456,8 +504,7 @@
 
     time.sleep(compactionLag + 1)
     // although the compaction lag has been exceeded, the undecided data should not be cleaned
-    var cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition,
-      Map(topicPartition -> 0L), time.milliseconds())
+    var cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds())
     assertEquals(0L, cleanableOffsets._1)
     assertEquals(0L, cleanableOffsets._2)
@@ -467,16 +514,14 @@
     log.updateHighWatermark(4L)
 
     // the first segment should now become cleanable immediately
-    cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition,
-      Map(topicPartition -> 0L), time.milliseconds())
+    cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds())
     assertEquals(0L, cleanableOffsets._1)
     assertEquals(3L, cleanableOffsets._2)
 
     time.sleep(compactionLag + 1)
 
     // the second segment becomes cleanable after the compaction lag
-    cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition,
-      Map(topicPartition -> 0L), time.milliseconds())
+    cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds())
     assertEquals(0L, cleanableOffsets._1)
     assertEquals(4L, cleanableOffsets._2)
   }
@@ -571,26 +616,28 @@
                                  recordsPerBatch: Int,
                                  batchesPerSegment: Int): Unit = {
     for (i <- 0 until numBatches) {
-      val startOffset = i * recordsPerBatch
-      val endOffset = startOffset + recordsPerBatch
-      var lastTimestamp = 0L
-      val records = (startOffset until endOffset).map { offset =>
-        val currentTimestamp = time.milliseconds()
-        if (offset == endOffset - 1)
-          lastTimestamp = currentTimestamp
-
-        new SimpleRecord(currentTimestamp, s"key-$offset".getBytes, s"value-$offset".getBytes)
-      }
-
-      log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, records:_*), leaderEpoch = 1)
-      log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
-
+      appendRecords(log, recordsPerBatch)
       if (i % batchesPerSegment == 0)
         log.roll()
     }
     log.roll()
   }
 
+  private def appendRecords(log: Log, numRecords: Int): Unit = {
+    val startOffset = log.logEndOffset
+    val endOffset = startOffset + numRecords
+    var lastTimestamp = 0L
+    val records = (startOffset until endOffset).map { offset =>
+      val currentTimestamp = time.milliseconds()
+      if (offset == endOffset - 1)
+        lastTimestamp = currentTimestamp
+      new SimpleRecord(currentTimestamp, s"key-$offset".getBytes, s"value-$offset".getBytes)
+    }
+
+    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, records:_*), leaderEpoch = 1)
+    log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
+  }
+
   private def makeLog(dir: File = logDir, config: LogConfig) =
     Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
       time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
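
The extracted appendRecords helper keys offsets off the log itself rather than the loop index, which is what lets the new tests above call it back to back. A sketch of just that offset bookkeeping:

var logEndOffset = 0L // stand-in for log.logEndOffset
def append(numRecords: Int): Range = {
  val startOffset = logEndOffset.toInt // was: i * recordsPerBatch in the old loop
  logEndOffset += numRecords
  startOffset until logEndOffset.toInt
}
assert(append(3) == (0 until 3))
assert(append(3) == (3 until 6)) // consecutive calls stay contiguous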
core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -535,6 +535,7 @@ class LogTest {
     assertEquals(0 until 5, nonActiveBaseOffsetsFrom(0L))
     assertEquals(Seq.empty, nonActiveBaseOffsetsFrom(5L))
     assertEquals(2 until 5, nonActiveBaseOffsetsFrom(2L))
+    assertEquals(Seq.empty, nonActiveBaseOffsetsFrom(6L))
   }
 
   @Test