KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter (#11529)

Change the snapshot API so that SnapshotWriter and SnapshotReader are interfaces. Rename the existing concrete types to RecordsSnapshotWriter and RecordsSnapshotReader and have them implement the interfaces introduced by this commit.

Co-authored-by: loboxu <loboxu@tencent.com>
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
loboya~ 2021-12-01 02:44:39 +08:00 committed by GitHub
parent d1e0d2b474
commit 42306ba267
14 changed files with 377 additions and 284 deletions
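
For callers, the change is purely a rename at construction sites: fields, parameters, and return types keep the SnapshotReader/SnapshotWriter interface types, while the static factories move to the new concrete classes. A minimal sketch of the new pattern (a hypothetical helper class, not part of this commit; it assumes the RawSnapshotReader/RawSnapshotWriter instances come from the replicated log, as in the tests below):

import java.util.Optional;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.*;

public class SnapshotApiSketch {
    // Read side: construct the concrete RecordsSnapshotReader but program against SnapshotReader.
    static SnapshotReader<ApiMessageAndVersion> openReader(RawSnapshotReader raw) {
        return RecordsSnapshotReader.of(raw, new MetadataRecordSerde(), BufferSupplier.create(), Integer.MAX_VALUE);
    }

    // Write side: the createWithHeader factory moves from SnapshotWriter to RecordsSnapshotWriter,
    // but it still returns Optional<SnapshotWriter<T>>, so only the class name changes at call sites.
    static Optional<SnapshotWriter<ApiMessageAndVersion>> openWriter(RawSnapshotWriter raw, long lastContainedLogTimestamp) {
        return RecordsSnapshotWriter.createWithHeader(
            () -> Optional.of(raw),
            4 * 1024,                  // max batch size in bytes
            MemoryPool.NONE,
            Time.SYSTEM,
            lastContainedLogTimestamp,
            CompressionType.NONE,
            new MetadataRecordSerde());
    }
}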


@@ -21,9 +21,9 @@ import java.util.Collections
import kafka.testkit.KafkaClusterTestKit
import kafka.testkit.TestKitNodes
import kafka.utils.TestUtils
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.BufferSupplier
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.snapshot.SnapshotReader
import org.apache.kafka.snapshot.RecordsSnapshotReader
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotEquals
import org.junit.jupiter.api.Assertions.assertTrue
@@ -76,7 +76,7 @@ class RaftClusterSnapshotTest {
// For every controller and broker, perform some sanity checks against the latest snapshot
for ((_, raftManager) <- cluster.raftManagers().asScala) {
TestUtils.resource(
SnapshotReader.of(
RecordsSnapshotReader.of(
raftManager.replicatedLog.latestSnapshot.get(),
new MetadataRecordSerde(),
BufferSupplier.create(),


@@ -30,7 +30,7 @@ import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.queue.EventQueue
import org.apache.kafka.raft.OffsetAndEpoch
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.snapshot.{MockRawSnapshotWriter, SnapshotWriter}
import org.apache.kafka.snapshot.{MockRawSnapshotWriter, RecordsSnapshotWriter, SnapshotWriter}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test
@@ -50,7 +50,7 @@ class BrokerMetadataSnapshotterTest {
committedEpoch: Int,
lastContainedLogTime: Long): SnapshotWriter[ApiMessageAndVersion] = {
val offsetAndEpoch = new OffsetAndEpoch(committedOffset, committedEpoch)
SnapshotWriter.createWithHeader(
RecordsSnapshotWriter.createWithHeader(
() => {
Optional.of(
new MockRawSnapshotWriter(offsetAndEpoch, consumeSnapshotBuffer(committedOffset, committedEpoch))


@@ -77,8 +77,9 @@ import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -493,7 +494,7 @@ public class QuorumControllerTest {
}
private SnapshotReader<ApiMessageAndVersion> createSnapshotReader(RawSnapshotReader reader) {
return SnapshotReader.of(
return RecordsSnapshotReader.of(
reader,
new MetadataRecordSerde(),
BufferSupplier.create(),


@@ -30,9 +30,10 @@ import org.apache.kafka.controller.SnapshotGenerator.Section;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.apache.kafka.snapshot.MockRawSnapshotWriter;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -94,7 +95,7 @@ public class SnapshotGeneratorTest {
long committedOffset,
long lastContainedLogTime
) {
return SnapshotWriter.createWithHeader(
return RecordsSnapshotWriter.createWithHeader(
() -> createNewSnapshot(new OffsetAndEpoch(committedOffset + 1, 1)),
1024,
MemoryPool.NONE,


@@ -37,8 +37,10 @@ import org.apache.kafka.snapshot.MockRawSnapshotReader;
import org.apache.kafka.snapshot.MockRawSnapshotWriter;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -470,7 +472,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
if (snapshot.isPresent()) {
log.trace("Node {}: handling snapshot with id {}.", nodeId, snapshot.get().snapshotId());
listenerData.handleSnapshot(
SnapshotReader.of(
RecordsSnapshotReader.of(
snapshot.get(),
new MetadataRecordSerde(),
BufferSupplier.create(),
@@ -707,7 +709,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
long lastContainedLogTimestamp
) {
OffsetAndEpoch snapshotId = new OffsetAndEpoch(committedOffset + 1, committedEpoch);
return SnapshotWriter.createWithHeader(
return RecordsSnapshotWriter.createWithHeader(
() -> createNewSnapshot(snapshotId),
1024,
MemoryPool.NONE,


@@ -72,8 +72,10 @@ import org.apache.kafka.raft.internals.ThresholdPurgatory;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.slf4j.Logger;
import java.util.Collections;
@@ -331,7 +333,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private Optional<SnapshotReader<T>> latestSnapshot() {
return log.latestSnapshot().map(reader ->
SnapshotReader.of(reader, serde, BufferSupplier.create(), MAX_BATCH_SIZE_BYTES)
RecordsSnapshotReader.of(reader, serde, BufferSupplier.create(), MAX_BATCH_SIZE_BYTES)
);
}
@@ -2347,7 +2349,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
int committedEpoch,
long lastContainedLogTime
) {
return SnapshotWriter.createWithHeader(
return RecordsSnapshotWriter.createWithHeader(
() -> log.createNewSnapshot(new OffsetAndEpoch(committedOffset + 1, committedEpoch)),
MAX_BATCH_SIZE_BYTES,
memoryPool,


@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.snapshot;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalLong;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.raft.internals.RecordsIterator;
public final class RecordsSnapshotReader<T> implements SnapshotReader<T> {
private final OffsetAndEpoch snapshotId;
private final RecordsIterator<T> iterator;
private Optional<Batch<T>> nextBatch = Optional.empty();
private OptionalLong lastContainedLogTimestamp = OptionalLong.empty();
private RecordsSnapshotReader(
OffsetAndEpoch snapshotId,
RecordsIterator<T> iterator
) {
this.snapshotId = snapshotId;
this.iterator = iterator;
}
@Override
public OffsetAndEpoch snapshotId() {
return snapshotId;
}
@Override
public long lastContainedLogOffset() {
return snapshotId.offset - 1;
}
@Override
public int lastContainedLogEpoch() {
return snapshotId.epoch;
}
@Override
public long lastContainedLogTimestamp() {
if (!lastContainedLogTimestamp.isPresent()) {
nextBatch.ifPresent(batch -> {
throw new IllegalStateException(
String.format(
"nextBatch was present when last contained log timestamp was not present",
batch
)
);
});
nextBatch = nextBatch();
}
return lastContainedLogTimestamp.getAsLong();
}
@Override
public boolean hasNext() {
if (!nextBatch.isPresent()) {
nextBatch = nextBatch();
}
return nextBatch.isPresent();
}
@Override
public Batch<T> next() {
if (!hasNext()) {
throw new NoSuchElementException("Snapshot reader doesn't have any more elements");
}
Batch<T> batch = nextBatch.get();
nextBatch = Optional.empty();
return batch;
}
@Override
public void close() {
iterator.close();
}
public static <T> RecordsSnapshotReader<T> of(
RawSnapshotReader snapshot,
RecordSerde<T> serde,
BufferSupplier bufferSupplier,
int maxBatchSize
) {
return new RecordsSnapshotReader<>(
snapshot.snapshotId(),
new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, maxBatchSize)
);
}
/**
* Returns the next non-control Batch
*/
private Optional<Batch<T>> nextBatch() {
while (iterator.hasNext()) {
Batch<T> batch = iterator.next();
if (!lastContainedLogTimestamp.isPresent()) {
// The Batch type doesn't support returning control batches. For now let's just use
// the append time of the first batch
lastContainedLogTimestamp = OptionalLong.of(batch.appendTimestamp());
}
if (!batch.records().isEmpty()) {
return Optional.of(batch);
}
}
return Optional.empty();
}
}
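
For orientation, a sketch of how a reader produced by RecordsSnapshotReader.of is typically drained (a hypothetical method for the SnapshotApiSketch class above; it additionally assumes org.apache.kafka.raft.Batch is imported). Batches without data records, such as the control batch carrying the snapshot header, are filtered out by nextBatch(), so next() only returns batches with data records:

    // Drain every data batch in the snapshot and visit each record.
    static void applySnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
        try {
            while (reader.hasNext()) {
                Batch<ApiMessageAndVersion> batch = reader.next();
                for (ApiMessageAndVersion record : batch.records()) {
                    // apply the record to the in-memory state being rebuilt from the snapshot
                }
            }
        } finally {
            reader.close();   // closes the underlying RecordsIterator
        }
    }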


@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.snapshot;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.record.ControlRecordUtils;
import java.util.Optional;
import java.util.List;
import java.util.function.Supplier;
final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
final private RawSnapshotWriter snapshot;
final private BatchAccumulator<T> accumulator;
final private Time time;
final private long lastContainedLogTimestamp;
private RecordsSnapshotWriter(
RawSnapshotWriter snapshot,
int maxBatchSize,
MemoryPool memoryPool,
Time time,
long lastContainedLogTimestamp,
CompressionType compressionType,
RecordSerde<T> serde
) {
this.snapshot = snapshot;
this.time = time;
this.lastContainedLogTimestamp = lastContainedLogTimestamp;
this.accumulator = new BatchAccumulator<>(
snapshot.snapshotId().epoch,
0,
Integer.MAX_VALUE,
maxBatchSize,
memoryPool,
time,
compressionType,
serde
);
}
/**
* Adds a {@link SnapshotHeaderRecord} to the snapshot
*
* @throws IllegalStateException if the snapshot is not empty
*/
private void initializeSnapshotWithHeader() {
if (snapshot.sizeInBytes() != 0) {
String message = String.format(
"Initializing writer with a non-empty snapshot: id = '%s'.",
snapshot.snapshotId()
);
throw new IllegalStateException(message);
}
SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord()
.setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION)
.setLastContainedLogTimestamp(lastContainedLogTimestamp);
accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds());
accumulator.forceDrain();
}
/**
* Adds a {@link SnapshotFooterRecord} to the snapshot
*
* No more records should be appended to the snapshot after calling this method
*/
private void finalizeSnapshotWithFooter() {
SnapshotFooterRecord footerRecord = new SnapshotFooterRecord()
.setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds());
accumulator.forceDrain();
}
/**
* Create an instance of this class and initialize
* the underlying snapshot with {@link SnapshotHeaderRecord}
*
* @param supplier a lambda to create the low level snapshot writer
* @param maxBatchSize the maximum size in bytes for a batch
* @param memoryPool the memory pool for buffer allocation
* @param snapshotTime the clock implementation
* @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot
* @param compressionType the compression algorithm to use
* @param serde the record serialization and deserialization implementation
* @return {@link Optional}{@link RecordsSnapshotWriter}
*/
public static <T> Optional<SnapshotWriter<T>> createWithHeader(
Supplier<Optional<RawSnapshotWriter>> supplier,
int maxBatchSize,
MemoryPool memoryPool,
Time snapshotTime,
long lastContainedLogTimestamp,
CompressionType compressionType,
RecordSerde<T> serde
) {
return supplier.get().map(snapshot -> {
RecordsSnapshotWriter<T> writer = new RecordsSnapshotWriter<>(
snapshot,
maxBatchSize,
memoryPool,
snapshotTime,
lastContainedLogTimestamp,
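// Note: the compressionType argument is not forwarded here; the writer is always constructed with CompressionType.NONE.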
CompressionType.NONE,
serde);
writer.initializeSnapshotWithHeader();
return writer;
});
}
@Override
public OffsetAndEpoch snapshotId() {
return snapshot.snapshotId();
}
@Override
public long lastContainedLogOffset() {
return snapshot.snapshotId().offset - 1;
}
@Override
public int lastContainedLogEpoch() {
return snapshot.snapshotId().epoch;
}
@Override
public boolean isFrozen() {
return snapshot.isFrozen();
}
@Override
public void append(List<T> records) {
if (snapshot.isFrozen()) {
String message = String.format(
"Append not supported. Snapshot is already frozen: id = '%s'.",
snapshot.snapshotId()
);
throw new IllegalStateException(message);
}
accumulator.append(snapshot.snapshotId().epoch, records);
if (accumulator.needsDrain(time.milliseconds())) {
appendBatches(accumulator.drain());
}
}
@Override
public void freeze() {
finalizeSnapshotWithFooter();
appendBatches(accumulator.drain());
snapshot.freeze();
accumulator.close();
}
@Override
public void close() {
snapshot.close();
accumulator.close();
}
private void appendBatches(List<CompletedBatch<T>> batches) {
try {
for (CompletedBatch<T> batch : batches) {
snapshot.append(batch.data);
}
} finally {
batches.forEach(CompletedBatch::release);
}
}
}
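
The write-side lifecycle exercised by the tests in this commit is: createWithHeader() appends the SnapshotHeaderRecord up front, the caller appends data records, freeze() appends the SnapshotFooterRecord and makes the snapshot immutable, and close() without a prior freeze() aborts it. A hypothetical sketch, again for the SnapshotApiSketch class above (assumes java.util.List is imported):

    static void writeSnapshot(RawSnapshotWriter raw, List<ApiMessageAndVersion> records, long lastContainedLogTimestamp) {
        Optional<SnapshotWriter<ApiMessageAndVersion>> maybeWriter = openWriter(raw, lastContainedLogTimestamp);
        if (maybeWriter.isPresent()) {
            SnapshotWriter<ApiMessageAndVersion> writer = maybeWriter.get();
            try {
                writer.append(records);  // may be called repeatedly; throws IllegalStateException once frozen
                writer.freeze();         // appends the footer, drains pending batches, freezes the raw snapshot
            } finally {
                writer.close();          // without a prior freeze() this aborts the snapshot
            }
        }
    }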


@@ -14,19 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.snapshot;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalLong;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.raft.internals.RecordsIterator;
import java.util.Iterator;
/**
* A type for reading an immutable snapshot.
@@ -43,119 +36,29 @@ import org.apache.kafka.raft.internals.RecordsIterator;
* offsets and epoch from the log are included in this snapshot. Both of these values are
* inclusive.
*/
public final class SnapshotReader<T> implements AutoCloseable, Iterator<Batch<T>> {
private final OffsetAndEpoch snapshotId;
private final RecordsIterator<T> iterator;
private Optional<Batch<T>> nextBatch = Optional.empty();
private OptionalLong lastContainedLogTimestamp = OptionalLong.empty();
private SnapshotReader(
OffsetAndEpoch snapshotId,
RecordsIterator<T> iterator
) {
this.snapshotId = snapshotId;
this.iterator = iterator;
}
public interface SnapshotReader<T> extends AutoCloseable, Iterator<Batch<T>> {
/**
* Returns the end offset and epoch for the snapshot.
*/
public OffsetAndEpoch snapshotId() {
return snapshotId;
}
OffsetAndEpoch snapshotId();
/**
* Returns the last log offset which is represented in the snapshot.
*/
public long lastContainedLogOffset() {
return snapshotId.offset - 1;
}
long lastContainedLogOffset();
/**
* Returns the epoch of the last log offset which is represented in the snapshot.
*/
public int lastContainedLogEpoch() {
return snapshotId.epoch;
}
int lastContainedLogEpoch();
/**
* Returns the timestamp of the last log offset which is represented in the snapshot.
*/
public long lastContainedLogTimestamp() {
if (!lastContainedLogTimestamp.isPresent()) {
nextBatch.ifPresent(batch -> {
throw new IllegalStateException(
String.format(
"nextBatch was present when last contained log timestamp was not present",
batch
)
);
});
nextBatch = nextBatch();
}
return lastContainedLogTimestamp.getAsLong();
}
@Override
public boolean hasNext() {
if (!nextBatch.isPresent()) {
nextBatch = nextBatch();
}
return nextBatch.isPresent();
}
@Override
public Batch<T> next() {
if (!hasNext()) {
throw new NoSuchElementException("Snapshot reader doesn't have any more elements");
}
Batch<T> batch = nextBatch.get();
nextBatch = Optional.empty();
return batch;
}
long lastContainedLogTimestamp();
/**
* Closes the snapshot reader.
*/
public void close() {
iterator.close();
}
public static <T> SnapshotReader<T> of(
RawSnapshotReader snapshot,
RecordSerde<T> serde,
BufferSupplier bufferSupplier,
int maxBatchSize
) {
return new SnapshotReader<>(
snapshot.snapshotId(),
new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, maxBatchSize)
);
}
/**
* Returns the next non-control Batch
*/
private Optional<Batch<T>> nextBatch() {
while (iterator.hasNext()) {
Batch<T> batch = iterator.next();
if (!lastContainedLogTimestamp.isPresent()) {
// The Batch type doesn't support returning control batches. For now lets just use
// the append time of the first batch
lastContainedLogTimestamp = OptionalLong.of(batch.appendTimestamp());
}
if (!batch.records().isEmpty()) {
return Optional.of(batch);
}
}
return Optional.empty();
}
void close();
}


@@ -14,23 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.snapshot;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.record.ControlRecordUtils;
import java.util.Optional;
import java.util.List;
import java.util.function.Supplier;
/**
* A type for writing a snapshot for a given end offset and epoch.
@@ -45,135 +34,28 @@ import java.util.function.Supplier;
*
* @see org.apache.kafka.raft.KafkaRaftClient#createSnapshot(long, int, long)
*/
final public class SnapshotWriter<T> implements AutoCloseable {
final private RawSnapshotWriter snapshot;
final private BatchAccumulator<T> accumulator;
final private Time time;
final private long lastContainedLogTimestamp;
private SnapshotWriter(
RawSnapshotWriter snapshot,
int maxBatchSize,
MemoryPool memoryPool,
Time time,
long lastContainedLogTimestamp,
CompressionType compressionType,
RecordSerde<T> serde
) {
this.snapshot = snapshot;
this.time = time;
this.lastContainedLogTimestamp = lastContainedLogTimestamp;
this.accumulator = new BatchAccumulator<>(
snapshot.snapshotId().epoch,
0,
Integer.MAX_VALUE,
maxBatchSize,
memoryPool,
time,
compressionType,
serde
);
}
/**
* Adds a {@link SnapshotHeaderRecord} to snapshot
*
* @throws IllegalStateException if the snapshot is not empty
*/
private void initializeSnapshotWithHeader() {
if (snapshot.sizeInBytes() != 0) {
String message = String.format(
"Initializing writer with a non-empty snapshot: id = '%s'.",
snapshot.snapshotId()
);
throw new IllegalStateException(message);
}
SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord()
.setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION)
.setLastContainedLogTimestamp(lastContainedLogTimestamp);
accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds());
accumulator.forceDrain();
}
/**
* Adds a {@link SnapshotFooterRecord} to the snapshot
*
* No more records should be appended to the snapshot after calling this method
*/
private void finalizeSnapshotWithFooter() {
SnapshotFooterRecord footerRecord = new SnapshotFooterRecord()
.setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds());
accumulator.forceDrain();
}
/**
* Create an instance of this class and initialize
* the underlying snapshot with {@link SnapshotHeaderRecord}
*
* @param supplier a lambda to create the low level snapshot writer
* @param maxBatchSize the maximum size in byte for a batch
* @param memoryPool the memory pool for buffer allocation
* @param snapshotTime the clock implementation
* @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot
* @param compressionType the compression algorithm to use
* @param serde the record serialization and deserialization implementation
* @return {@link Optional}{@link SnapshotWriter}
*/
public static <T> Optional<SnapshotWriter<T>> createWithHeader(
Supplier<Optional<RawSnapshotWriter>> supplier,
int maxBatchSize,
MemoryPool memoryPool,
Time snapshotTime,
long lastContainedLogTimestamp,
CompressionType compressionType,
RecordSerde<T> serde
) {
Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> {
return new SnapshotWriter<T>(
snapshot,
maxBatchSize,
memoryPool,
snapshotTime,
lastContainedLogTimestamp,
CompressionType.NONE,
serde);
});
writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader);
return writer;
}
public interface SnapshotWriter<T> extends AutoCloseable {
/**
* Returns the end offset and epoch for the snapshot.
*/
public OffsetAndEpoch snapshotId() {
return snapshot.snapshotId();
}
OffsetAndEpoch snapshotId();
/**
* Returns the last log offset which is represented in the snapshot.
*/
public long lastContainedLogOffset() {
return snapshot.snapshotId().offset - 1;
}
long lastContainedLogOffset();
/**
* Returns the epoch of the last log offset which is represented in the snapshot.
*/
public int lastContainedLogEpoch() {
return snapshot.snapshotId().epoch;
}
int lastContainedLogEpoch();
/**
* Returns true if the snapshot has been frozen, otherwise false is returned.
*
* Modifications to the snapshot are not allowed once it is frozen.
*/
public boolean isFrozen() {
return snapshot.isFrozen();
}
boolean isFrozen();
/**
* Appends a list of values to the snapshot.
@@ -183,52 +65,20 @@ final public class SnapshotWriter<T> implements AutoCloseable {
* @param records the list of records to append to the snapshot
* @throws IllegalStateException if append is called when isFrozen is true
*/
public void append(List<T> records) {
if (snapshot.isFrozen()) {
String message = String.format(
"Append not supported. Snapshot is already frozen: id = '%s'.",
snapshot.snapshotId()
);
throw new IllegalStateException(message);
}
accumulator.append(snapshot.snapshotId().epoch, records);
if (accumulator.needsDrain(time.milliseconds())) {
appendBatches(accumulator.drain());
}
}
void append(List<T> records);
/**
* Freezes the snapshot by flushing all pending writes and marking it as immutable.
*
* Also adds a {@link SnapshotFooterRecord} to the end of the snapshot
*/
public void freeze() {
finalizeSnapshotWithFooter();
appendBatches(accumulator.drain());
snapshot.freeze();
accumulator.close();
}
void freeze();
/**
* Closes the snapshot writer.
*
* If close is called without first calling freeze the snapshot is aborted.
*/
public void close() {
snapshot.close();
accumulator.close();
}
void close();
private void appendBatches(List<CompletedBatch<T>> batches) {
try {
for (CompletedBatch<T> batch : batches) {
snapshot.append(batch.data);
}
} finally {
batches.forEach(CompletedBatch::release);
}
}
}
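
With SnapshotWriter reduced to an interface, tests and tooling can substitute lightweight implementations instead of always going through RecordsSnapshotWriter and a RawSnapshotWriter. A hypothetical in-memory stub (not part of this commit) that simply records what was appended:

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.snapshot.SnapshotWriter;

// Hypothetical test double: collects appended values in memory rather than building record batches.
final class CapturingSnapshotWriter<T> implements SnapshotWriter<T> {
    private final OffsetAndEpoch snapshotId;
    private final List<T> appended = new ArrayList<>();
    private boolean frozen = false;

    CapturingSnapshotWriter(OffsetAndEpoch snapshotId) {
        this.snapshotId = snapshotId;
    }

    @Override
    public OffsetAndEpoch snapshotId() {
        return snapshotId;
    }

    @Override
    public long lastContainedLogOffset() {
        return snapshotId.offset - 1;
    }

    @Override
    public int lastContainedLogEpoch() {
        return snapshotId.epoch;
    }

    @Override
    public boolean isFrozen() {
        return frozen;
    }

    @Override
    public void append(List<T> records) {
        if (frozen) {
            throw new IllegalStateException("Snapshot is already frozen: id = " + snapshotId);
        }
        appended.addAll(records);
    }

    @Override
    public void freeze() {
        frozen = true;
    }

    @Override
    public void close() {
        // nothing to release
    }

    List<T> appendedRecords() {
        return appended;
    }
}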


@@ -32,8 +32,9 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.internals.StringSerde;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotWriterReaderTest;
import org.junit.jupiter.api.Test;
@@ -1719,7 +1720,7 @@ final public class KafkaRaftClientSnapshotTest {
}
private static SnapshotWriter<String> snapshotWriter(RaftClientTestContext context, RawSnapshotWriter snapshot) {
return SnapshotWriter.createWithHeader(
return RecordsSnapshotWriter.createWithHeader(
() -> Optional.of(snapshot),
4 * 1024,
MemoryPool.NONE,


@@ -57,8 +57,8 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.internals.BatchBuilder;
import org.apache.kafka.raft.internals.StringSerde;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;


@@ -39,6 +39,7 @@ import org.apache.kafka.raft.MockLog.LogEntry;
import org.apache.kafka.raft.internals.BatchMemoryPool;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@@ -1111,7 +1112,7 @@ public class RaftEventSimulationTest {
startOffset.set(snapshotId.offset);
try (SnapshotReader<Integer> snapshot =
SnapshotReader.of(log.readSnapshot(snapshotId).get(), node.intSerde, BufferSupplier.create(), Integer.MAX_VALUE)) {
RecordsSnapshotReader.of(log.readSnapshot(snapshotId).get(), node.intSerde, BufferSupplier.create(), Integer.MAX_VALUE)) {
// Expect only one batch with only one record
assertTrue(snapshot.hasNext());
Batch<Integer> batch = snapshot.next();


@@ -188,7 +188,7 @@ final public class SnapshotWriterReaderTest {
OffsetAndEpoch snapshotId,
int maxBatchSize
) {
return SnapshotReader.of(
return RecordsSnapshotReader.of(
context.log.readSnapshot(snapshotId).get(),
context.serde,
BufferSupplier.create(),
@@ -246,7 +246,7 @@ final public class SnapshotWriterReaderTest {
public static void assertSnapshot(List<List<String>> batches, RawSnapshotReader reader) {
assertSnapshot(
batches,
SnapshotReader.of(reader, new StringSerde(), BufferSupplier.create(), Integer.MAX_VALUE)
RecordsSnapshotReader.of(reader, new StringSerde(), BufferSupplier.create(), Integer.MAX_VALUE)
);
}