MINOR: Add reason to log message when incrementing the log start offset (#8701)

Sometimes logging leaves us guessing at the cause of an increment to the log start offset. Since this results in deletion of user data, we should provide the reason explicitly.
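For illustration (the offset value here is made up; the message wording comes from the change to the Log class below), where the broker previously logged only

    Incrementing log start offset to 5

it now records the cause as well:

    Incremented log start offset to 5 due to client delete records request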

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Jason Gustafson 2020-05-26 13:52:19 -07:00 committed by GitHub
parent c6adcca95f
commit c95b45d04f
9 changed files with 48 additions and 37 deletions


@@ -1128,7 +1128,7 @@ class Partition(val topicPartition: TopicPartition,
     if (convertedOffset < 0)
       throw new OffsetOutOfRangeException(s"The offset $convertedOffset for partition $topicPartition is not valid")
-    leaderLog.maybeIncrementLogStartOffset(convertedOffset)
+    leaderLog.maybeIncrementLogStartOffset(convertedOffset, ClientRecordDeletion)
     LogDeleteRecordsResult(
       requestedOffset = convertedOffset,
       lowWatermark = lowWatermarkIfLeader)


@@ -180,6 +180,17 @@ object RollParams {
   }
 }
+sealed trait LogStartOffsetIncrementReason
+case object ClientRecordDeletion extends LogStartOffsetIncrementReason {
+  override def toString: String = "client delete records request"
+}
+case object LeaderOffsetIncremented extends LogStartOffsetIncrementReason {
+  override def toString: String = "leader offset increment"
+}
+case object SegmentDeletion extends LogStartOffsetIncrementReason {
+  override def toString: String = "segment deletion"
+}
 /**
  * An append-only log for storing messages.
  *
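The reason values are modeled as a sealed trait with one case object per cause, so call sites pass a typed value and the log message picks up the human-readable wording from each object's toString. A minimal sketch of how that interpolation behaves (the offset 42 is hypothetical, not from this commit):

    val reason: LogStartOffsetIncrementReason = SegmentDeletion
    // String interpolation calls reason.toString, which yields "segment deletion"
    val line = s"Incremented log start offset to 42 due to $reason"
    // line == "Incremented log start offset to 42 due to segment deletion"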
@@ -1263,20 +1274,20 @@ class Log(@volatile private var _dir: File,
   /**
    * Increment the log start offset if the provided offset is larger.
    */
-  def maybeIncrementLogStartOffset(newLogStartOffset: Long): Unit = {
-    if (newLogStartOffset > highWatermark)
-      throw new OffsetOutOfRangeException(s"Cannot increment the log start offset to $newLogStartOffset of partition $topicPartition " +
-        s"since it is larger than the high watermark $highWatermark")
+  def maybeIncrementLogStartOffset(newLogStartOffset: Long, reason: LogStartOffsetIncrementReason): Unit = {
     // We don't have to write the log start offset to log-start-offset-checkpoint immediately.
     // The deleteRecordsOffset may be lost only if all in-sync replicas of this broker are shutdown
     // in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
     maybeHandleIOException(s"Exception while increasing log start offset for $topicPartition to $newLogStartOffset in dir ${dir.getParent}") {
       lock synchronized {
+        if (newLogStartOffset > highWatermark)
+          throw new OffsetOutOfRangeException(s"Cannot increment the log start offset to $newLogStartOffset of partition $topicPartition " +
+            s"since it is larger than the high watermark $highWatermark")
         checkIfMemoryMappedBufferClosed()
         if (newLogStartOffset > logStartOffset) {
-          info(s"Incrementing log start offset to $newLogStartOffset")
           updateLogStartOffset(newLogStartOffset)
+          info(s"Incremented log start offset to $newLogStartOffset due to $reason")
           leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
           producerStateManager.truncateHead(newLogStartOffset)
           maybeIncrementFirstUnstableOffset()
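Callers must now state why they are advancing the start offset. A sketch of the delete-records path from a caller's point of view (the offsets and the log instance are hypothetical; note that, per the hunk above, the high-watermark check is now performed inside the log's lock):

    log.updateHighWatermark(10L)
    // Advances logStartOffset to 5 and logs:
    //   Incremented log start offset to 5 due to client delete records request
    log.maybeIncrementLogStartOffset(5L, ClientRecordDeletion)
    // Asking for an offset beyond the high watermark (e.g. 11L) throws OffsetOutOfRangeException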
@@ -1712,7 +1723,7 @@ class Log(@volatile private var _dir: File,
         checkIfMemoryMappedBufferClosed()
         // remove the segments for lookups
         removeAndDeleteSegments(deletable, asyncDelete = true)
-        maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
+        maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset, SegmentDeletion)
       }
     }
     numToDelete


@@ -22,7 +22,7 @@ import java.util.Optional
 import kafka.api.Request
 import kafka.cluster.BrokerEndPoint
-import kafka.log.LogAppendInfo
+import kafka.log.{LeaderOffsetIncremented, LogAppendInfo}
 import kafka.server.AbstractFetcherThread.ReplicaFetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.QuotaFactory.UnboundedQuota
@@ -35,7 +35,7 @@ import org.apache.kafka.common.requests.FetchResponse.PartitionData
 import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
 import scala.jdk.CollectionConverters._
-import scala.collection.{mutable, Map, Seq, Set}
+import scala.collection.{Map, Seq, Set, mutable}
 class ReplicaAlterLogDirsThread(name: String,
                                 sourceBroker: BrokerEndPoint,
@@ -122,7 +122,7 @@ class ReplicaAlterLogDirsThread(name: String,
       None
     futureLog.updateHighWatermark(partitionData.highWatermark)
-    futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset)
+    futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LeaderOffsetIncremented)
     if (partition.maybeReplaceCurrentWithFutureReplica())
       removePartitions(Set(topicPartition))


@@ -21,7 +21,7 @@ import java.util.Optional
 import kafka.api._
 import kafka.cluster.BrokerEndPoint
-import kafka.log.LogAppendInfo
+import kafka.log.{LeaderOffsetIncremented, LogAppendInfo}
 import kafka.server.AbstractFetcherThread.ReplicaFetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import org.apache.kafka.clients.FetchSessionHandler
@@ -35,7 +35,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.{LogContext, Time}
 import scala.jdk.CollectionConverters._
-import scala.collection.{mutable, Map}
+import scala.collection.{Map, mutable}
 class ReplicaFetcherThread(name: String,
                            fetcherId: Int,
@@ -175,7 +175,7 @@ class ReplicaFetcherThread(name: String,
     // For the follower replica, we do not need to keep its segment base offset and physical position.
     // These values will be computed upon becoming leader or handling a preferred read replica fetch.
     val followerHighWatermark = log.updateHighWatermark(partitionData.highWatermark)
-    log.maybeIncrementLogStartOffset(leaderLogStartOffset)
+    log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented)
     if (logTrace)
       trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")


@@ -18,13 +18,13 @@ package kafka.cluster
 import java.util.Properties
-import kafka.log.{Log, LogConfig, LogManager}
+import kafka.log.{ClientRecordDeletion, Log, LogConfig, LogManager}
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import org.apache.kafka.common.utils.Utils
-import org.junit.{After, Before, Test}
 import org.junit.Assert._
+import org.junit.{After, Before, Test}
 class ReplicaTest {
@@ -122,6 +122,6 @@ class ReplicaTest {
     }
     log.updateHighWatermark(25L)
-    log.maybeIncrementLogStartOffset(26L)
+    log.maybeIncrementLogStartOffset(26L, ClientRecordDeletion)
   }
 }


@@ -209,7 +209,7 @@ class LogCleanerManagerTest extends Logging {
     val tp = new TopicPartition("foo", 0)
     val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
-    logs.get(tp).maybeIncrementLogStartOffset(10L)
+    logs.get(tp).maybeIncrementLogStartOffset(10L, ClientRecordDeletion)
     val cleanerManager = createCleanerManagerMock(logs)
     cleanerCheckpoints.put(tp, 0L)
@@ -232,7 +232,7 @@ class LogCleanerManagerTest extends Logging {
     assertEquals(1, log.logSegments.size)
-    log.maybeIncrementLogStartOffset(2L)
+    log.maybeIncrementLogStartOffset(2L, ClientRecordDeletion)
     val cleanerManager = createCleanerManagerMock(logs)
     cleanerCheckpoints.put(tp, 0L)
@@ -493,13 +493,13 @@ class LogCleanerManagerTest extends Logging {
   def testCleanableOffsetsNeedsCheckpointReset(): Unit = {
     val tp = new TopicPartition("foo", 0)
     val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
-    logs.get(tp).maybeIncrementLogStartOffset(10L)
+    logs.get(tp).maybeIncrementLogStartOffset(10L, ClientRecordDeletion)
     var lastCleanOffset = Some(15L)
     var cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
     assertFalse("Checkpoint offset should not be reset if valid", cleanableOffsets.forceUpdateCheckpoint)
-    logs.get(tp).maybeIncrementLogStartOffset(20L)
+    logs.get(tp).maybeIncrementLogStartOffset(20L, ClientRecordDeletion)
     cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
     assertTrue("Checkpoint offset needs to be reset if less than log start offset", cleanableOffsets.forceUpdateCheckpoint)
@@ -608,7 +608,7 @@ class LogCleanerManagerTest extends Logging {
     val tp = new TopicPartition("foo", 0)
     val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
-    logs.get(tp).maybeIncrementLogStartOffset(20L)
+    logs.get(tp).maybeIncrementLogStartOffset(20L, ClientRecordDeletion)
     val cleanerManager = createCleanerManagerMock(logs)
     cleanerCheckpoints.put(tp, 15L)
@@ -629,7 +629,7 @@ class LogCleanerManagerTest extends Logging {
     // create two logs, one with an invalid offset, and one that is dirtier than the log with an invalid offset
     val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement = 5)
-    logs.get(tp0).maybeIncrementLogStartOffset(15L)
+    logs.get(tp0).maybeIncrementLogStartOffset(15L, ClientRecordDeletion)
     val cleanerManager = createCleanerManagerMock(logs)
     cleanerCheckpoints.put(tp0, 10L)
     cleanerCheckpoints.put(tp1, 5L)


@@ -35,8 +35,8 @@ import org.junit.Assert._
 import org.junit.{After, Test}
 import org.scalatest.Assertions.{assertThrows, fail, intercept}
-import scala.jdk.CollectionConverters._
 import scala.collection._
+import scala.jdk.CollectionConverters._
 /**
  * Unit tests for the log cleaning logic
@@ -128,7 +128,7 @@ class LogCleanerTest {
       override def run(): Unit = {
         deleteStartLatch.await(5000, TimeUnit.MILLISECONDS)
         log.updateHighWatermark(log.activeSegment.baseOffset)
-        log.maybeIncrementLogStartOffset(log.activeSegment.baseOffset)
+        log.maybeIncrementLogStartOffset(log.activeSegment.baseOffset, LeaderOffsetIncremented)
         log.updateHighWatermark(log.activeSegment.baseOffset)
         log.deleteOldSegments()
         deleteCompleteLatch.countDown()


@@ -613,7 +613,7 @@ class LogTest {
     // Increment the log start offset
     val startOffset = 4
     log.updateHighWatermark(log.logEndOffset)
-    log.maybeIncrementLogStartOffset(startOffset)
+    log.maybeIncrementLogStartOffset(startOffset, ClientRecordDeletion)
     assertTrue(log.logEndOffset > log.logStartOffset)
     // Append garbage to a segment below the current log start offset
@@ -1209,7 +1209,7 @@ class LogTest {
     assertEquals(2, log.activeProducersWithLastSequence.size)
     log.updateHighWatermark(log.logEndOffset)
-    log.maybeIncrementLogStartOffset(1L)
+    log.maybeIncrementLogStartOffset(1L, ClientRecordDeletion)
     // Deleting records should not remove producer state
     assertEquals(2, log.activeProducersWithLastSequence.size)
@@ -1244,7 +1244,7 @@ class LogTest {
     assertEquals(2, log.activeProducersWithLastSequence.size)
     log.updateHighWatermark(log.logEndOffset)
-    log.maybeIncrementLogStartOffset(1L)
+    log.maybeIncrementLogStartOffset(1L, ClientRecordDeletion)
     log.deleteOldSegments()
     // Deleting records should not remove producer state
@@ -1664,7 +1664,7 @@ class LogTest {
     assertEquals(2, ProducerStateManager.listSnapshotFiles(log.producerStateManager.logDir).size)
     log.updateHighWatermark(log.logEndOffset)
-    log.maybeIncrementLogStartOffset(2L)
+    log.maybeIncrementLogStartOffset(2L, ClientRecordDeletion)
     // Deleting records should not remove producer state but should delete snapshots
     assertEquals(2, log.activeProducersWithLastSequence.size)
@@ -3310,17 +3310,17 @@ class LogTest {
     assertEquals(log.logStartOffset, 0)
     log.updateHighWatermark(log.logEndOffset)
-    log.maybeIncrementLogStartOffset(1)
+    log.maybeIncrementLogStartOffset(1, ClientRecordDeletion)
     log.deleteOldSegments()
     assertEquals("should have 3 segments", 3, log.numberOfSegments)
     assertEquals(log.logStartOffset, 1)
-    log.maybeIncrementLogStartOffset(6)
+    log.maybeIncrementLogStartOffset(6, ClientRecordDeletion)
     log.deleteOldSegments()
     assertEquals("should have 2 segments", 2, log.numberOfSegments)
     assertEquals(log.logStartOffset, 6)
-    log.maybeIncrementLogStartOffset(15)
+    log.maybeIncrementLogStartOffset(15, ClientRecordDeletion)
     log.deleteOldSegments()
     assertEquals("should have 1 segments", 1, log.numberOfSegments)
     assertEquals(log.logStartOffset, 15)
@@ -3438,7 +3438,7 @@ class LogTest {
     // Three segments should be created
     assertEquals(3, log.logSegments.count(_ => true))
     log.updateHighWatermark(log.logEndOffset)
-    log.maybeIncrementLogStartOffset(recordsPerSegment)
+    log.maybeIncrementLogStartOffset(recordsPerSegment, ClientRecordDeletion)
     // The first segment, which is entirely before the log start offset, should be deleted
     // Of the remaining the segments, the first can overlap the log start offset and the rest must have a base offset
@@ -4128,7 +4128,7 @@ class LogTest {
     assertEquals(Some(0L), log.firstUnstableOffset)
     log.updateHighWatermark(log.logEndOffset)
-    log.maybeIncrementLogStartOffset(5L)
+    log.maybeIncrementLogStartOffset(5L, ClientRecordDeletion)
     // the first unstable offset should be lower bounded by the log start offset
     assertEquals(Some(5L), log.firstUnstableOffset)
@@ -4153,7 +4153,7 @@ class LogTest {
     assertEquals(Some(0L), log.firstUnstableOffset)
     log.updateHighWatermark(log.logEndOffset)
-    log.maybeIncrementLogStartOffset(8L)
+    log.maybeIncrementLogStartOffset(8L, ClientRecordDeletion)
     log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals(1, log.logSegments.size)


@@ -21,7 +21,7 @@ import java.io.File
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Optional, Properties, Random}
-import kafka.log.{Log, LogSegment}
+import kafka.log.{ClientRecordDeletion, Log, LogSegment}
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
@@ -78,7 +78,7 @@ class LogOffsetTest extends BaseRequestTest {
     log.flush()
     log.updateHighWatermark(log.logEndOffset)
-    log.maybeIncrementLogStartOffset(3)
+    log.maybeIncrementLogStartOffset(3, ClientRecordDeletion)
     log.deleteOldSegments()
     val offsets = log.legacyFetchOffsetsBefore(ListOffsetRequest.LATEST_TIMESTAMP, 15)