KAFKA-10394: Add classes to read and write snapshot for KIP-630 (#9512)

This PR adds support for generating snapshots for KIP-630.

1. Adds the interfaces `RawSnapshotWriter` and `RawSnapshotReader` and their implementations `FileRawSnapshotWriter` and `FileRawSnapshotReader`, respectively. These interfaces and implementations are low-level APIs for writing and reading snapshots. They are internal to the Raft implementation and are not exposed to users of `RaftClient`. They operate at the `Record` level. These types are exposed to the `RaftClient` through the `ReplicatedLog` interface (see the first sketch after this list).

2. Adds a buffered snapshot writer, `SnapshotWriter<T>`. This higher-level type is exposed through the `RaftClient` interface (see the second sketch below). A future PR will add the related `SnapshotReader<T>`, which the state machine will use to load a snapshot.
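
As an illustration, a minimal sketch of the raw, `Record`-level API; `log` is assumed to be a `ReplicatedLog` and `serializedBatch` a pre-built `ByteBuffer` of record batches (both names are hypothetical):

    OffsetAndEpoch snapshotId = new OffsetAndEpoch(10L, 3);
    try (RawSnapshotWriter writer = log.createSnapshot(snapshotId)) {
        writer.append(serializedBatch); // raw append of already-serialized record batches
        writer.freeze();                // mark the snapshot immutable and publish it atomically
    }
    Optional<RawSnapshotReader> reader = log.readSnapshot(snapshotId);

And a sketch of the buffered, higher-level writer; `client` is assumed to be a `KafkaRaftClient<String>` (also hypothetical):

    OffsetAndEpoch snapshotId = new OffsetAndEpoch(10L, 3);
    try (SnapshotWriter<String> snapshot = client.createSnapshot(snapshotId)) {
        snapshot.append(Arrays.asList("a", "b", "c")); // objects are serialized and batched
        snapshot.freeze(); // flush pending batches and mark the snapshot immutable
    } // closing without freeze() aborts and deletes the partial snapshot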

Reviewers: Jason Gustafson <jason@confluent.io>
José Armando García Sancio 2020-12-07 14:06:25 -08:00 committed by GitHub
parent b8ebcc2a93
commit ab0807dd85
17 changed files with 1153 additions and 14 deletions

View File

@@ -318,6 +318,7 @@
<subpackage name="raft">
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.message" />
@@ -329,6 +330,12 @@
<allow pkg="com.fasterxml.jackson" />
</subpackage>
<subpackage name="snapshot">
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.test"/>
</subpackage>
<subpackage name="connect">
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.connect.data" />

View File

@@ -1321,4 +1321,9 @@ public final class Utils {
return date.getTime();
}
}
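/**
* Casts an iterator over a subtype to an iterator over its supertype. The unchecked
* cast is safe because iterators are covariant: they only produce values of type S.
*/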
@SuppressWarnings("unchecked")
public static <S> Iterator<S> covariantCast(Iterator<? extends S> iterator) {
return (Iterator<S>) iterator;
}
}

View File

@@ -16,6 +16,7 @@
*/
package kafka.raft
import java.nio.file.NoSuchFileException
import java.util.Optional
import kafka.log.{AppendOrigin, Log}
@@ -24,6 +25,10 @@ import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.raft
import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Isolation, ReplicatedLog}
import org.apache.kafka.snapshot.FileRawSnapshotReader
import org.apache.kafka.snapshot.FileRawSnapshotWriter
import org.apache.kafka.snapshot.RawSnapshotReader
import org.apache.kafka.snapshot.RawSnapshotWriter
import scala.compat.java8.OptionConverters._
@@ -141,6 +146,18 @@ class KafkaMetadataLog(
topicPartition
}
override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = {
FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
}
override def readSnapshot(snapshotId: raft.OffsetAndEpoch): Optional[RawSnapshotReader] = {
try {
Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
} catch {
case _: NoSuchFileException => Optional.empty()
}
}
override def close(): Unit = {
log.close()
}

View File

@@ -23,14 +23,14 @@ import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.DescribeQuorumRequestData;
import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.Metrics;
@@ -61,6 +61,7 @@ import org.apache.kafka.raft.internals.KafkaRaftMetrics;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.raft.internals.RecordsBatchReader;
import org.apache.kafka.raft.internals.ThresholdPurgatory;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.slf4j.Logger;
import java.io.IOException;
@@ -1815,6 +1816,18 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
return shutdownComplete;
}
@Override
public SnapshotWriter<T> createSnapshot(OffsetAndEpoch snapshotId) throws IOException {
return new SnapshotWriter<>(
log.createSnapshot(snapshotId),
MAX_BATCH_SIZE,
memoryPool,
time,
CompressionType.NONE,
serde
);
}
private void close() {
kafkaRaftMetrics.close();
}

View File

@@ -35,26 +35,26 @@ import java.util.stream.Collectors;
* only valid state transitions. Below we define the possible state transitions and
* how they are triggered:
*
Unattached|Resigned transitions to:
* Unattached: After learning of a new election with a higher epoch
* Voted: After granting a vote to a candidate
* Candidate: After expiration of the election timeout
* Follower: After discovering a leader with an equal or larger epoch
*
Voted transitions to:
* Unattached: After learning of a new election with a higher epoch
* Candidate: After expiration of the election timeout
*
Candidate transitions to:
* Unattached: After learning of a new election with a higher epoch
* Candidate: After expiration of the election timeout
* Leader: After receiving a majority of votes
*
Leader transitions to:
* Unattached: After learning of a new election with a higher epoch
* Resigned: When shutting down gracefully
*
Follower transitions to:
* Unattached: After learning of a new election with a higher epoch
* Candidate: After expiration of the fetch timeout
* Follower: After discovering a leader with a larger epoch
@@ -63,11 +63,11 @@ import java.util.stream.Collectors;
* states are not possible for observers, so the only transitions that are possible
* are between Unattached and Follower.
*
Unattached transitions to:
* Unattached: After learning of a new election with a higher epoch
* Follower: After discovering a leader with an equal or larger epoch
*
Follower transitions to:
* Unattached: After learning of a new election with a higher epoch
* Follower: After discovering a leader with a larger epoch
*

View File

@@ -16,6 +16,8 @@
*/
package org.apache.kafka.raft;
import org.apache.kafka.snapshot.SnapshotWriter;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -100,4 +102,15 @@ public interface RaftClient<T> {
*/
CompletableFuture<Void> shutdown(int timeoutMs);
/**
* Create a writable snapshot file for a given offset and epoch.
*
* The RaftClient assumes that the returned snapshot will contain the records up to but
* not including the end offset in the snapshot id. See {@link SnapshotWriter} for
* details on how to use this object.
*
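* For example (illustrative): a snapshot with id (offset = 10, epoch = 3) is expected
* to contain the effect of the records at offsets 0 through 9.
*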
* @param snapshotId the end offset and epoch that identifies the snapshot
* @return a writable snapshot
*/
SnapshotWriter<T> createSnapshot(OffsetAndEpoch snapshotId) throws IOException;
}

View File

@@ -18,8 +18,11 @@ package org.apache.kafka.raft;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import java.util.OptionalLong;
@@ -149,6 +152,29 @@ public interface ReplicatedLog extends Closeable {
return OptionalLong.of(truncationOffset);
}
/**
* Create a writable snapshot for the given snapshot id.
*
* See {@link RawSnapshotWriter} for details on how to use this object.
*
* @param snapshotId the end offset and epoch that identifies the snapshot
* @return a writable snapshot
*/
RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) throws IOException;
/**
* Opens a readable snapshot for the given snapshot id.
*
* Returns an Optional with a readable snapshot, if the snapshot exists, otherwise
* returns an empty Optional. See {@link RawSnapshotReader} for details on how to
* use this object.
*
* @param snapshotId the end offset and epoch that identifies the snapshot
* @return an Optional with a readable snapshot, if the snapshot exists, otherwise
* returns an empty Optional
*/
Optional<RawSnapshotReader> readSnapshot(OffsetAndEpoch snapshotId) throws IOException;
default void close() {}
}
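
A hedged sketch of how a caller might use these methods (variable names are illustrative):

    Optional<RawSnapshotReader> snapshot = log.readSnapshot(snapshotId);
    if (snapshot.isPresent()) {
        long size = snapshot.get().sizeInBytes(); // e.g. advertise the snapshot to fetchers
    } // an empty Optional means no snapshot exists for this id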

View File

@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.snapshot;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Iterator;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.OffsetAndEpoch;
public final class FileRawSnapshotReader implements RawSnapshotReader {
private final FileRecords fileRecords;
private final OffsetAndEpoch snapshotId;
private FileRawSnapshotReader(FileRecords fileRecords, OffsetAndEpoch snapshotId) {
this.fileRecords = fileRecords;
this.snapshotId = snapshotId;
}
@Override
public OffsetAndEpoch snapshotId() {
return snapshotId;
}
@Override
public long sizeInBytes() {
return fileRecords.sizeInBytes();
}
@Override
public Iterator<RecordBatch> iterator() {
return Utils.covariantCast(fileRecords.batchIterator());
}
@Override
public int read(ByteBuffer buffer, long position) throws IOException {
return fileRecords.channel().read(buffer, position);
}
@Override
public void close() throws IOException {
fileRecords.close();
}
/**
* Opens a snapshot for reading.
*
* @param logDir the directory for the topic partition
* @param snapshotId the end offset and epoch that identifies the snapshot
* @throws java.nio.file.NoSuchFileException if the snapshot doesn't exist
* @throws IOException for any IO error while opening the snapshot
*/
public static FileRawSnapshotReader open(Path logDir, OffsetAndEpoch snapshotId) throws IOException {
FileRecords fileRecords = FileRecords.open(
Snapshots.snapshotPath(logDir, snapshotId).toFile(),
false, // mutable
true, // fileAlreadyExists
0, // initFileSize
false // preallocate
);
return new FileRawSnapshotReader(fileRecords, snapshotId);
}
}

View File

@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.snapshot;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.OffsetAndEpoch;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
public final class FileRawSnapshotWriter implements RawSnapshotWriter {
private final Path tempSnapshotPath;
private final FileChannel channel;
private final OffsetAndEpoch snapshotId;
private boolean frozen = false;
private FileRawSnapshotWriter(
Path tempSnapshotPath,
FileChannel channel,
OffsetAndEpoch snapshotId
) {
this.tempSnapshotPath = tempSnapshotPath;
this.channel = channel;
this.snapshotId = snapshotId;
}
@Override
public OffsetAndEpoch snapshotId() {
return snapshotId;
}
@Override
public long sizeInBytes() throws IOException {
return channel.size();
}
@Override
public void append(ByteBuffer buffer) throws IOException {
if (frozen) {
throw new IllegalStateException(
String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
);
}
Utils.writeFully(channel, buffer);
}
@Override
public boolean isFrozen() {
return frozen;
}
@Override
public void freeze() throws IOException {
if (frozen) {
throw new IllegalStateException(
String.format("Freeze is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
);
}
channel.close();
frozen = true;
// Mark the file as read-only; fail if the attribute cannot be set
if (!tempSnapshotPath.toFile().setReadOnly()) {
throw new IOException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath));
}
Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId);
Utils.atomicMoveWithFallback(tempSnapshotPath, destination);
}
@Override
public void close() throws IOException {
try {
channel.close();
} finally {
// This is a noop if freeze was called before calling close
Files.deleteIfExists(tempSnapshotPath);
}
}
/**
* Create a snapshot writer for the given topic partition log directory and snapshot id.
*
* @param logDir the directory for the topic partition
* @param snapshotId the end offset and epoch that identifies the snapshot
* @throws IOException for any IO error while creating the snapshot
*/
public static FileRawSnapshotWriter create(Path logDir, OffsetAndEpoch snapshotId) throws IOException {
Path path = Snapshots.createTempFile(logDir, snapshotId);
return new FileRawSnapshotWriter(
path,
FileChannel.open(path, Utils.mkSet(StandardOpenOption.WRITE, StandardOpenOption.APPEND)),
snapshotId
);
}
}

View File

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.snapshot;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.raft.OffsetAndEpoch;
/**
* Interface for reading snapshots as a sequence of records.
*/
public interface RawSnapshotReader extends Closeable, Iterable<RecordBatch> {
/**
* Returns the end offset and epoch for the snapshot.
*/
public OffsetAndEpoch snapshotId();
/**
* Returns the number of bytes for the snapshot.
*
* @throws IOException for any IO error while reading the size
*/
public long sizeInBytes() throws IOException;
/**
* Reads bytes from position into the given buffer.
*
* It is not guaranteed that the given buffer will be filled.
*
* @param buffer the byte buffer into which the snapshot bytes are read
* @param position the starting position in the snapshot to read
* @return the number of bytes read
* @throws IOException for any IO error while reading the snapshot
*/
public int read(ByteBuffer buffer, long position) throws IOException;
}

View File

@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.snapshot;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.kafka.raft.OffsetAndEpoch;
/**
* Interface for writing a snapshot as a sequence of records.
*/
public interface RawSnapshotWriter extends Closeable {
/**
* Returns the end offset and epoch for the snapshot.
*/
public OffsetAndEpoch snapshotId();
/**
* Returns the number of bytes for the snapshot.
*
* @throws IOException for any IO error while reading the size
*/
public long sizeInBytes() throws IOException;
/**
* Fully appends the buffer to the snapshot.
*
* If the method returns without an exception, the given buffer was fully written to
* the snapshot.
*
* @param buffer the buffer to append
* @throws IOException for any IO error during append
*/
public void append(ByteBuffer buffer) throws IOException;
/**
* Returns true if the snapshot has been frozen, otherwise false.
*
* Modifications to the snapshot are not allowed once it is frozen.
*/
public boolean isFrozen();
/**
* Freezes the snapshot, marking it as immutable.
*
* @throws IOException for any IO error during freezing
*/
public void freeze() throws IOException;
/**
* Closes the snapshot writer.
*
* If close is called without first calling freeze, the snapshot is aborted.
*
* @throws IOException for any IO error during close
*/
public void close() throws IOException;
}

View File

@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.snapshot;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RecordSerde;
import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
import org.apache.kafka.raft.internals.BatchAccumulator;
/**
* A type for writing a snapshot for a given end offset and epoch.
*
* A snapshot writer can be used to append objects until freeze is called. When freeze is
* called the snapshot is validated and marked as immutable. After freeze is called any
* append will fail with an exception.
*
* It is assumed that the content of the snapshot represents all of the records for the
* topic partition from offset 0 up to but not including the end offset in the snapshot
* id.
*
* @see org.apache.kafka.raft.RaftClient#createSnapshot(OffsetAndEpoch)
*/
final public class SnapshotWriter<T> implements Closeable {
final private RawSnapshotWriter snapshot;
final private BatchAccumulator<T> accumulator;
final private Time time;
/**
* Initializes a new instance of the class.
*
* @param snapshot the low level snapshot writer
* @param maxBatchSize the maximum size in bytes for a batch
* @param memoryPool the memory pool for buffer allocation
* @param time the clock implementation
* @param compressionType the compression algorithm to use
* @param serde the record serialization and deserialization implementation
*/
public SnapshotWriter(
RawSnapshotWriter snapshot,
int maxBatchSize,
MemoryPool memoryPool,
Time time,
CompressionType compressionType,
RecordSerde<T> serde
) {
this.snapshot = snapshot;
this.time = time;
this.accumulator = new BatchAccumulator<>(
snapshot.snapshotId().epoch,
0,
Integer.MAX_VALUE,
maxBatchSize,
memoryPool,
time,
compressionType,
serde
);
}
/**
* Returns the end offset and epoch for the snapshot.
*/
public OffsetAndEpoch snapshotId() {
return snapshot.snapshotId();
}
/**
* Returns true if the snapshot has been frozen, otherwise false.
*
* Modifications to the snapshot are not allowed once it is frozen.
*/
public boolean isFrozen() {
return snapshot.isFrozen();
}
/**
* Appends a list of values to the snapshot.
*
* The records in the given list are guaranteed to be written together.
*
* @param records the list of records to append to the snapshot
* @throws IOException for any IO error while appending
* @throws IllegalStateException if append is called when isFrozen is true
*/
public void append(List<T> records) throws IOException {
if (snapshot.isFrozen()) {
String message = String.format(
"Append not supported. Snapshot is already frozen: id = {}.",
snapshot.snapshotId()
);
throw new IllegalStateException(message);
}
accumulator.append(snapshot.snapshotId().epoch, records);
if (accumulator.needsDrain(time.milliseconds())) {
appendBatches(accumulator.drain());
}
}
/**
* Freezes the snapshot by flushing all pending writes and marking it as immutable.
*
* @throws IOException for any IO error during freezing
*/
public void freeze() throws IOException {
appendBatches(accumulator.drain());
snapshot.freeze();
accumulator.close();
}
/**
* Closes the snapshot writer.
*
* If close is called without first calling freeze, the snapshot is aborted.
*
* @throws IOException for any IO error during close
*/
public void close() throws IOException {
snapshot.close();
accumulator.close();
}
private void appendBatches(List<CompletedBatch<T>> batches) throws IOException {
try {
for (CompletedBatch<T> batch : batches) {
snapshot.append(batch.data.buffer());
}
} finally {
batches.forEach(CompletedBatch::release);
}
}
}

View File

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.snapshot;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.NumberFormat;
import org.apache.kafka.raft.OffsetAndEpoch;
final class Snapshots {
private static final String SUFFIX = ".checkpoint";
private static final String PARTIAL_SUFFIX = String.format("%s.part", SUFFIX);
private static final NumberFormat OFFSET_FORMATTER = NumberFormat.getInstance();
private static final NumberFormat EPOCH_FORMATTER = NumberFormat.getInstance();
static {
OFFSET_FORMATTER.setMinimumIntegerDigits(20);
OFFSET_FORMATTER.setGroupingUsed(false);
EPOCH_FORMATTER.setMinimumIntegerDigits(10);
EPOCH_FORMATTER.setGroupingUsed(false);
}
static Path snapshotDir(Path logDir) {
return logDir;
}
static Path snapshotPath(Path logDir, OffsetAndEpoch snapshotId) {
return snapshotDir(logDir).resolve(filenameFromSnapshotId(snapshotId) + SUFFIX);
}
static String filenameFromSnapshotId(OffsetAndEpoch snapshotId) {
return String.format("%s-%s", OFFSET_FORMATTER.format(snapshotId.offset), EPOCH_FORMATTER.format(snapshotId.epoch));
}
static Path moveRename(Path source, OffsetAndEpoch snapshotId) {
return source.resolveSibling(filenameFromSnapshotId(snapshotId) + SUFFIX);
}
static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws IOException {
Path dir = snapshotDir(logDir);
// Create the snapshot directory if it doesn't exist
Files.createDirectories(dir);
String prefix = String.format("%s-", filenameFromSnapshotId(snapshotId));
return Files.createTempFile(dir, prefix, PARTIAL_SUFFIX);
}
}
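
For reference, a sketch of what these helpers produce for the snapshot id (offset = 10, epoch = 3); `logDir` is assumed. Offsets are zero-padded to 20 digits and epochs to 10, with grouping disabled:

    // snapshotPath(logDir, id)   -> <logDir>/00000000000000000010-0000000003.checkpoint
    // createTempFile(logDir, id) -> <logDir>/00000000000000000010-0000000003-<random>.checkpoint.part
    Path path = Snapshots.snapshotPath(logDir, new OffsetAndEpoch(10L, 3));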

View File

@@ -26,13 +26,19 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
@@ -45,6 +51,7 @@ public class MockLog implements ReplicatedLog {
private final List<EpochStartOffset> epochStartOffsets = new ArrayList<>();
private final List<LogBatch> log = new ArrayList<>();
private final Map<OffsetAndEpoch, MockRawSnapshotReader> snapshots = new HashMap<>();
private final TopicPartition topicPartition;
private long nextId = ID_GENERATOR.getAndIncrement();
@@ -351,6 +358,16 @@
epochStartOffsets.add(new EpochStartOffset(epoch, startOffset));
}
@Override
public RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) {
return new MockRawSnapshotWriter(snapshotId);
}
@Override
public Optional<RawSnapshotReader> readSnapshot(OffsetAndEpoch snapshotId) {
return Optional.ofNullable(snapshots.get(snapshotId));
}
static class MockOffsetMetadata implements OffsetMetadata {
final long id;
@@ -472,4 +489,94 @@
}
}
final class MockRawSnapshotWriter implements RawSnapshotWriter {
private final OffsetAndEpoch snapshotId;
private ByteBufferOutputStream data;
private boolean frozen;
public MockRawSnapshotWriter(OffsetAndEpoch snapshotId) {
this.snapshotId = snapshotId;
this.data = new ByteBufferOutputStream(0);
this.frozen = false;
}
@Override
public OffsetAndEpoch snapshotId() {
return snapshotId;
}
@Override
public long sizeInBytes() {
return data.position();
}
@Override
public void append(ByteBuffer buffer) {
if (frozen) {
throw new RuntimeException("Snapshot is already frozen " + snapshotId);
}
data.write(buffer);
}
@Override
public boolean isFrozen() {
return frozen;
}
@Override
public void freeze() {
if (frozen) {
throw new RuntimeException("Snapshot is already frozen " + snapshotId);
}
frozen = true;
ByteBuffer buffer = data.buffer();
buffer.flip();
snapshots.putIfAbsent(snapshotId, new MockRawSnapshotReader(snapshotId, buffer));
}
@Override
public void close() {}
}
final static class MockRawSnapshotReader implements RawSnapshotReader {
private final OffsetAndEpoch snapshotId;
private final MemoryRecords data;
MockRawSnapshotReader(OffsetAndEpoch snapshotId, ByteBuffer data) {
this.snapshotId = snapshotId;
this.data = MemoryRecords.readableRecords(data);
}
@Override
public OffsetAndEpoch snapshotId() {
return snapshotId;
}
@Override
public long sizeInBytes() {
return data.sizeInBytes();
}
@Override
public Iterator<RecordBatch> iterator() {
return Utils.covariantCast(data.batchIterator());
}
@Override
public int read(ByteBuffer buffer, long position) {
ByteBuffer copy = data.buffer();
copy.position((int) position);
// Compute the transfer count before the put; copy.remaining() would be 0 afterwards
int numBytes = Math.min(copy.remaining(), buffer.remaining());
copy.limit((int) position + numBytes);
buffer.put(copy);
return numBytes;
}
@Override
public void close() {}
}
}

View File

@@ -78,7 +78,7 @@ import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public final class RaftClientTestContext {
private static final StringSerde STRING_SERDE = new StringSerde();
final TopicPartition metadataPartition = Builder.METADATA_PARTITION;
@@ -91,9 +91,9 @@ private final QuorumStateStore quorumStateStore;
private final QuorumStateStore quorumStateStore;
final int localId;
public final KafkaRaftClient<String> client;
final Metrics metrics;
public final MockLog log;
final MockNetworkChannel channel;
final MockTime time;
final MockListener listener;
@@ -124,7 +124,7 @@ private int appendLingerMs = DEFAULT_APPEND_LINGER_MS;
private int appendLingerMs = DEFAULT_APPEND_LINGER_MS;
private MemoryPool memoryPool = MemoryPool.NONE;
public Builder(int localId, Set<Integer> voters) {
this.voters = voters;
this.localId = localId;
}
@@ -175,7 +175,7 @@ return this;
return this;
}
public RaftClientTestContext build() throws IOException {
Metrics metrics = new Metrics(time);
MockNetworkChannel channel = new MockNetworkChannel();
LogContext logContext = new LogContext();

View File

@@ -0,0 +1,289 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.snapshot;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Iterator;
import java.util.stream.IntStream;
import org.apache.kafka.common.record.BufferSupplier.GrowableBufferSupplier;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public final class FileRawSnapshotTest {
@Test
public void testWritingSnapshot() throws IOException {
Path tempDir = TestUtils.tempDirectory().toPath();
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3);
int bufferSize = 256;
int batches = 10;
int expectedSize = 0;
try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(tempDir, offsetAndEpoch)) {
assertEquals(0, snapshot.sizeInBytes());
MemoryRecords records = buildRecords(ByteBuffer.wrap(randomBytes(bufferSize)));
for (int i = 0; i < batches; i++) {
snapshot.append(records.buffer());
expectedSize += records.sizeInBytes();
}
assertEquals(expectedSize, snapshot.sizeInBytes());
snapshot.freeze();
}
// File should exist and the size should be the sum of all the appended record batches
assertTrue(Files.exists(Snapshots.snapshotPath(tempDir, offsetAndEpoch)));
assertEquals(expectedSize, Files.size(Snapshots.snapshotPath(tempDir, offsetAndEpoch)));
}
@Test
public void testWriteReadSnapshot() throws IOException {
Path tempDir = TestUtils.tempDirectory().toPath();
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3);
int bufferSize = 256;
int batches = 10;
ByteBuffer expectedBuffer = ByteBuffer.wrap(randomBytes(bufferSize));
try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(tempDir, offsetAndEpoch)) {
MemoryRecords records = buildRecords(expectedBuffer);
for (int i = 0; i < batches; i++) {
snapshot.append(records.buffer());
}
snapshot.freeze();
}
try (FileRawSnapshotReader snapshot = FileRawSnapshotReader.open(tempDir, offsetAndEpoch)) {
int countBatches = 0;
int countRecords = 0;
for (RecordBatch batch : snapshot) {
countBatches += 1;
Iterator<Record> records = batch.streamingIterator(new GrowableBufferSupplier());
while (records.hasNext()) {
Record record = records.next();
countRecords += 1;
assertFalse(record.hasKey());
assertTrue(record.hasValue());
assertEquals(bufferSize, record.value().remaining());
assertEquals(expectedBuffer, record.value());
}
}
assertEquals(batches, countBatches);
assertEquals(batches, countRecords);
}
}
@Test
public void testBatchWriteReadSnapshot() throws IOException {
Path tempDir = TestUtils.tempDirectory().toPath();
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3);
int bufferSize = 256;
int batchSize = 3;
int batches = 10;
try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(tempDir, offsetAndEpoch)) {
for (int i = 0; i < batches; i++) {
ByteBuffer[] buffers = IntStream
.range(0, batchSize)
.mapToObj(ignore -> ByteBuffer.wrap(randomBytes(bufferSize))).toArray(ByteBuffer[]::new);
snapshot.append(buildRecords(buffers).buffer());
}
snapshot.freeze();
}
try (FileRawSnapshotReader snapshot = FileRawSnapshotReader.open(tempDir, offsetAndEpoch)) {
int countBatches = 0;
int countRecords = 0;
for (RecordBatch batch : snapshot) {
countBatches += 1;
Iterator<Record> records = batch.streamingIterator(new GrowableBufferSupplier());
while (records.hasNext()) {
Record record = records.next();
countRecords += 1;
assertFalse(record.hasKey());
assertTrue(record.hasValue());
assertEquals(bufferSize, record.value().remaining());
}
}
assertEquals(batches, countBatches);
assertEquals(batches * batchSize, countRecords);
}
}
@Test
public void testBufferWriteReadSnapshot() throws IOException {
Path tempDir = TestUtils.tempDirectory().toPath();
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3);
int bufferSize = 256;
int batchSize = 3;
int batches = 10;
int expectedSize = 0;
try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(tempDir, offsetAndEpoch)) {
for (int i = 0; i < batches; i++) {
ByteBuffer[] buffers = IntStream
.range(0, batchSize)
.mapToObj(ignore -> ByteBuffer.wrap(randomBytes(bufferSize))).toArray(ByteBuffer[]::new);
MemoryRecords records = buildRecords(buffers);
snapshot.append(records.buffer());
expectedSize += records.sizeInBytes();
}
assertEquals(expectedSize, snapshot.sizeInBytes());
snapshot.freeze();
}
// File should exist and the size should be the sum of all the appended record batches
assertTrue(Files.exists(Snapshots.snapshotPath(tempDir, offsetAndEpoch)));
assertEquals(expectedSize, Files.size(Snapshots.snapshotPath(tempDir, offsetAndEpoch)));
try (FileRawSnapshotReader snapshot = FileRawSnapshotReader.open(tempDir, offsetAndEpoch)) {
int countBatches = 0;
int countRecords = 0;
for (RecordBatch batch : snapshot) {
countBatches += 1;
Iterator<Record> records = batch.streamingIterator(new GrowableBufferSupplier());
while (records.hasNext()) {
Record record = records.next();
countRecords += 1;
assertFalse(record.hasKey());
assertTrue(record.hasValue());
assertEquals(bufferSize, record.value().remaining());
}
}
assertEquals(batches, countBatches);
assertEquals(batches * batchSize, countRecords);
}
}
@Test
public void testAbortedSnapshot() throws IOException {
Path tempDir = TestUtils.tempDirectory().toPath();
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(20L, 2);
int bufferSize = 256;
int batches = 10;
try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(tempDir, offsetAndEpoch)) {
MemoryRecords records = buildRecords(ByteBuffer.wrap(randomBytes(bufferSize)));
for (int i = 0; i < batches; i++) {
snapshot.append(records.buffer());
}
}
// File should not exist since the writer was closed without calling freeze
assertFalse(Files.exists(Snapshots.snapshotPath(tempDir, offsetAndEpoch)));
assertEquals(0, Files.list(Snapshots.snapshotDir(tempDir)).count());
}
@Test
public void testAppendToFrozenSnapshot() throws IOException {
Path tempDir = TestUtils.tempDirectory().toPath();
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3);
int bufferSize = 256;
int batches = 10;
try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(tempDir, offsetAndEpoch)) {
MemoryRecords records = buildRecords(ByteBuffer.wrap(randomBytes(bufferSize)));
for (int i = 0; i < batches; i++) {
snapshot.append(records.buffer());
}
snapshot.freeze();
assertThrows(RuntimeException.class, () -> snapshot.append(records.buffer()));
}
// File should exist and the size should be greater than the sum of all the buffers
assertTrue(Files.exists(Snapshots.snapshotPath(tempDir, offsetAndEpoch)));
assertTrue(Files.size(Snapshots.snapshotPath(tempDir, offsetAndEpoch)) > bufferSize * batches);
}
@Test
public void testCreateSnapshotWithSameId() throws IOException {
Path tempDir = TestUtils.tempDirectory().toPath();
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(20L, 2);
int bufferSize = 256;
int batches = 1;
try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(tempDir, offsetAndEpoch)) {
MemoryRecords records = buildRecords(ByteBuffer.wrap(randomBytes(bufferSize)));
for (int i = 0; i < batches; i++) {
snapshot.append(records.buffer());
}
snapshot.freeze();
}
// Create another snapshot with the same id
try (FileRawSnapshotWriter snapshot = FileRawSnapshotWriter.create(tempDir, offsetAndEpoch)) {
MemoryRecords records = buildRecords(ByteBuffer.wrap(randomBytes(bufferSize)));
for (int i = 0; i < batches; i++) {
snapshot.append(records.buffer());
}
snapshot.freeze();
}
}
private static byte[] randomBytes(int size) {
byte[] array = new byte[size];
TestUtils.SEEDED_RANDOM.nextBytes(array);
return array;
}
private static MemoryRecords buildRecords(ByteBuffer... buffers) {
return MemoryRecords.withRecords(
CompressionType.NONE,
Arrays.stream(buffers).map(buffer -> new SimpleRecord(buffer)).toArray(SimpleRecord[]::new)
);
}
}

View File

@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.snapshot;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.kafka.common.record.BufferSupplier.GrowableBufferSupplier;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClientTestContext;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
final public class SnapshotWriterTest {
private final int localId = 0;
private final Set<Integer> voters = Collections.singleton(localId);
@Test
public void testWritingSnapshot() throws IOException {
OffsetAndEpoch id = new OffsetAndEpoch(10L, 3);
List<List<String>> expected = buildRecords(3, 3);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
try (SnapshotWriter<String> snapshot = context.client.createSnapshot(id)) {
expected.forEach(batch -> {
assertDoesNotThrow(() -> snapshot.append(batch));
});
snapshot.freeze();
}
try (RawSnapshotReader reader = context.log.readSnapshot(id).get()) {
assertSnapshot(expected, reader);
}
}
@Test
public void testAbortedSnapshot() throws IOException {
OffsetAndEpoch id = new OffsetAndEpoch(10L, 3);
List<List<String>> expected = buildRecords(3, 3);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
try (SnapshotWriter<String> snapshot = context.client.createSnapshot(id)) {
expected.forEach(batch -> {
assertDoesNotThrow(() -> snapshot.append(batch));
});
}
assertFalse(context.log.readSnapshot(id).isPresent());
}
@Test
public void testAppendToFrozenSnapshot() throws IOException {
OffsetAndEpoch id = new OffsetAndEpoch(10L, 3);
List<List<String>> expected = buildRecords(3, 3);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
try (SnapshotWriter<String> snapshot = context.client.createSnapshot(id)) {
expected.forEach(batch -> {
assertDoesNotThrow(() -> snapshot.append(batch));
});
snapshot.freeze();
assertThrows(RuntimeException.class, () -> snapshot.append(expected.get(0)));
}
}
private List<List<String>> buildRecords(int recordsPerBatch, int batches) {
Random random = new Random(0);
List<List<String>> result = new ArrayList<>(batches);
for (int i = 0; i < batches; i++) {
List<String> batch = new ArrayList<>(recordsPerBatch);
for (int j = 0; j < recordsPerBatch; j++) {
batch.add(String.valueOf(random.nextInt()));
}
result.add(batch);
}
return result;
}
private void assertSnapshot(List<List<String>> batches, RawSnapshotReader reader) {
List<String> expected = new ArrayList<>();
batches.forEach(expected::addAll);
List<String> actual = new ArrayList<>(expected.size());
reader.forEach(batch -> {
batch.streamingIterator(new GrowableBufferSupplier()).forEachRemaining(record -> {
actual.add(Utils.utf8(record.value()));
});
});
assertEquals(expected, actual);
}
}