KAFKA-12205; Delete snapshots less than the snapshot at the log start (#10021)

This patch adds logic to delete old snapshots. There are three cases we handle; all three funnel into the same two-phase delete flow, sketched below after the change summary:

1. Remove old snapshots after a follower completes fetching a snapshot and truncates the log to the latest snapshot.
2. Remove old snapshots after a new snapshot is created.
3. Remove old snapshots during recovery after the node is restarted.

Reviewers: Cao Manh Dat <caomanhdat317@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
dengziming authored 2021-03-12 02:10:27 +08:00 (committed by GitHub)
parent 800d9b5abc
commit 0e5591beda
9 changed files with 362 additions and 74 deletions
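
All three cases end in the two-phase delete that this patch adds to KafkaMetadataLog.removeSnapshotFilesBefore: the snapshot id is dropped from the in-memory set first (so no reader can expect a file that is about to disappear), the file is renamed to a *.deleted name, and the physical delete is scheduled after fileDeleteDelayMs. The following is a minimal, self-contained Scala sketch of that flow under stated assumptions: SnapshotId, snapshotPath, the filename widths and the bare ScheduledExecutorService are illustrative stand-ins for Kafka's OffsetAndEpoch, Snapshots and Scheduler, not the real API.

import java.nio.file.{AtomicMoveNotSupportedException, Files, Path, StandardCopyOption}
import java.util.concurrent.{ConcurrentSkipListSet, Executors, TimeUnit}

// Illustrative stand-in for OffsetAndEpoch; ordered by offset, then epoch.
final case class SnapshotId(offset: Long, epoch: Int) extends Comparable[SnapshotId] {
  override def compareTo(that: SnapshotId): Int =
    if (offset != that.offset) java.lang.Long.compare(offset, that.offset)
    else Integer.compare(epoch, that.epoch)
}

object SnapshotCleanupSketch {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Illustrative filename scheme; the real format lives in Snapshots.snapshotPath.
  private def snapshotPath(logDir: Path, id: SnapshotId): Path =
    logDir.resolve(f"${id.offset}%020d-${id.epoch}%010d.checkpoint")

  // Remove every snapshot strictly older than logStartSnapshotId.
  def removeSnapshotsBefore(
    logDir: Path,
    snapshotIds: ConcurrentSkipListSet[SnapshotId],
    logStartSnapshotId: SnapshotId,
    fileDeleteDelayMs: Long
  ): Unit = {
    val expired = snapshotIds.headSet(logStartSnapshotId, false).iterator()
    while (expired.hasNext) {
      val id = expired.next()
      // 1. Drop the id first: once it is gone from the set, no reader will expect the file.
      expired.remove()
      val source = snapshotPath(logDir, id)
      val renamed = source.resolveSibling(source.getFileName.toString + ".deleted")
      // 2. Rename to *.deleted so a crash between rename and delete is cleaned up on restart.
      if (Files.exists(source)) {
        try Files.move(source, renamed, StandardCopyOption.ATOMIC_MOVE)
        catch {
          case _: AtomicMoveNotSupportedException =>
            Files.move(source, renamed, StandardCopyOption.REPLACE_EXISTING)
        }
      }
      // 3. Physically delete both names after the configured delay.
      scheduler.schedule(new Runnable {
        override def run(): Unit = {
          Files.deleteIfExists(source)
          Files.deleteIfExists(renamed)
        }
      }, fileDeleteDelayMs, TimeUnit.MILLISECONDS)
    }
  }
}

Deleting after fileDeleteDelayMs mirrors how Kafka removes log segments: the rename makes the snapshot invisible to recovery immediately, while the delay gives a reader that already opened the file time to finish.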


@ -16,7 +16,7 @@
*/
package kafka.raft
import java.io.File
import java.io.{File, IOException}
import java.nio.file.{Files, NoSuchFileException}
import java.util.concurrent.ConcurrentSkipListSet
import java.util.{NoSuchElementException, Optional, Properties}
@ -24,9 +24,9 @@ import java.util.{NoSuchElementException, Optional, Properties}
import kafka.api.ApiVersion
import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel}
import kafka.utils.Scheduler
import kafka.utils.{Logging, Scheduler}
import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog}
import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
@ -35,21 +35,14 @@ import scala.compat.java8.OptionConverters._
final class KafkaMetadataLog private (
log: Log,
scheduler: Scheduler,
// This object needs to be thread-safe because it is used by the snapshotting thread to notify the
// polling thread when snapshots are created.
snapshotIds: ConcurrentSkipListSet[OffsetAndEpoch],
topicPartition: TopicPartition,
maxFetchSizeInBytes: Int
) extends ReplicatedLog {
/* The oldest snapshot id is the snapshot at the log start offset. Since the KafkaMetadataLog doesn't
* currently delete snapshots, it is possible for the oldest snapshot id to not be the smallest
* snapshot id in the snapshotIds set.
*/
private[this] var oldestSnapshotId = snapshotIds
.stream()
.filter(_.offset == startOffset)
.findAny()
maxFetchSizeInBytes: Int,
val fileDeleteDelayMs: Long // Visible for testing
) extends ReplicatedLog with Logging {
override def read(startOffset: Long, readIsolation: Isolation): LogFetchInfo = {
val isolation = readIsolation match {
@ -109,7 +102,7 @@ final class KafkaMetadataLog private (
override def lastFetchedEpoch: Int = {
log.latestEpoch.getOrElse {
latestSnapshotId.map[Int] { snapshotId =>
latestSnapshotId().map[Int] { snapshotId =>
val logEndOffset = endOffset().offset
if (snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) {
// Return the epoch of the snapshot when the log is empty
@ -126,12 +119,12 @@ final class KafkaMetadataLog private (
}
override def endOffsetForEpoch(epoch: Int): OffsetAndEpoch = {
(log.endOffsetForEpoch(epoch), oldestSnapshotId.asScala) match {
(log.endOffsetForEpoch(epoch), earliestSnapshotId().asScala) match {
case (Some(offsetAndEpoch), Some(snapshotId)) if (
offsetAndEpoch.offset == snapshotId.offset &&
offsetAndEpoch.leaderEpoch == epoch) =>
// The epoch is smaller thant the smallest epoch on the log. Overide the diverging
// The epoch is smaller than the smallest epoch on the log. Override the diverging
// epoch to the oldest snapshot which should be the snapshot at the log start offset
new OffsetAndEpoch(snapshotId.offset, snapshotId.epoch)
@ -164,13 +157,14 @@ final class KafkaMetadataLog private (
override def truncateToLatestSnapshot(): Boolean = {
val latestEpoch = log.latestEpoch.getOrElse(0)
latestSnapshotId.asScala match {
latestSnapshotId().asScala match {
case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
(snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) =>
// Truncate the log fully if the latest snapshot is greater than the log end offset
log.truncateFullyAndStartAt(snapshotId.offset)
oldestSnapshotId = latestSnapshotId
// Delete snapshot after truncating
removeSnapshotFilesBefore(snapshotId)
true
@ -228,7 +222,7 @@ final class KafkaMetadataLog private (
if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
// Since snapshots are less than the high-watermark absolute offset comparison is okay.
throw new IllegalArgumentException(
s"Attemting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)"
s"Attempting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)"
)
}
}
@ -258,8 +252,13 @@ final class KafkaMetadataLog private (
}
}
override def oldestSnapshotId(): Optional[OffsetAndEpoch] = {
oldestSnapshotId
override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
try {
Optional.of(snapshotIds.first)
} catch {
case _: NoSuchElementException =>
Optional.empty()
}
}
override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
@ -267,14 +266,15 @@ final class KafkaMetadataLog private (
}
override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
latestSnapshotId.asScala match {
latestSnapshotId().asScala match {
case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
startOffset < logStartSnapshotId.offset &&
logStartSnapshotId.offset <= snapshotId.offset &&
log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
log.deleteOldSegments()
oldestSnapshotId = Optional.of(logStartSnapshotId)
// Delete snapshot after increasing LogStartOffset
removeSnapshotFilesBefore(logStartSnapshotId)
true
@ -282,6 +282,32 @@ final class KafkaMetadataLog private (
}
}
/**
* Removes all snapshots from the log directory whose epoch and end offset are less than the given epoch and end offset.
*/
private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Unit = {
val expiredSnapshotIdsIter = snapshotIds.headSet(logStartSnapshotId, false).iterator()
while (expiredSnapshotIdsIter.hasNext) {
val snapshotId = expiredSnapshotIdsIter.next()
// If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists
// on the file system, so we should first remove the snapshot id and then delete the snapshot file.
expiredSnapshotIdsIter.remove()
val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
val destination = Snapshots.deleteRename(path, snapshotId)
try {
Utils.atomicMoveWithFallback(path, destination)
} catch {
case e: IOException =>
error(s"Error renaming snapshot file: $path to $destination", e)
}
scheduler.schedule(
"delete-snapshot-file",
() => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId),
fileDeleteDelayMs)
}
}
override def close(): Unit = {
log.close()
}
@ -320,9 +346,11 @@ object KafkaMetadataLog {
val metadataLog = new KafkaMetadataLog(
log,
scheduler,
recoverSnapshots(log),
topicPartition,
maxFetchSizeInBytes
maxFetchSizeInBytes,
defaultLogConfig.fileDeleteDelayMs
)
// When recovering, truncate fully if the latest snapshot is after the log end offset. This can happen to a follower
@ -336,7 +364,8 @@ object KafkaMetadataLog {
log: Log
): ConcurrentSkipListSet[OffsetAndEpoch] = {
val snapshotIds = new ConcurrentSkipListSet[OffsetAndEpoch]()
// Scan the log directory; deleting partial snapshots and remembering immutable snapshots
// Scan the log directory; delete partial snapshots, deleted snapshots and snapshots older than the logStartOffset,
// remembering only the immutable snapshots starting from the logStartOffset
Files
.walk(log.dir.toPath, 1)
.map[Optional[SnapshotPath]] { path =>
@ -348,7 +377,10 @@ object KafkaMetadataLog {
}
.forEach { path =>
path.ifPresent { snapshotPath =>
if (snapshotPath.partial) {
if (snapshotPath.partial ||
snapshotPath.deleted ||
snapshotPath.snapshotId.offset < log.logStartOffset) {
// Delete partial, deleted and older snapshots
Files.deleteIfExists(snapshotPath.path)
} else {
snapshotIds.add(snapshotPath.snapshotId)


@ -32,13 +32,13 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.raft.internals.BatchBuilder
import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, OffsetAndEpoch, RecordSerde, ReplicatedLog}
import org.apache.kafka.snapshot.{SnapshotPath, Snapshots}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
final class KafkaMetadataLogTest {
import KafkaMetadataLogTest._
var tempDir: File = null
var tempDir: File = _
val mockTime = new MockTime()
@BeforeEach
@ -142,6 +142,39 @@ final class KafkaMetadataLogTest {
assertEquals(offset, log.highWatermark.offset)
}
@Test
def testUpdateLogStartOffsetWillRemoveOlderSnapshot(): Unit = {
val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime)
val offset = 10
val epoch = 0
append(log, offset, epoch)
val oldSnapshotId = new OffsetAndEpoch(offset, epoch)
TestUtils.resource(log.createSnapshot(oldSnapshotId)) { snapshot =>
snapshot.freeze()
}
append(log, offset, epoch, log.endOffset.offset)
val newSnapshotId = new OffsetAndEpoch(offset * 2, epoch)
TestUtils.resource(log.createSnapshot(newSnapshotId)) { snapshot =>
snapshot.freeze()
}
log.updateHighWatermark(new LogOffsetMetadata(offset * 2))
assertTrue(log.deleteBeforeSnapshot(newSnapshotId))
log.close()
mockTime.sleep(log.fileDeleteDelayMs)
// Assert that the log dir doesn't contain any older snapshots
Files
.walk(logDir, 1)
.map[Optional[SnapshotPath]](Snapshots.parse)
.filter(_.isPresent)
.forEach { path =>
assertFalse(path.get.snapshotId.offset < log.startOffset)
}
}
@Test
def testUpdateLogStartOffsetWithMissingSnapshot(): Unit = {
val log = buildMetadataLog(tempDir, mockTime)
@ -212,6 +245,53 @@ final class KafkaMetadataLogTest {
assertEquals(greaterEpochSnapshotId.offset, log.highWatermark.offset)
}
@Test
def testTruncateWillRemoveOlderSnapshot(): Unit = {
val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime)
val numberOfRecords = 10
val epoch = 1
append(log, 1, epoch - 1)
val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1)
TestUtils.resource(log.createSnapshot(oldSnapshotId1)) { snapshot =>
snapshot.freeze()
}
append(log, 1, epoch, log.endOffset.offset)
val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
TestUtils.resource(log.createSnapshot(oldSnapshotId2)) { snapshot =>
snapshot.freeze()
}
append(log, numberOfRecords - 2, epoch, log.endOffset.offset)
val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
TestUtils.resource(log.createSnapshot(oldSnapshotId3)) { snapshot =>
snapshot.freeze()
}
val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)
append(log, numberOfRecords, epoch, log.endOffset.offset)
TestUtils.resource(log.createSnapshot(greaterSnapshotId)) { snapshot =>
snapshot.freeze()
}
assertNotEquals(log.earliestSnapshotId(), log.latestSnapshotId())
assertTrue(log.truncateToLatestSnapshot())
assertEquals(log.earliestSnapshotId(), log.latestSnapshotId())
log.close()
mockTime.sleep(log.fileDeleteDelayMs)
// Assert that the log dir doesn't contain any older snapshots
Files
.walk(logDir, 1)
.map[Optional[SnapshotPath]](Snapshots.parse)
.filter(_.isPresent)
.forEach { path =>
assertFalse(path.get.snapshotId.offset < log.startOffset)
}
}
@Test
def testDoesntTruncateFully(): Unit = {
val log = buildMetadataLog(tempDir, mockTime)
@ -238,7 +318,7 @@ final class KafkaMetadataLogTest {
}
@Test
def testCleanupSnapshots(): Unit = {
def testCleanupPartialSnapshots(): Unit = {
val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime)
val numberOfRecords = 10
val epoch = 1
@ -258,7 +338,7 @@ final class KafkaMetadataLogTest {
val secondLog = buildMetadataLog(tempDir, mockTime)
assertEquals(snapshotId, secondLog.latestSnapshotId.get)
assertEquals(snapshotId, secondLog.latestSnapshotId().get)
assertEquals(0, log.startOffset)
assertEquals(epoch, log.lastFetchedEpoch)
assertEquals(numberOfRecords, log.endOffset().offset)
@ -274,6 +354,55 @@ final class KafkaMetadataLogTest {
}
}
@Test
def testCleanupOlderSnapshots(): Unit = {
val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime)
val numberOfRecords = 10
val epoch = 1
append(log, 1, epoch - 1)
val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1)
TestUtils.resource(log.createSnapshot(oldSnapshotId1)) { snapshot =>
snapshot.freeze()
}
append(log, 1, epoch, log.endOffset.offset)
val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
TestUtils.resource(log.createSnapshot(oldSnapshotId2)) { snapshot =>
snapshot.freeze()
}
append(log, numberOfRecords - 2, epoch, log.endOffset.offset)
val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
TestUtils.resource(log.createSnapshot(oldSnapshotId3)) { snapshot =>
snapshot.freeze()
}
val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)
append(log, numberOfRecords, epoch, log.endOffset.offset)
TestUtils.resource(log.createSnapshot(greaterSnapshotId)) { snapshot =>
snapshot.freeze()
}
log.close()
val secondLog = buildMetadataLog(tempDir, mockTime)
assertEquals(greaterSnapshotId, secondLog.latestSnapshotId().get)
assertEquals(3 * numberOfRecords, secondLog.startOffset)
assertEquals(epoch, secondLog.lastFetchedEpoch)
mockTime.sleep(log.fileDeleteDelayMs)
// Assert that the log dir doesn't contain any older snapshots
Files
.walk(logDir, 1)
.map[Optional[SnapshotPath]](Snapshots.parse)
.filter(_.isPresent)
.forEach { path =>
assertFalse(path.get.snapshotId.offset < log.startOffset)
}
}
@Test
def testCreateReplicatedLogTruncatesFully(): Unit = {
val log = buildMetadataLog(tempDir, mockTime)
@ -290,7 +419,7 @@ final class KafkaMetadataLogTest {
val secondLog = buildMetadataLog(tempDir, mockTime)
assertEquals(snapshotId, secondLog.latestSnapshotId.get)
assertEquals(snapshotId, secondLog.latestSnapshotId().get)
assertEquals(snapshotId.offset, secondLog.startOffset)
assertEquals(snapshotId.epoch, secondLog.lastFetchedEpoch)
assertEquals(snapshotId.offset, secondLog.endOffset().offset)
@ -406,7 +535,7 @@ object KafkaMetadataLogTest {
private def createLogDirectory(logDir: File, logDirName: String): File = {
val logDirPath = logDir.getAbsolutePath
val dir = new File(logDirPath, logDirName)
if (!Files.exists((dir.toPath))) {
if (!Files.exists(dir.toPath)) {
Files.createDirectories(dir.toPath)
}
dir


@ -2155,7 +2155,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
private void maybeUpdateOldestSnapshotId() {
log.latestSnapshotId().ifPresent(snapshotId -> log.deleteBeforeSnapshot(snapshotId));
log.latestSnapshotId().ifPresent(log::deleteBeforeSnapshot);
}
private void wakeup() {


@ -85,10 +85,10 @@ public interface ReplicatedLog extends Closeable {
if (startOffset() == 0 && offset == 0) {
return ValidOffsetAndEpoch.valid(new OffsetAndEpoch(0, 0));
} else if (
oldestSnapshotId().isPresent() &&
earliestSnapshotId().isPresent() &&
((offset < startOffset()) ||
(offset == startOffset() && epoch != oldestSnapshotId().get().epoch) ||
(epoch < oldestSnapshotId().get().epoch))
(offset == startOffset() && epoch != earliestSnapshotId().get().epoch) ||
(epoch < earliestSnapshotId().get().epoch))
) {
/* Send a snapshot if the leader has a snapshot at the log start offset and
* 1. the fetch offset is less than the log start offset or
@ -96,15 +96,12 @@ public interface ReplicatedLog extends Closeable {
* the oldest snapshot or
* 3. last fetch epoch is less than the oldest snapshot's epoch
*/
OffsetAndEpoch latestSnapshotId = latestSnapshotId().orElseThrow(() -> {
return new IllegalStateException(
String.format(
"Log start offset (%s) is greater than zero but latest snapshot was not found",
startOffset()
)
);
});
OffsetAndEpoch latestSnapshotId = latestSnapshotId().orElseThrow(() -> new IllegalStateException(
String.format(
"Log start offset (%s) is greater than zero but latest snapshot was not found",
startOffset()
)
));
return ValidOffsetAndEpoch.snapshot(latestSnapshotId);
} else {
@ -263,7 +260,7 @@ public interface ReplicatedLog extends Closeable {
* @return an Optional snapshot id at the log start offset if nonzero, otherwise returns an empty
* Optional
*/
Optional<OffsetAndEpoch> oldestSnapshotId();
Optional<OffsetAndEpoch> earliestSnapshotId();
/**
* Notifies the replicated log when a new snapshot is available.


@ -23,11 +23,13 @@ public final class SnapshotPath {
public final Path path;
public final OffsetAndEpoch snapshotId;
public final boolean partial;
public final boolean deleted;
public SnapshotPath(Path path, OffsetAndEpoch snapshotId, boolean partial) {
public SnapshotPath(Path path, OffsetAndEpoch snapshotId, boolean partial, boolean deleted) {
this.path = path;
this.snapshotId = snapshotId;
this.partial = partial;
this.deleted = deleted;
}
@Override


@ -16,16 +16,21 @@
*/
package org.apache.kafka.snapshot;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.NumberFormat;
import java.util.Optional;
import org.apache.kafka.raft.OffsetAndEpoch;
public final class Snapshots {
private static final String SUFFIX = ".checkpoint";
private static final Logger log = LoggerFactory.getLogger(Snapshots.class);
private static final String SUFFIX = ".checkpoint";
private static final String PARTIAL_SUFFIX = String.format("%s.part", SUFFIX);
private static final String DELETE_SUFFIX = String.format("%s.deleted", SUFFIX);
private static final NumberFormat OFFSET_FORMATTER = NumberFormat.getInstance();
private static final NumberFormat EPOCH_FORMATTER = NumberFormat.getInstance();
@ -45,7 +50,7 @@ public final class Snapshots {
return logDir;
}
static Path snapshotPath(Path logDir, OffsetAndEpoch snapshotId) {
public static Path snapshotPath(Path logDir, OffsetAndEpoch snapshotId) {
return snapshotDir(logDir).resolve(filenameFromSnapshotId(snapshotId) + SUFFIX);
}
@ -57,6 +62,10 @@ public final class Snapshots {
return source.resolveSibling(filenameFromSnapshotId(snapshotId) + SUFFIX);
}
public static Path deleteRename(Path source, OffsetAndEpoch snapshotId) {
return source.resolveSibling(filenameFromSnapshotId(snapshotId) + DELETE_SUFFIX);
}
public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws IOException {
Path dir = snapshotDir(logDir);
@ -76,12 +85,13 @@ public final class Snapshots {
String name = filename.toString();
boolean partial;
boolean partial = false;
boolean deleted = false;
if (name.endsWith(PARTIAL_SUFFIX)) {
partial = true;
} else if (name.endsWith(SUFFIX)) {
partial = false;
} else {
} else if (name.endsWith(DELETE_SUFFIX)) {
deleted = true;
} else if (!name.endsWith(SUFFIX)) {
return Optional.empty();
}
@ -90,6 +100,22 @@ public final class Snapshots {
name.substring(OFFSET_WIDTH + 1, OFFSET_WIDTH + EPOCH_WIDTH + 1)
);
return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial));
return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial, deleted));
}
/**
* Delete the snapshot from the filesystem. The caller may have first renamed the snapshot file to
* ${file}.deleted, so we try to delete both the original file and the renamed file if they exist.
*/
public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) {
Path immutablePath = Snapshots.snapshotPath(logDir, snapshotId);
Path deletingPath = Snapshots.deleteRename(immutablePath, snapshotId);
try {
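// A non-short-circuiting "|" is used so that both delete attempts run.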
return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletingPath);
} catch (IOException e) {
log.error("Error deleting snapshot file " + deletingPath, e);
return false;
}
}
}
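
For orientation, the new DELETE_SUFFIX becomes a third terminal state in the snapshot filename scheme, next to immutable (.checkpoint) and partial (.checkpoint.part) files. Below is a simplified Scala mirror of the suffix classification that Snapshots.parse performs; the object, trait and method names are illustrative, only the suffix strings come from the diff.

import java.nio.file.Path

sealed trait SnapshotFileState
case object ImmutableSnapshot extends SnapshotFileState // "<id>.checkpoint": safe to load on recovery
case object PartialSnapshot extends SnapshotFileState   // "<id>.checkpoint.part": writer never froze it
case object DeletedSnapshot extends SnapshotFileState   // "<id>.checkpoint.deleted": renamed, awaiting delayed delete

object SnapshotSuffixSketch {
  private val Suffix = ".checkpoint"
  private val PartialSuffix = s"$Suffix.part"
  private val DeleteSuffix = s"$Suffix.deleted"

  // Classify a snapshot file by its suffix; None means it is not a snapshot file at all.
  def classify(path: Path): Option[SnapshotFileState] = {
    val name = path.getFileName.toString
    if (name.endsWith(PartialSuffix)) Some(PartialSnapshot)
    else if (name.endsWith(DeleteSuffix)) Some(DeletedSnapshot)
    else if (name.endsWith(Suffix)) Some(ImmutableSnapshot)
    else None
  }
}

On recovery, only immutable snapshots at or above the logStartOffset are kept; partial and deleted files, like older immutable snapshots, are deleted outright.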


@ -62,7 +62,6 @@ public class MockLog implements ReplicatedLog {
private long nextId = ID_GENERATOR.getAndIncrement();
private LogOffsetMetadata highWatermark = new LogOffsetMetadata(0, Optional.empty());
private long lastFlushedOffset = 0;
private Optional<OffsetAndEpoch> oldestSnapshotId = Optional.empty();
public MockLog(TopicPartition topicPartition) {
this.topicPartition = topicPartition;
@ -89,7 +88,7 @@ public class MockLog implements ReplicatedLog {
batches.clear();
epochStartOffsets.clear();
oldestSnapshotId = Optional.of(snapshotId);
snapshots.headMap(snapshotId, false).clear();
updateHighWatermark(new LogOffsetMetadata(snapshotId.offset));
flush();
@ -174,14 +173,12 @@ public class MockLog implements ReplicatedLog {
@Override
public int lastFetchedEpoch() {
return logLastFetchedEpoch().orElseGet(() -> {
return latestSnapshotId().map(id -> id.epoch).orElse(0);
});
return logLastFetchedEpoch().orElseGet(() -> latestSnapshotId().map(id -> id.epoch).orElse(0));
}
@Override
public OffsetAndEpoch endOffsetForEpoch(int epoch) {
int epochLowerBound = oldestSnapshotId.map(id -> id.epoch).orElse(0);
int epochLowerBound = earliestSnapshotId().map(id -> id.epoch).orElse(0);
for (EpochStartOffset epochStartOffset : epochStartOffsets) {
if (epochStartOffset.epoch > epoch) {
return new OffsetAndEpoch(epochStartOffset.startOffset, epochLowerBound);
@ -216,7 +213,7 @@ public class MockLog implements ReplicatedLog {
}
private long logStartOffset() {
return oldestSnapshotId.map(id -> id.offset).orElse(0L);
return earliestSnapshotId().map(id -> id.offset).orElse(0L);
}
private List<LogEntry> buildEntries(RecordBatch batch, Function<Record, Long> offsetSupplier) {
@ -415,8 +412,12 @@ public class MockLog implements ReplicatedLog {
}
@Override
public Optional<OffsetAndEpoch> oldestSnapshotId() {
return oldestSnapshotId;
public Optional<OffsetAndEpoch> earliestSnapshotId() {
try {
return Optional.of(snapshots.firstKey());
} catch (NoSuchElementException e) {
return Optional.empty();
}
}
@Override
@ -441,11 +442,11 @@ public class MockLog implements ReplicatedLog {
Optional<OffsetAndEpoch> snapshotIdOpt = latestSnapshotId();
if (snapshotIdOpt.isPresent()) {
OffsetAndEpoch snapshotId = snapshotIdOpt.get();
if (logStartOffset() < logStartSnapshotId.offset &&
if (startOffset() < logStartSnapshotId.offset &&
highWatermark.offset >= logStartSnapshotId.offset &&
snapshotId.offset >= logStartSnapshotId.offset) {
oldestSnapshotId = Optional.of(logStartSnapshotId);
snapshots.headMap(logStartSnapshotId, false).clear();
batches.removeIf(entry -> entry.lastOffset() < logStartSnapshotId.offset);


@ -29,6 +29,7 @@ import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -57,6 +58,11 @@ public class MockLogTest {
log = new MockLog(topicPartition);
}
@AfterEach
public void cleanup() {
log.close();
}
@Test
public void testTopicPartition() {
assertEquals(topicPartition, log.topicPartition());
@ -594,6 +600,53 @@ public class MockLogTest {
assertFalse(log.truncateToLatestSnapshot());
}
@Test
public void testTruncateWillRemoveOlderSnapshot() throws IOException {
int numberOfRecords = 10;
int epoch = 0;
OffsetAndEpoch sameEpochSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch);
appendBatch(numberOfRecords, epoch);
try (RawSnapshotWriter snapshot = log.createSnapshot(sameEpochSnapshotId)) {
snapshot.freeze();
}
OffsetAndEpoch greaterEpochSnapshotId = new OffsetAndEpoch(2 * numberOfRecords, epoch + 1);
appendBatch(numberOfRecords, epoch);
try (RawSnapshotWriter snapshot = log.createSnapshot(greaterEpochSnapshotId)) {
snapshot.freeze();
}
assertTrue(log.truncateToLatestSnapshot());
assertEquals(Optional.empty(), log.readSnapshot(sameEpochSnapshotId));
}
@Test
public void testUpdateLogStartOffsetWillRemoveOlderSnapshot() throws IOException {
int numberOfRecords = 10;
int epoch = 0;
OffsetAndEpoch sameEpochSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch);
appendBatch(numberOfRecords, epoch);
try (RawSnapshotWriter snapshot = log.createSnapshot(sameEpochSnapshotId)) {
snapshot.freeze();
}
OffsetAndEpoch greaterEpochSnapshotId = new OffsetAndEpoch(2 * numberOfRecords, epoch + 1);
appendBatch(numberOfRecords, epoch);
try (RawSnapshotWriter snapshot = log.createSnapshot(greaterEpochSnapshotId)) {
snapshot.freeze();
}
log.updateHighWatermark(new LogOffsetMetadata(greaterEpochSnapshotId.offset));
assertTrue(log.deleteBeforeSnapshot(greaterEpochSnapshotId));
assertEquals(Optional.empty(), log.readSnapshot(sameEpochSnapshotId));
}
private Optional<OffsetRange> readOffsets(long startOffset, Isolation isolation) {
Records records = log.read(startOffset, isolation).records;
long firstReadOffset = -1L;


@ -16,15 +16,22 @@
*/
package org.apache.kafka.snapshot;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
final public class SnapshotsTest {
@ -34,12 +41,13 @@ final public class SnapshotsTest {
TestUtils.RANDOM.nextInt(Integer.MAX_VALUE),
TestUtils.RANDOM.nextInt(Integer.MAX_VALUE)
);
Path path = Snapshots.snapshotPath(FileSystems.getDefault().getPath("/root"), snapshotId);
Path path = Snapshots.snapshotPath(TestUtils.tempDirectory().toPath(), snapshotId);
SnapshotPath snapshotPath = Snapshots.parse(path).get();
assertEquals(path, snapshotPath.path);
assertEquals(snapshotId, snapshotPath.snapshotId);
assertEquals(false, snapshotPath.partial);
assertFalse(snapshotPath.partial);
assertFalse(snapshotPath.deleted);
}
@Test
@ -57,7 +65,21 @@ final public class SnapshotsTest {
assertEquals(path, snapshotPath.path);
assertEquals(snapshotId, snapshotPath.snapshotId);
assertEquals(true, snapshotPath.partial);
assertTrue(snapshotPath.partial);
}
@Test
public void testValidDeletedSnapshotFilename() {
OffsetAndEpoch snapshotId = new OffsetAndEpoch(
TestUtils.RANDOM.nextInt(Integer.MAX_VALUE),
TestUtils.RANDOM.nextInt(Integer.MAX_VALUE)
);
Path path = Snapshots.snapshotPath(TestUtils.tempDirectory().toPath(), snapshotId);
Path deletedPath = Snapshots.deleteRename(path, snapshotId);
SnapshotPath snapshotPath = Snapshots.parse(deletedPath).get();
assertEquals(snapshotId, snapshotPath.snapshotId);
assertTrue(snapshotPath.deleted);
}
@Test
@ -75,4 +97,30 @@ final public class SnapshotsTest {
// partition metadata
assertEquals(Optional.empty(), Snapshots.parse(root.resolve("partition.metadata")));
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDeleteSnapshot(boolean renameBeforeDeleting) throws IOException {
OffsetAndEpoch snapshotId = new OffsetAndEpoch(
TestUtils.RANDOM.nextInt(Integer.MAX_VALUE),
TestUtils.RANDOM.nextInt(Integer.MAX_VALUE)
);
Path logDirPath = TestUtils.tempDirectory().toPath();
try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(logDirPath, snapshotId, Optional.empty())) {
snapshot.freeze();
Path snapshotPath = Snapshots.snapshotPath(logDirPath, snapshotId);
assertTrue(Files.exists(snapshotPath));
if (renameBeforeDeleting)
// rename snapshot before deleting
Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRename(snapshotPath, snapshotId));
assertTrue(Snapshots.deleteSnapshotIfExists(logDirPath, snapshot.snapshotId()));
assertFalse(Files.exists(snapshotPath));
assertFalse(Files.exists(Snapshots.deleteRename(snapshotPath, snapshotId)));
}
}
}