mirror of https://github.com/apache/kafka.git
KAFKA-10706; Ensure leader epoch cache is cleaned after truncation to end offset (#9633)
This patch fixes a liveness bug which prevents follower truncation from completing after a leader election. If consecutive leader elections occur without any data entries being written, the leader and follower may end up with conflicting epoch entries at the end of the log. The problem is the shortcut return in `Log.truncateTo` when the truncation offset is larger than or equal to the end offset, which prevents the conflicting entries from being resolved. Here we change this case to ensure `LeaderEpochFileCache.truncateFromEnd` is still called.

Reviewers: Jun Rao <junrao@gmail.com>
This commit is contained in:
parent
012cea2f1b
commit
cb60abc1df
|
@ -2072,6 +2072,15 @@ class Log(@volatile var dir: File,
|
|||
throw new IllegalArgumentException(s"Cannot truncate partition $topicPartition to a negative offset (%d).".format(targetOffset))
|
||||
if (targetOffset >= logEndOffset) {
|
||||
info(s"Truncating to $targetOffset has no effect as the largest offset in the log is ${logEndOffset - 1}")
|
||||
|
||||
// Always truncate epoch cache since we may have a conflicting epoch entry at the
|
||||
// end of the log from the leader. This could happen if this broker was a leader
|
||||
// and inserted the first start offset entry, but then failed to append any entries
|
||||
// before another leader was elected.
|
||||
lock synchronized {
|
||||
leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset))
|
||||
}
|
||||
|
||||
false
|
||||
} else {
|
||||
info(s"Truncating to offset $targetOffset")
|
||||
|
|
|
@ -469,6 +469,37 @@ class LogTest {
|
|||
assertEquals(101L, log.logEndOffset)
|
||||
}
|
||||
|
||||
/**
 * Truncating to an offset at or past the log end offset must still remove an epoch
 * entry that was assigned at the log end offset, so that the epoch of the last
 * appended records is once again the latest epoch in the cache.
 */
@Test
def testTruncateToEndOffsetClearsEpochCache(): Unit = {
  val seededEpoch = 19
  val expectedEndOffset = 29L
  val log = createLog(logDir, LogConfig())

  // Latest epoch currently known to the log's epoch cache (None if there is no cache or entry)
  def latestCachedEpoch: Option[Int] = log.leaderEpochCache.flatMap(_.latestEpoch)

  // Start from a log that already holds two records appended under the seeded epoch
  val seedRecords = TestUtils.records(
    List(new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)),
    baseOffset = 27)
  appendAsFollower(log, seedRecords, leaderEpoch = seededEpoch)
  assertEquals(Some(seededEpoch), latestCachedEpoch)
  assertEquals(expectedEndOffset, log.logEndOffset)

  def assertTrailingEpochRemoved(epoch: Int, truncationOffset: Long): Unit = {
    // Acting as leader: register a new epoch starting at the current end of the log,
    // without appending any records under it
    log.maybeAssignEpochStartOffset(leaderEpoch = epoch, startOffset = log.logEndOffset)
    assertEquals(Some(epoch), latestCachedEpoch)
    assertEquals(expectedEndOffset, log.logEndOffset)

    // Acting as follower again: the truncation target is not below the log end offset,
    // so no records are removed, but the empty epoch entry registered above must
    // no longer be the latest one
    log.truncateTo(truncationOffset)
    assertEquals(Some(seededEpoch), latestCachedEpoch)
    assertEquals(expectedEndOffset, log.logEndOffset)
  }

  // Cover both a target equal to the log end offset and a target beyond it
  assertTrailingEpochRemoved(epoch = 20, truncationOffset = log.logEndOffset)
  assertTrailingEpochRemoved(epoch = 24, truncationOffset = log.logEndOffset + 1)
}
|
||||
|
||||
/**
|
||||
* Test the values returned by the logSegments call
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue