KAFKA-9105; Add back truncateHead method to ProducerStateManager (#7599)

The truncateHead method was removed from ProducerStateManager by github.com/apache/kafka/commit/c49775b. This meant that snapshots were no longer removed when the log start offset increased, even though the intent of that change was to remove snapshots but preserve the in-memory mapping. This patch adds the required functionality back.

Reviewers: Jason Gustafson <jason@confluent.io>
Author: Bob Barrett, 2019-10-25 14:40:45 -07:00 (committed by Jason Gustafson)
Parent: 27ba8f5a39
Commit: c5df208281
3 changed files with 49 additions and 2 deletions
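
In practice, the regression meant stale .snapshot files accumulated on disk as retention advanced the log start offset. The restored call path, sketched from the hunks and test below (the literal offset is illustrative):

    // Sketch: advancing the log start offset now also trims producer snapshots.
    log.updateHighWatermark(log.logEndOffset)
    log.maybeIncrementLogStartOffset(2L)
    // -> logStartOffset becomes 2
    // -> leaderEpochCache.truncateFromStart(2)
    // -> producerStateManager.truncateHead(2): snapshot files below offset 2
    //    are deleted, while the in-memory producer map is preserved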

core/src/main/scala/kafka/log/Log.scala

@@ -1254,6 +1254,7 @@ class Log(@volatile var dir: File,
         info(s"Incrementing log start offset to $newLogStartOffset")
         logStartOffset = newLogStartOffset
         leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
+        producerStateManager.truncateHead(newLogStartOffset)
         maybeIncrementFirstUnstableOffset()
       }
     }

core/src/main/scala/kafka/log/ProducerStateManager.scala

@@ -598,8 +598,9 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * Truncate the producer id mapping to the given offset range and reload the entries from the most recent
    * snapshot in range (if there is one). We delete snapshot files prior to the logStartOffset but do not remove
    * producer state from the map. This means that in-memory and on-disk state can diverge, and in the case of
-   * broker failover or unclean shutdown, any in-memory state not persisted in the snapshots will be lost.
-   * Note that the log end offset is assumed to be less than or equal to the high watermark.
+   * broker failover or unclean shutdown, any in-memory state not persisted in the snapshots will be lost, which
+   * would lead to UNKNOWN_PRODUCER_ID errors. Note that the log end offset is assumed to be less than or equal
+   * to the high watermark.
    */
   def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long): Unit = {
     // remove all out of range snapshots
@@ -615,6 +616,8 @@ class ProducerStateManager(val topicPartition: TopicPartition,
       // safe to clear the unreplicated transactions
       unreplicatedTxns.clear()
       loadFromSnapshot(logStartOffset, currentTimeMs)
+    } else {
+      truncateHead(logStartOffset)
     }
   }
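
For orientation, the restored else branch sits at the tail of truncateAndReload. A condensed sketch of the method after this patch (the snapshot-deletion step is elided, and fields such as mapEndOffset are assumed from the surrounding class):

    def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long): Unit = {
      // drop snapshots outside [logStartOffset, logEndOffset] (elided)
      if (logEndOffset != mapEndOffset) {
        // the in-memory map no longer matches the log: clear it and rebuild
        // it from the most recent in-range snapshot
        loadFromSnapshot(logStartOffset, currentTimeMs)
      } else {
        // the map is still valid, so only trim the head: delete old snapshot
        // files without touching producer state
        truncateHead(logStartOffset)
      }
    }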
@@ -688,6 +691,20 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    */
   def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFile(file))
 
+  /**
+   * When we remove the head of the log due to retention, we need to remove snapshots older than
+   * the new log start offset.
+   */
+  def truncateHead(logStartOffset: Long): Unit = {
+    removeUnreplicatedTransactions(logStartOffset)
+
+    if (lastMapOffset < logStartOffset)
+      lastMapOffset = logStartOffset
+
+    deleteSnapshotsBefore(logStartOffset)
+    lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset)
+  }
+
   private def removeUnreplicatedTransactions(offset: Long): Unit = {
     val iterator = unreplicatedTxns.entrySet.iterator
     while (iterator.hasNext) {
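
To make the new method's semantics concrete, a hypothetical standalone use (the constructor arguments and the snapshot setup are illustrative; truncateHead and oldestSnapshotOffset appear in this patch, while activeProducers is assumed from the surrounding class):

    import java.io.File
    import org.apache.kafka.common.TopicPartition

    // Hypothetical: snapshots exist at offsets 1 and 2 for two producers.
    val stateManager = new ProducerStateManager(new TopicPartition("my-topic", 0), new File("/tmp/my-topic-0"))
    // ... append entries for two producer ids and take snapshots at offsets 1 and 2 ...
    stateManager.truncateHead(2L)
    assert(stateManager.oldestSnapshotOffset.contains(2L)) // the offset-1 snapshot was deleted
    assert(stateManager.activeProducers.size == 2)         // in-memory state is untouched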

core/src/test/scala/unit/kafka/log/LogTest.scala

@@ -1504,6 +1504,35 @@ class LogTest {
     log.appendAsLeader(nextRecords, leaderEpoch = 0)
   }
 
+  @Test
+  def testDeleteSnapshotsOnIncrementLogStartOffset(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val pid2 = 2L
+    val epoch = 0.toShort
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)), producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "b".getBytes)), producerId = pid2,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+
+    assertEquals(2, log.activeProducersWithLastSequence.size)
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(log.producerStateManager.logDir).size)
+
+    log.updateHighWatermark(log.logEndOffset)
+    log.maybeIncrementLogStartOffset(2L)
+
+    // Deleting records should not remove producer state but should delete snapshots
+    assertEquals(2, log.activeProducersWithLastSequence.size)
+    assertEquals(1, ProducerStateManager.listSnapshotFiles(log.producerStateManager.logDir).size)
+    val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
+    assertTrue(retainedLastSeqOpt.isDefined)
+    assertEquals(0, retainedLastSeqOpt.get)
+  }
+
   /**
    * Test for jitter s for time based log roll. This test appends messages then changes the time
    * using the mock clock to force the log to roll and checks the number of segments.
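
For reference, the files counted by listSnapshotFiles in the test above are named by offset, like log segments. A sketch of the naming scheme (the zero-padded format matches Kafka's convention as far as we know; the helper function itself is illustrative):

    import java.io.File

    // Illustrative: producer-state snapshot file for a given offset,
    // e.g. snapshotFile(dir, 2) -> <dir>/00000000000000000002.snapshot
    def snapshotFile(logDir: File, offset: Long): File =
      new File(logDir, f"$offset%020d.snapshot")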