diff --git a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
index 6cefccd7b52..e34a5ed6edb 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
@@ -21,9 +21,9 @@ import java.util.Collections
 import kafka.testkit.KafkaClusterTestKit
 import kafka.testkit.TestKitNodes
 import kafka.utils.TestUtils
-import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.BufferSupplier
 import org.apache.kafka.metadata.MetadataRecordSerde
-import org.apache.kafka.snapshot.SnapshotReader
+import org.apache.kafka.snapshot.RecordsSnapshotReader
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Assertions.assertNotEquals
 import org.junit.jupiter.api.Assertions.assertTrue
@@ -76,7 +76,7 @@ class RaftClusterSnapshotTest {
     // For every controller and broker perform some sanity checks against the lastest snapshot
     for ((_, raftManager) <- cluster.raftManagers().asScala) {
       TestUtils.resource(
-        SnapshotReader.of(
+        RecordsSnapshotReader.of(
           raftManager.replicatedLog.latestSnapshot.get(),
           new MetadataRecordSerde(),
           BufferSupplier.create(),
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
index 888fec5ad25..82426611425 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.queue.EventQueue
 import org.apache.kafka.raft.OffsetAndEpoch
 import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.snapshot.{MockRawSnapshotWriter, SnapshotWriter}
+import org.apache.kafka.snapshot.{MockRawSnapshotWriter, RecordsSnapshotWriter, SnapshotWriter}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test

@@ -50,7 +50,7 @@ class BrokerMetadataSnapshotterTest {
     committedEpoch: Int,
     lastContainedLogTime: Long): SnapshotWriter[ApiMessageAndVersion] = {
     val offsetAndEpoch = new OffsetAndEpoch(committedOffset, committedEpoch)
-    SnapshotWriter.createWithHeader(
+    RecordsSnapshotWriter.createWithHeader(
       () => {
         Optional.of(
           new MockRawSnapshotWriter(offsetAndEpoch, consumeSnapshotBuffer(committedOffset, committedEpoch))
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index c274de6ba77..a102cf60880 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -77,8 +77,9 @@ import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.snapshot.RawSnapshotReader;
 import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.snapshot.RawSnapshotReader;
+import org.apache.kafka.snapshot.RecordsSnapshotReader;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -493,7 +494,7 @@ public class QuorumControllerTest {
     }

    private SnapshotReader<ApiMessageAndVersion> createSnapshotReader(RawSnapshotReader reader) {
-        return SnapshotReader.of(
+        return RecordsSnapshotReader.of(
            reader,
            new MetadataRecordSerde(),
            BufferSupplier.create(),
diff --git a/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java b/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java
index a7ac1196c9f..f6c836c35a5 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java
@@ -30,9 +30,10 @@ import org.apache.kafka.controller.SnapshotGenerator.Section;
 import org.apache.kafka.metadata.MetadataRecordSerde;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.snapshot.SnapshotWriter;
 import org.apache.kafka.snapshot.MockRawSnapshotWriter;
 import org.apache.kafka.snapshot.RawSnapshotWriter;
-import org.apache.kafka.snapshot.SnapshotWriter;
+import org.apache.kafka.snapshot.RecordsSnapshotWriter;

 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -94,7 +95,7 @@ public class SnapshotGeneratorTest {
         long committedOffset,
         long lastContainedLogTime
     ) {
-        return SnapshotWriter.createWithHeader(
+        return RecordsSnapshotWriter.createWithHeader(
             () -> createNewSnapshot(new OffsetAndEpoch(committedOffset + 1, 1)),
             1024,
             MemoryPool.NONE,
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 531ecc8fa46..855fd468cba 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -37,8 +37,10 @@ import org.apache.kafka.snapshot.MockRawSnapshotReader;
 import org.apache.kafka.snapshot.MockRawSnapshotWriter;
 import org.apache.kafka.snapshot.RawSnapshotReader;
 import org.apache.kafka.snapshot.RawSnapshotWriter;
-import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.snapshot.RecordsSnapshotReader;
+import org.apache.kafka.snapshot.RecordsSnapshotWriter;
 import org.apache.kafka.snapshot.SnapshotWriter;
+import org.apache.kafka.snapshot.SnapshotReader;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -470,7 +472,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
             if (snapshot.isPresent()) {
                 log.trace("Node {}: handling snapshot with id {}.", nodeId, snapshot.get().snapshotId());
                 listenerData.handleSnapshot(
-                    SnapshotReader.of(
+                    RecordsSnapshotReader.of(
                         snapshot.get(),
                         new MetadataRecordSerde(),
                         BufferSupplier.create(),
@@ -707,7 +709,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
         long lastContainedLogTimestamp
     ) {
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(committedOffset + 1, committedEpoch);
-        return SnapshotWriter.createWithHeader(
+        return RecordsSnapshotWriter.createWithHeader(
             () -> createNewSnapshot(snapshotId),
             1024,
             MemoryPool.NONE,
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 180573d738b..9a4d414dc22 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -72,8 +72,10 @@ import org.apache.kafka.raft.internals.ThresholdPurgatory;
 import org.apache.kafka.server.common.serialization.RecordSerde;
 import org.apache.kafka.snapshot.RawSnapshotReader;
 import org.apache.kafka.snapshot.RawSnapshotWriter;
-import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.snapshot.RecordsSnapshotReader;
+import org.apache.kafka.snapshot.RecordsSnapshotWriter;
 import org.apache.kafka.snapshot.SnapshotWriter;
+import org.apache.kafka.snapshot.SnapshotReader;
 import org.slf4j.Logger;

 import java.util.Collections;
@@ -331,7 +333,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {

     private Optional<SnapshotReader<T>> latestSnapshot() {
         return log.latestSnapshot().map(reader ->
-            SnapshotReader.of(reader, serde, BufferSupplier.create(), MAX_BATCH_SIZE_BYTES)
+            RecordsSnapshotReader.of(reader, serde, BufferSupplier.create(), MAX_BATCH_SIZE_BYTES)
         );
     }

@@ -2347,7 +2349,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         int committedEpoch,
         long lastContainedLogTime
     ) {
-        return SnapshotWriter.createWithHeader(
+        return RecordsSnapshotWriter.createWithHeader(
             () -> log.createNewSnapshot(new OffsetAndEpoch(committedOffset + 1, committedEpoch)),
             MAX_BATCH_SIZE_BYTES,
             memoryPool,
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
new file mode 100644
index 00000000000..89ad2632229
--- /dev/null
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.RecordsIterator;
+
+public final class RecordsSnapshotReader<T> implements SnapshotReader<T> {
+    private final OffsetAndEpoch snapshotId;
+    private final RecordsIterator<T> iterator;
+
+    private Optional<Batch<T>> nextBatch = Optional.empty();
+    private OptionalLong lastContainedLogTimestamp = OptionalLong.empty();
+
+    private RecordsSnapshotReader(
+        OffsetAndEpoch snapshotId,
+        RecordsIterator<T> iterator
+    ) {
+        this.snapshotId = snapshotId;
+        this.iterator = iterator;
+    }
+
+    @Override
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    @Override
+    public long lastContainedLogOffset() {
+        return snapshotId.offset - 1;
+    }
+
+    @Override
+    public int lastContainedLogEpoch() {
+        return snapshotId.epoch;
+    }
+
+    @Override
+    public long lastContainedLogTimestamp() {
+        if (!lastContainedLogTimestamp.isPresent()) {
+            nextBatch.ifPresent(batch -> {
+                throw new IllegalStateException(
+                    String.format(
+                        "nextBatch was present when last contained log timestamp was not present: %s",
+                        batch
+                    )
+                );
+            });
+            nextBatch = nextBatch();
+        }
+
+        return lastContainedLogTimestamp.getAsLong();
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (!nextBatch.isPresent()) {
+            nextBatch = nextBatch();
+        }
+
+        return nextBatch.isPresent();
+    }
+
+    @Override
+    public Batch<T> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException("Snapshot reader doesn't have any more elements");
+        }
+
+        Batch<T> batch = nextBatch.get();
+        nextBatch = Optional.empty();
+
+        return batch;
+    }
+
+    @Override
+    public void close() {
+        iterator.close();
+    }
+
+    public static <T> RecordsSnapshotReader<T> of(
+        RawSnapshotReader snapshot,
+        RecordSerde<T> serde,
+        BufferSupplier bufferSupplier,
+        int maxBatchSize
+    ) {
+        return new RecordsSnapshotReader<>(
+            snapshot.snapshotId(),
+            new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, maxBatchSize)
+        );
+    }
+
+    /**
+     * Returns the next non-control Batch
+     */
+    private Optional<Batch<T>> nextBatch() {
+        while (iterator.hasNext()) {
+            Batch<T> batch = iterator.next();
+
+            if (!lastContainedLogTimestamp.isPresent()) {
+                // The Batch type doesn't support returning control batches. For now let's just use
+                // the append time of the first batch
+                lastContainedLogTimestamp = OptionalLong.of(batch.appendTimestamp());
+            }
+
+            if (!batch.records().isEmpty()) {
+                return Optional.of(batch);
+            }
+        }
+
+        return Optional.empty();
+    }
+}
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
new file mode 100644
index 00000000000..a0246b97efa
--- /dev/null
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.record.ControlRecordUtils;
+
+import java.util.Optional;
+import java.util.List;
+import java.util.function.Supplier;
+
+final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
+    final private RawSnapshotWriter snapshot;
+    final private BatchAccumulator<T> accumulator;
+    final private Time time;
+    final private long lastContainedLogTimestamp;
+
+    private RecordsSnapshotWriter(
+        RawSnapshotWriter snapshot,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time time,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        this.snapshot = snapshot;
+        this.time = time;
+        this.lastContainedLogTimestamp = lastContainedLogTimestamp;
+
+        this.accumulator = new BatchAccumulator<>(
+            snapshot.snapshotId().epoch,
+            0,
+            Integer.MAX_VALUE,
+            maxBatchSize,
+            memoryPool,
+            time,
+            compressionType,
+            serde
+        );
+    }
+
+    /**
+     * Adds a {@link SnapshotHeaderRecord} to the snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */
+    private void initializeSnapshotWithHeader() {
+        if (snapshot.sizeInBytes() != 0) {
+            String message = String.format(
+                "Initializing writer with a non-empty snapshot: id = '%s'.",
+                snapshot.snapshotId()
+            );
+            throw new IllegalStateException(message);
+        }
+
+        SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION)
+            .setLastContainedLogTimestamp(lastContainedLogTimestamp);
+        accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Adds a {@link SnapshotFooterRecord} to the snapshot
+     *
+     * No more records should be appended to the snapshot after calling this method
+     */
+    private void finalizeSnapshotWithFooter() {
+        SnapshotFooterRecord footerRecord = new SnapshotFooterRecord()
+            .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
+        accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds());
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Create an instance of this class and initialize
+     * the underlying snapshot with {@link SnapshotHeaderRecord}
+     *
+     * @param supplier a lambda to create the low level snapshot writer
+     * @param maxBatchSize the maximum size in bytes for a batch
+     * @param memoryPool the memory pool for buffer allocation
+     * @param snapshotTime the clock implementation
+     * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot
+     * @param compressionType the compression algorithm to use
+     * @param serde the record serialization and deserialization implementation
+     * @return {@link Optional}{@link RecordsSnapshotWriter}
+     */
+    public static <T> Optional<SnapshotWriter<T>> createWithHeader(
+        Supplier<Optional<RawSnapshotWriter>> supplier,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time snapshotTime,
+        long lastContainedLogTimestamp,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        return supplier.get().map(snapshot -> {
+            RecordsSnapshotWriter<T> writer = new RecordsSnapshotWriter<>(
+                snapshot,
+                maxBatchSize,
+                memoryPool,
+                snapshotTime,
+                lastContainedLogTimestamp,
+                CompressionType.NONE,
+                serde);
+            writer.initializeSnapshotWithHeader();
+
+            return writer;
+        });
+    }
+
+    @Override
+    public OffsetAndEpoch snapshotId() {
+        return snapshot.snapshotId();
+    }
+
+    @Override
+    public long lastContainedLogOffset() {
+        return snapshot.snapshotId().offset - 1;
+    }
+
+    @Override
+    public int lastContainedLogEpoch() {
+        return snapshot.snapshotId().epoch;
+    }
+
+    @Override
+    public boolean isFrozen() {
+        return snapshot.isFrozen();
+    }
+
+    @Override
+    public void append(List<T> records) {
+        if (snapshot.isFrozen()) {
+            String message = String.format(
+                "Append not supported. Snapshot is already frozen: id = '%s'.",
+                snapshot.snapshotId()
+            );
+
+            throw new IllegalStateException(message);
+        }
+
+        accumulator.append(snapshot.snapshotId().epoch, records);
+
+        if (accumulator.needsDrain(time.milliseconds())) {
+            appendBatches(accumulator.drain());
+        }
+    }
+
+    @Override
+    public void freeze() {
+        finalizeSnapshotWithFooter();
+        appendBatches(accumulator.drain());
+        snapshot.freeze();
+        accumulator.close();
+    }
+
+    @Override
+    public void close() {
+        snapshot.close();
+        accumulator.close();
+    }
+
+    private void appendBatches(List<CompletedBatch<T>> batches) {
+        try {
+            for (CompletedBatch<T> batch : batches) {
+                snapshot.append(batch.data);
+            }
+        } finally {
+            batches.forEach(CompletedBatch::release);
+        }
+    }
+}
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
index 8c6e8e618ea..c41444620c1 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
@@ -14,19 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.snapshot;

-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.Optional;
-import java.util.OptionalLong;
-
-import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.OffsetAndEpoch;
-import org.apache.kafka.server.common.serialization.RecordSerde;
-import org.apache.kafka.raft.internals.RecordsIterator;
+
+import java.util.Iterator;
 /**
  * A type for reading an immutable snapshot.
  *
@@ -43,119 +36,29 @@ import org.apache.kafka.raft.internals.RecordsIterator;
  * offsets and epoch from the log are included in this snapshot. Both of these values are
  * inclusive.
  */
-public final class SnapshotReader<T> implements AutoCloseable, Iterator<Batch<T>> {
-    private final OffsetAndEpoch snapshotId;
-    private final RecordsIterator<T> iterator;
-
-    private Optional<Batch<T>> nextBatch = Optional.empty();
-    private OptionalLong lastContainedLogTimestamp = OptionalLong.empty();
-
-    private SnapshotReader(
-        OffsetAndEpoch snapshotId,
-        RecordsIterator<T> iterator
-    ) {
-        this.snapshotId = snapshotId;
-        this.iterator = iterator;
-    }
-
+public interface SnapshotReader<T> extends AutoCloseable, Iterator<Batch<T>> {
     /**
      * Returns the end offset and epoch for the snapshot.
      */
-    public OffsetAndEpoch snapshotId() {
-        return snapshotId;
-    }
+    OffsetAndEpoch snapshotId();

     /**
      * Returns the last log offset which is represented in the snapshot.
      */
-    public long lastContainedLogOffset() {
-        return snapshotId.offset - 1;
-    }
+    long lastContainedLogOffset();

     /**
      * Returns the epoch of the last log offset which is represented in the snapshot.
      */
-    public int lastContainedLogEpoch() {
-        return snapshotId.epoch;
-    }
+    int lastContainedLogEpoch();

     /**
      * Returns the timestamp of the last log offset which is represented in the snapshot.
      */
-    public long lastContainedLogTimestamp() {
-        if (!lastContainedLogTimestamp.isPresent()) {
-            nextBatch.ifPresent(batch -> {
-                throw new IllegalStateException(
-                    String.format(
-                        "nextBatch was present when last contained log timestamp was not present",
-                        batch
-                    )
-                );
-            });
-            nextBatch = nextBatch();
-        }
-
-        return lastContainedLogTimestamp.getAsLong();
-    }
-
-    @Override
-    public boolean hasNext() {
-        if (!nextBatch.isPresent()) {
-            nextBatch = nextBatch();
-        }
-
-        return nextBatch.isPresent();
-    }
-
-    @Override
-    public Batch<T> next() {
-        if (!hasNext()) {
-            throw new NoSuchElementException("Snapshot reader doesn't have any more elements");
-        }
-
-        Batch<T> batch = nextBatch.get();
-        nextBatch = Optional.empty();
-
-        return batch;
-    }
+    long lastContainedLogTimestamp();

     /**
      * Closes the snapshot reader.
      */
-    public void close() {
-        iterator.close();
-    }
-
-    public static <T> SnapshotReader<T> of(
-        RawSnapshotReader snapshot,
-        RecordSerde<T> serde,
-        BufferSupplier bufferSupplier,
-        int maxBatchSize
-    ) {
-        return new SnapshotReader<>(
-            snapshot.snapshotId(),
-            new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, maxBatchSize)
-        );
-    }
-
-    /**
-     * Returns the next non-control Batch
-     */
-    private Optional<Batch<T>> nextBatch() {
-        while (iterator.hasNext()) {
-            Batch<T> batch = iterator.next();
-
-            if (!lastContainedLogTimestamp.isPresent()) {
-                // The Batch type doesn't support returning control batches. For now lets just use
-                // the append time of the first batch
-                lastContainedLogTimestamp = OptionalLong.of(batch.appendTimestamp());
-            }
-
-            if (!batch.records().isEmpty()) {
-                return Optional.of(batch);
-            }
-        }
-
-        return Optional.empty();
-    }
+    void close();
 }
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
index d905528b3c4..77b29d94498 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
@@ -14,23 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.snapshot;

-import org.apache.kafka.common.memory.MemoryPool;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.raft.OffsetAndEpoch;
-import org.apache.kafka.server.common.serialization.RecordSerde;
-import org.apache.kafka.raft.internals.BatchAccumulator;
-import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
-import org.apache.kafka.common.message.SnapshotHeaderRecord;
 import org.apache.kafka.common.message.SnapshotFooterRecord;
-import org.apache.kafka.common.record.ControlRecordUtils;

-import java.util.Optional;
 import java.util.List;
-import java.util.function.Supplier;

 /**
  * A type for writing a snapshot for a given end offset and epoch.
@@ -45,135 +34,28 @@ import java.util.function.Supplier;
  *
  * @see org.apache.kafka.raft.KafkaRaftClient#createSnapshot(long, int, long)
  */
-final public class SnapshotWriter<T> implements AutoCloseable {
-    final private RawSnapshotWriter snapshot;
-    final private BatchAccumulator<T> accumulator;
-    final private Time time;
-    final private long lastContainedLogTimestamp;
-
-    private SnapshotWriter(
-        RawSnapshotWriter snapshot,
-        int maxBatchSize,
-        MemoryPool memoryPool,
-        Time time,
-        long lastContainedLogTimestamp,
-        CompressionType compressionType,
-        RecordSerde<T> serde
-    ) {
-        this.snapshot = snapshot;
-        this.time = time;
-        this.lastContainedLogTimestamp = lastContainedLogTimestamp;
-
-        this.accumulator = new BatchAccumulator<>(
-            snapshot.snapshotId().epoch,
-            0,
-            Integer.MAX_VALUE,
-            maxBatchSize,
-            memoryPool,
-            time,
-            compressionType,
-            serde
-        );
-    }
-
-    /**
-     * Adds a {@link SnapshotHeaderRecord} to snapshot
-     *
-     * @throws IllegalStateException if the snapshot is not empty
-     */
-    private void initializeSnapshotWithHeader() {
-        if (snapshot.sizeInBytes() != 0) {
-            String message = String.format(
-                "Initializing writer with a non-empty snapshot: id = '%s'.",
-                snapshot.snapshotId()
-            );
-            throw new IllegalStateException(message);
-        }
-
-        SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord()
-            .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION)
-            .setLastContainedLogTimestamp(lastContainedLogTimestamp);
-        accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds());
-        accumulator.forceDrain();
-    }
-
-    /**
-     * Adds a {@link SnapshotFooterRecord} to the snapshot
-     *
-     * No more records should be appended to the snapshot after calling this method
-     */
-    private void finalizeSnapshotWithFooter() {
-        SnapshotFooterRecord footerRecord = new SnapshotFooterRecord()
-            .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
-        accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds());
-        accumulator.forceDrain();
-    }
-
-    /**
-     * Create an instance of this class and initialize
-     * the underlying snapshot with {@link SnapshotHeaderRecord}
-     *
-     * @param supplier a lambda to create the low level snapshot writer
-     * @param maxBatchSize the maximum size in byte for a batch
-     * @param memoryPool the memory pool for buffer allocation
-     * @param snapshotTime the clock implementation
-     * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot
-     * @param compressionType the compression algorithm to use
-     * @param serde the record serialization and deserialization implementation
-     * @return {@link Optional}{@link SnapshotWriter}
-     */
-    public static <T> Optional<SnapshotWriter<T>> createWithHeader(
-        Supplier<Optional<RawSnapshotWriter>> supplier,
-        int maxBatchSize,
-        MemoryPool memoryPool,
-        Time snapshotTime,
-        long lastContainedLogTimestamp,
-        CompressionType compressionType,
-        RecordSerde<T> serde
-    ) {
-        Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> {
-            return new SnapshotWriter<T>(
-                snapshot,
-                maxBatchSize,
-                memoryPool,
-                snapshotTime,
-                lastContainedLogTimestamp,
-                CompressionType.NONE,
-                serde);
-        });
-        writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader);
-        return writer;
-    }
-
+public interface SnapshotWriter<T> extends AutoCloseable {
     /**
      * Returns the end offset and epoch for the snapshot.
      */
-    public OffsetAndEpoch snapshotId() {
-        return snapshot.snapshotId();
-    }
+    OffsetAndEpoch snapshotId();

     /**
      * Returns the last log offset which is represented in the snapshot.
      */
-    public long lastContainedLogOffset() {
-        return snapshot.snapshotId().offset - 1;
-    }
+    long lastContainedLogOffset();

     /**
      * Returns the epoch of the last log offset which is represented in the snapshot.
      */
-    public int lastContainedLogEpoch() {
-        return snapshot.snapshotId().epoch;
-    }
+    int lastContainedLogEpoch();

     /**
      * Returns true if the snapshot has been frozen, otherwise false is returned.
      *
      * Modification to the snapshot are not allowed once it is frozen.
      */
-    public boolean isFrozen() {
-        return snapshot.isFrozen();
-    }
+    boolean isFrozen();

     /**
      * Appends a list of values to the snapshot.
@@ -183,52 +65,20 @@ final public class SnapshotWriter<T> implements AutoCloseable {
      *
      * @param records the list of records to append to the snapshot
      * @throws IllegalStateException if append is called when isFrozen is true
      */
-    public void append(List<T> records) {
-        if (snapshot.isFrozen()) {
-            String message = String.format(
-                "Append not supported. Snapshot is already frozen: id = '%s'.",
-                snapshot.snapshotId()
-            );
-
-            throw new IllegalStateException(message);
-        }
-
-        accumulator.append(snapshot.snapshotId().epoch, records);
-
-        if (accumulator.needsDrain(time.milliseconds())) {
-            appendBatches(accumulator.drain());
-        }
-    }
+    void append(List<T> records);

     /**
      * Freezes the snapshot by flushing all pending writes and marking it as immutable.
      *
      * Also adds a {@link SnapshotFooterRecord} to the end of the snapshot
      */
-    public void freeze() {
-        finalizeSnapshotWithFooter();
-        appendBatches(accumulator.drain());
-        snapshot.freeze();
-        accumulator.close();
-    }
+    void freeze();

     /**
      * Closes the snapshot writer.
      *
      * If close is called without first calling freeze the snapshot is aborted.
      */
-    public void close() {
-        snapshot.close();
-        accumulator.close();
-    }
+    void close();

-    private void appendBatches(List<CompletedBatch<T>> batches) {
-        try {
-            for (CompletedBatch<T> batch : batches) {
-                snapshot.append(batch.data);
-            }
-        } finally {
-            batches.forEach(CompletedBatch::release);
-        }
-    }
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 42db40c7eff..754c2e2c55e 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -32,8 +32,9 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.raft.internals.StringSerde;
 import org.apache.kafka.snapshot.RawSnapshotReader;
 import org.apache.kafka.snapshot.RawSnapshotWriter;
-import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.snapshot.SnapshotWriter;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.snapshot.RecordsSnapshotWriter;
 import org.apache.kafka.snapshot.SnapshotWriterReaderTest;

 import org.junit.jupiter.api.Test;
@@ -1719,7 +1720,7 @@ final public class KafkaRaftClientSnapshotTest {
     }

     private static SnapshotWriter<String> snapshotWriter(RaftClientTestContext context, RawSnapshotWriter snapshot) {
-        return SnapshotWriter.createWithHeader(
+        return RecordsSnapshotWriter.createWithHeader(
             () -> Optional.of(snapshot),
             4 * 1024,
             MemoryPool.NONE,
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 00c351cc26a..d48e41fb31d 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -57,8 +57,8 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.raft.internals.BatchBuilder;
 import org.apache.kafka.raft.internals.StringSerde;
 import org.apache.kafka.server.common.serialization.RecordSerde;
-import org.apache.kafka.snapshot.RawSnapshotWriter;
 import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.snapshot.RawSnapshotWriter;

 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index 120eca31e43..4f79dc18cc6 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.raft.MockLog.LogEntry;
 import org.apache.kafka.raft.internals.BatchMemoryPool;
 import org.apache.kafka.server.common.serialization.RecordSerde;
 import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.snapshot.RecordsSnapshotReader;

 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -1111,7 +1112,7 @@ public class RaftEventSimulationTest {
                 startOffset.set(snapshotId.offset);

                 try (SnapshotReader<Integer> snapshot =
-                         SnapshotReader.of(log.readSnapshot(snapshotId).get(), node.intSerde, BufferSupplier.create(), Integer.MAX_VALUE)) {
+                         RecordsSnapshotReader.of(log.readSnapshot(snapshotId).get(), node.intSerde, BufferSupplier.create(), Integer.MAX_VALUE)) {
                     // Expect only one batch with only one record
                     assertTrue(snapshot.hasNext());
                     Batch<Integer> batch = snapshot.next();
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
index c0f80c53ee2..d251e363594 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
@@ -188,7 +188,7 @@ final public class SnapshotWriterReaderTest {
         OffsetAndEpoch snapshotId,
         int maxBatchSize
     ) {
-        return SnapshotReader.of(
+        return RecordsSnapshotReader.of(
             context.log.readSnapshot(snapshotId).get(),
             context.serde,
             BufferSupplier.create(),
@@ -246,7 +246,7 @@ final public class SnapshotWriterReaderTest {
     public static void assertSnapshot(List<List<String>> batches, RawSnapshotReader reader) {
         assertSnapshot(
             batches,
-            SnapshotReader.of(reader, new StringSerde(), BufferSupplier.create(), Integer.MAX_VALUE)
+            RecordsSnapshotReader.of(reader, new StringSerde(), BufferSupplier.create(), Integer.MAX_VALUE)
        );
    }
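
After this refactor, callers hold the `SnapshotReader<T>`/`SnapshotWriter<T>` interface types and construct the concrete `RecordsSnapshotReader`/`RecordsSnapshotWriter` at the call site, mirroring the test changes above. A minimal usage sketch follows; the `log`, `memoryPool`, `committedOffset`, `committedEpoch`, `lastContainedLogTime`, and `records` identifiers are stand-ins for state the caller already has, not names introduced by this patch:

    // Reading: wrap a RawSnapshotReader in the concrete implementation,
    // but keep the interface type in signatures.
    try (SnapshotReader<ApiMessageAndVersion> reader = RecordsSnapshotReader.of(
            log.latestSnapshot().get(),     // RawSnapshotReader
            new MetadataRecordSerde(),
            BufferSupplier.create(),
            1024)) {                        // max batch size in bytes
        while (reader.hasNext()) {
            Batch<ApiMessageAndVersion> batch = reader.next();
            // apply batch.records() to the state machine
        }
    }

    // Writing: createWithHeader yields Optional.empty() when the supplier
    // declines to create a raw writer (e.g. the snapshot id already exists).
    Optional<SnapshotWriter<ApiMessageAndVersion>> writer = RecordsSnapshotWriter.createWithHeader(
        () -> log.createNewSnapshot(new OffsetAndEpoch(committedOffset + 1, committedEpoch)),
        1024,                               // max batch size in bytes
        memoryPool,
        Time.SYSTEM,
        lastContainedLogTime,
        CompressionType.NONE,
        new MetadataRecordSerde());
    writer.ifPresent(w -> {
        w.append(records);                  // List<ApiMessageAndVersion>
        w.freeze();                         // appends the SnapshotFooterRecord, makes the snapshot immutable
        w.close();
    });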