From c5df2082811af3cd3d7bc20974dddd5780a5836e Mon Sep 17 00:00:00 2001
From: Bob Barrett
Date: Fri, 25 Oct 2019 14:40:45 -0700
Subject: [PATCH] KAFKA-9105; Add back truncateHead method to ProducerStateManager (#7599)

The truncateHead method was removed from ProducerStateManager by
github.com/apache/kafka/commit/c49775b. This meant that snapshots were no
longer removed when the log start offset increased, even though the intent
of that change was to remove snapshots but preserve the in-memory mapping.
This patch adds the required functionality back.

Reviewers: Jason Gustafson
---
 core/src/main/scala/kafka/log/Log.scala      |  1 +
 .../kafka/log/ProducerStateManager.scala     | 21 ++++++++++++--
 .../test/scala/unit/kafka/log/LogTest.scala  | 29 +++++++++++++++++++
 3 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 5e04c3cb55c..ccf1d167632 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1254,6 +1254,7 @@ class Log(@volatile var dir: File,
         info(s"Incrementing log start offset to $newLogStartOffset")
         logStartOffset = newLogStartOffset
         leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
+        producerStateManager.truncateHead(newLogStartOffset)
         maybeIncrementFirstUnstableOffset()
       }
     }
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index ae5b77ab95f..04dafd8a04a 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -598,8 +598,9 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * Truncate the producer id mapping to the given offset range and reload the entries from the most recent
    * snapshot in range (if there is one). We delete snapshot files prior to the logStartOffset but do not remove
    * producer state from the map. This means that in-memory and on-disk state can diverge, and in the case of
-   * broker failover or unclean shutdown, any in-memory state not persisted in the snapshots will be lost.
-   * Note that the log end offset is assumed to be less than or equal to the high watermark.
+   * broker failover or unclean shutdown, any in-memory state not persisted in the snapshots will be lost, which
+   * would lead to UNKNOWN_PRODUCER_ID errors. Note that the log end offset is assumed to be less than or equal
+   * to the high watermark.
    */
   def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long): Unit = {
     // remove all out of range snapshots
@@ -615,6 +616,8 @@ class ProducerStateManager(val topicPartition: TopicPartition,
       // safe to clear the unreplicated transactions
       unreplicatedTxns.clear()
       loadFromSnapshot(logStartOffset, currentTimeMs)
+    } else {
+      truncateHead(logStartOffset)
     }
   }
 
@@ -688,6 +691,20 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    */
   def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFile(file))
 
+  /**
+   * When we remove the head of the log due to retention, we need to remove snapshots older than
+   * the new log start offset.
+   */
+  def truncateHead(logStartOffset: Long): Unit = {
+    removeUnreplicatedTransactions(logStartOffset)
+
+    if (lastMapOffset < logStartOffset)
+      lastMapOffset = logStartOffset
+
+    deleteSnapshotsBefore(logStartOffset)
+    lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset)
+  }
+
   private def removeUnreplicatedTransactions(offset: Long): Unit = {
     val iterator = unreplicatedTxns.entrySet.iterator
     while (iterator.hasNext) {
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index fdf40c64c0f..570e252dc7d 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -1504,6 +1504,35 @@ class LogTest {
     log.appendAsLeader(nextRecords, leaderEpoch = 0)
   }
 
+  @Test
+  def testDeleteSnapshotsOnIncrementLogStartOffset(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val pid2 = 2L
+    val epoch = 0.toShort
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)), producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "b".getBytes)), producerId = pid2,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+
+    assertEquals(2, log.activeProducersWithLastSequence.size)
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(log.producerStateManager.logDir).size)
+
+    log.updateHighWatermark(log.logEndOffset)
+    log.maybeIncrementLogStartOffset(2L)
+
+    // Deleting records should not remove producer state but should delete snapshots
+    assertEquals(2, log.activeProducersWithLastSequence.size)
+    assertEquals(1, ProducerStateManager.listSnapshotFiles(log.producerStateManager.logDir).size)
+    val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
+    assertTrue(retainedLastSeqOpt.isDefined)
+    assertEquals(0, retainedLastSeqOpt.get)
+  }
+
   /**
    * Test for jitter s for time based log roll. This test appends messages then changes the time
    * using the mock clock to force the log to roll and checks the number of segments.
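
Note (not part of the committed diff): a minimal sketch of the behavior this patch restores, using only the APIs exercised by the new test above and assuming a log set up the same way (two producer ids, one snapshot per rolled segment). After the high watermark is advanced and the log start offset is incremented, snapshot files below the new start offset are deleted while the in-memory producer entries remain:

    // Illustrative only; `log` is a Log populated as in testDeleteSnapshotsOnIncrementLogStartOffset.
    val producersBefore = log.activeProducersWithLastSequence.size
    val snapshotsBefore = ProducerStateManager.listSnapshotFiles(log.producerStateManager.logDir).size

    log.updateHighWatermark(log.logEndOffset)
    log.maybeIncrementLogStartOffset(2L) // retention advances the log start offset

    // In-memory producer state is preserved...
    assert(log.activeProducersWithLastSequence.size == producersBefore)
    // ...while snapshots older than the new log start offset are removed by truncateHead.
    assert(ProducerStateManager.listSnapshotFiles(log.producerStateManager.logDir).size < snapshotsBefore)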