From 672dd3ab6aea413eaa8170236f351a0f2a35a89c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?=
Date: Fri, 7 Apr 2023 09:25:54 -0700
Subject: [PATCH] KAFKA-13020; Implement reading Snapshot log append timestamp (#13345)

The SnapshotReader exposes the "last contained log time", which is mainly used during snapshot cleanup. The previous implementation used the append time of the snapshot record, which is not accurate: it is the time at which the snapshot was created, not the log append time of the last record included in the snapshot. That log append time is stored in the header control record of the snapshot, which is the first record of the snapshot. To be able to read this record, this change extends the RecordsIterator to decode and expose the control records in the Records type. A short usage sketch of the new read path follows the diff.

Reviewers: Colin Patrick McCabe
---
 checkstyle/import-control.xml                 | 1 +
 .../common/record/ControlRecordUtils.java     | 20 +-
 .../scala/kafka/raft/KafkaMetadataLog.scala   | 34 +++--
 .../main/scala/kafka/raft/RaftManager.scala   | 1 +
 .../scala/kafka/tools/DumpLogSegments.scala   | 4 +-
 .../kafka/raft/KafkaMetadataLogTest.scala     | 1 +
 .../kafka/tools/DumpLogSegmentsTest.scala     | 5 +-
 .../image/loader/MetadataLoaderTest.java      | 48 ++++--
 .../java/org/apache/kafka/raft/Batch.java     | 39 ++++-
 .../org/apache/kafka/raft/ControlRecord.java  | 104 +++++++++++++
 .../kafka/raft/internals/RecordsIterator.java | 141 +++++++++++++-----
 .../kafka/snapshot/RecordsSnapshotReader.java | 24 ++-
 .../kafka/snapshot/RecordsSnapshotWriter.java | 2 +-
 .../org/apache/kafka/ControlRecordTest.java   | 59 ++++++++
 .../raft/KafkaRaftClientSnapshotTest.java     | 4 +-
 .../raft/internals/RecordsIteratorTest.java   | 91 +++++++++--
 .../snapshot/SnapshotWriterReaderTest.java    | 6 +-
 17 files changed, 482 insertions(+), 102 deletions(-)
 create mode 100644 raft/src/main/java/org/apache/kafka/raft/ControlRecord.java
 create mode 100644 raft/src/test/java/org/apache/kafka/ControlRecordTest.java

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 18189e3eaad..23791d527f6 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -251,6 +251,7 @@
+
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
index a9407217605..3b1fd21f787 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
@@ -37,39 +37,39 @@ public class ControlRecordUtils {
throw new IllegalArgumentException( "Expected LEADER_CHANGE control record type(2), but found " + recordType.toString()); } - return deserializeLeaderChangeMessage(record.value().duplicate()); + return deserializeLeaderChangeMessage(record.value()); } public static LeaderChangeMessage deserializeLeaderChangeMessage(ByteBuffer data) { - ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.duplicate()); + ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice()); return new LeaderChangeMessage(byteBufferAccessor, LEADER_CHANGE_CURRENT_VERSION); } - public static SnapshotHeaderRecord deserializedSnapshotHeaderRecord(Record record) { + public static SnapshotHeaderRecord deserializeSnapshotHeaderRecord(Record record) { ControlRecordType recordType = ControlRecordType.parse(record.key()); if (recordType != 
ControlRecordType.SNAPSHOT_HEADER) { throw new IllegalArgumentException( "Expected SNAPSHOT_HEADER control record type(3), but found " + recordType.toString()); } - return deserializedSnapshotHeaderRecord(record.value().duplicate()); + return deserializeSnapshotHeaderRecord(record.value()); } - public static SnapshotHeaderRecord deserializedSnapshotHeaderRecord(ByteBuffer data) { - ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.duplicate()); + public static SnapshotHeaderRecord deserializeSnapshotHeaderRecord(ByteBuffer data) { + ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice()); return new SnapshotHeaderRecord(byteBufferAccessor, SNAPSHOT_HEADER_CURRENT_VERSION); } - public static SnapshotFooterRecord deserializedSnapshotFooterRecord(Record record) { + public static SnapshotFooterRecord deserializeSnapshotFooterRecord(Record record) { ControlRecordType recordType = ControlRecordType.parse(record.key()); if (recordType != ControlRecordType.SNAPSHOT_FOOTER) { throw new IllegalArgumentException( "Expected SNAPSHOT_FOOTER control record type(4), but found " + recordType.toString()); } - return deserializedSnapshotFooterRecord(record.value().duplicate()); + return deserializeSnapshotFooterRecord(record.value()); } - public static SnapshotFooterRecord deserializedSnapshotFooterRecord(ByteBuffer data) { - ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.duplicate()); + public static SnapshotFooterRecord deserializeSnapshotFooterRecord(ByteBuffer data) { + ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice()); return new SnapshotFooterRecord(byteBufferAccessor, SNAPSHOT_FOOTER_CURRENT_VERSION); } } diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index 277b3e1c900..69df7552e15 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -22,12 +22,15 @@ import kafka.server.{BrokerTopicStats, KafkaConfig, RequestLocal} import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.config.{AbstractConfig, TopicConfig} import org.apache.kafka.common.errors.InvalidConfigurationException -import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords, Records} -import org.apache.kafka.common.utils.{BufferSupplier, Time} +import org.apache.kafka.common.record.{MemoryRecords, Records} +import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier +import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch} +import org.apache.kafka.server.common.serialization.RecordSerde import org.apache.kafka.server.util.Scheduler -import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots} +import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, +RecordsSnapshotReader, SnapshotPath, Snapshots} import org.apache.kafka.storage.internals import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, LogConfig, LogDirFailureChannel, LogStartOffsetIncrementReason, ProducerStateManagerConfig} @@ -40,6 +43,7 @@ import scala.compat.java8.OptionConverters._ final class KafkaMetadataLog 
private ( val log: UnifiedLog, + recordSerde: RecordSerde[_], time: Time, scheduler: Scheduler, // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the @@ -363,17 +367,19 @@ final class KafkaMetadataLog private ( * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records */ private def readSnapshotTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = { - readSnapshot(snapshotId).asScala.flatMap { reader => - val batchIterator = reader.records().batchIterator() + readSnapshot(snapshotId).asScala.map { reader => + val recordsSnapshotReader = RecordsSnapshotReader.of( + reader, + recordSerde, + new GrowableBufferSupplier(), + KafkaRaftClient.MAX_BATCH_SIZE_BYTES, + true + ) - val firstBatch = batchIterator.next() - val records = firstBatch.streamingIterator(new BufferSupplier.GrowableBufferSupplier()) - if (firstBatch.isControlBatch) { - val header = ControlRecordUtils.deserializedSnapshotHeaderRecord(records.next()) - Some(header.lastContainedLogTimestamp()) - } else { - warn("Did not find control record at beginning of snapshot") - None + try { + recordsSnapshotReader.lastContainedLogTimestamp + } finally { + recordsSnapshotReader.close() } } } @@ -548,6 +554,7 @@ object KafkaMetadataLog extends Logging { topicPartition: TopicPartition, topicId: Uuid, dataDir: File, + recordSerde: RecordSerde[_], time: Time, scheduler: Scheduler, config: MetadataLogConfig @@ -597,6 +604,7 @@ object KafkaMetadataLog extends Logging { val metadataLog = new KafkaMetadataLog( log, + recordSerde, time, scheduler, recoverSnapshots(log), diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index ea0f453c9e5..e6e5aa0bb36 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -264,6 +264,7 @@ class KafkaRaftManager[T]( topicPartition, topicId, dataDir, + recordSerde, time, scheduler, config = MetadataLogConfig(config, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES) diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index bb773d86ade..8a37e659220 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -302,10 +302,10 @@ object DumpLogSegments { val endTxnMarker = EndTransactionMarker.deserialize(record) print(s" endTxnMarker: ${endTxnMarker.controlType} coordinatorEpoch: ${endTxnMarker.coordinatorEpoch}") case ControlRecordType.SNAPSHOT_HEADER => - val header = ControlRecordUtils.deserializedSnapshotHeaderRecord(record) + val header = ControlRecordUtils.deserializeSnapshotHeaderRecord(record) print(s" SnapshotHeader ${SnapshotHeaderRecordJsonConverter.write(header, header.version())}") case ControlRecordType.SNAPSHOT_FOOTER => - val footer = ControlRecordUtils.deserializedSnapshotFooterRecord(record) + val footer = ControlRecordUtils.deserializeSnapshotFooterRecord(record) print(s" SnapshotFooter ${SnapshotFooterRecordJsonConverter.write(footer, footer.version())}") case controlType => print(s" controlType: $controlType($controlTypeId)") diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala index 0f8f9ebc9fc..7852a0dc2a0 100644 --- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala +++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala 
@@ -1041,6 +1041,7 @@ object KafkaMetadataLogTest { KafkaRaftServer.MetadataPartition, KafkaRaftServer.MetadataTopicId, logDir, + new ByteArraySerde, time, time.scheduler, metadataLogConfig diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index ea6d591cf33..4114330b058 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -250,7 +250,7 @@ class DumpLogSegmentsTest { ) val records: Array[SimpleRecord] = metadataRecords.map(message => { - val serde = new MetadataRecordSerde() + val serde = MetadataRecordSerde.INSTANCE val cache = new ObjectSerializationCache val size = serde.recordSize(message, cache) val buf = ByteBuffer.allocate(size) @@ -303,6 +303,7 @@ class DumpLogSegmentsTest { KafkaRaftServer.MetadataPartition, KafkaRaftServer.MetadataTopicId, logDir, + MetadataRecordSerde.INSTANCE, time, time.scheduler, MetadataLogConfig( @@ -328,7 +329,7 @@ class DumpLogSegmentsTest { new MockTime, lastContainedLogTimestamp, CompressionType.NONE, - new MetadataRecordSerde + MetadataRecordSerde.INSTANCE, ).get() ) { snapshotWriter => snapshotWriter.append(metadataRecords.asJava) diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java index c7f651cf895..1738471692f 100644 --- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java @@ -18,9 +18,11 @@ package org.apache.kafka.image.loader; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.SnapshotHeaderRecord; import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; @@ -28,6 +30,7 @@ import org.apache.kafka.image.MetadataProvenance; import org.apache.kafka.image.publisher.MetadataPublisher; import org.apache.kafka.raft.Batch; import org.apache.kafka.raft.BatchReader; +import org.apache.kafka.raft.ControlRecord; import org.apache.kafka.raft.LeaderAndEpoch; import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -236,8 +239,17 @@ public class MetadataLoaderTest { loader.installPublishers(asList(publisher)).get(); if (loadSnapshot) { MockSnapshotReader snapshotReader = new MockSnapshotReader( - new MetadataProvenance(200, 100, 4000), - asList(Batch.control(200, 100, 4000, 10, 200))); + new MetadataProvenance(200, 100, 4000), + asList( + Batch.control( + 200, + 100, + 4000, + 10, + asList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord())) + ) + ) + ); loader.handleSnapshot(snapshotReader); } loader.waitForAllEventsToBeHandled(); @@ -329,8 +341,17 @@ public class MetadataLoaderTest { long offset ) throws Exception { MockSnapshotReader snapshotReader = new MockSnapshotReader( - new MetadataProvenance(offset, 100, 4000), - asList(Batch.control(200, 100, 4000, 10, 200))); + new MetadataProvenance(offset, 100, 4000), + asList( + Batch.control( + 200, + 100, + 4000, + 10, + asList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new 
SnapshotHeaderRecord())) + ) + ) + ); if (loader.time() instanceof MockTime) { snapshotReader.setTime((MockTime) loader.time()); } @@ -409,16 +430,25 @@ public class MetadataLoaderTest { loader.installPublishers(publishers).get(); loadTestSnapshot(loader, 200); publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS); - MockBatchReader batchReader = new MockBatchReader(300, asList( - Batch.control(300, 100, 4000, 10, 400))). - setTime(time); + MockBatchReader batchReader = new MockBatchReader( + 300, + asList( + Batch.control( + 300, + 100, + 4000, + 10, + asList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord())) + ) + ) + ).setTime(time); loader.handleCommit(batchReader); loader.waitForAllEventsToBeHandled(); assertTrue(batchReader.closed); - assertEquals(400L, loader.lastAppliedOffset()); + assertEquals(300L, loader.lastAppliedOffset()); } assertTrue(publishers.get(0).closed); - assertEquals(new LogDeltaManifest(new MetadataProvenance(400, 100, 4000), LeaderAndEpoch.UNKNOWN, 1, + assertEquals(new LogDeltaManifest(new MetadataProvenance(300, 100, 4000), LeaderAndEpoch.UNKNOWN, 1, 3000000L, 10), publishers.get(0).latestLogDeltaManifest); assertEquals(MetadataVersion.IBP_3_3_IV1, diff --git a/raft/src/main/java/org/apache/kafka/raft/Batch.java b/raft/src/main/java/org/apache/kafka/raft/Batch.java index 685a75821cb..353723166d9 100644 --- a/raft/src/main/java/org/apache/kafka/raft/Batch.java +++ b/raft/src/main/java/org/apache/kafka/raft/Batch.java @@ -33,6 +33,7 @@ public final class Batch implements Iterable { private final int sizeInBytes; private final long lastOffset; private final List records; + private final List controlRecords; private Batch( long baseOffset, @@ -40,7 +41,8 @@ public final class Batch implements Iterable { long appendTimestamp, int sizeInBytes, long lastOffset, - List records + List records, + List controlRecords ) { this.baseOffset = baseOffset; this.epoch = epoch; @@ -48,6 +50,7 @@ public final class Batch implements Iterable { this.sizeInBytes = sizeInBytes; this.lastOffset = lastOffset; this.records = records; + this.controlRecords = controlRecords; } /** @@ -78,6 +81,13 @@ public final class Batch implements Iterable { return records; } + /** + * The list of control records in the batch. + */ + public List controlRecords() { + return controlRecords; + } + /** * The epoch of the leader that appended the record batch. 
*/ @@ -106,6 +116,7 @@ public final class Batch implements Iterable { ", sizeInBytes=" + sizeInBytes + ", lastOffset=" + lastOffset + ", records=" + records + + ", controlRecords=" + controlRecords + ')'; } @@ -119,7 +130,8 @@ public final class Batch implements Iterable { appendTimestamp == batch.appendTimestamp && sizeInBytes == batch.sizeInBytes && lastOffset == batch.lastOffset && - Objects.equals(records, batch.records); + Objects.equals(records, batch.records) && + Objects.equals(controlRecords, batch.controlRecords); } @Override @@ -130,7 +142,8 @@ public final class Batch implements Iterable { appendTimestamp, sizeInBytes, lastOffset, - records + records, + controlRecords ); } @@ -150,15 +163,26 @@ public final class Batch implements Iterable { int epoch, long appendTimestamp, int sizeInBytes, - long lastOffset + List records ) { + if (records.isEmpty()) { + throw new IllegalArgumentException( + String.format( + "Control batch must contain at least one record; baseOffset = %s; epoch = %s", + baseOffset, + epoch + ) + ); + } + return new Batch<>( baseOffset, epoch, appendTimestamp, sizeInBytes, - lastOffset, - Collections.emptyList() + baseOffset + records.size() - 1, + Collections.emptyList(), + records ); } @@ -194,7 +218,8 @@ public final class Batch implements Iterable { appendTimestamp, sizeInBytes, baseOffset + records.size() - 1, - records + records, + Collections.emptyList() ); } } diff --git a/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java b/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java new file mode 100644 index 00000000000..bf685770720 --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft; + +import java.util.Objects; +import org.apache.kafka.common.message.LeaderChangeMessage; +import org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.record.ControlRecordType; + +public final class ControlRecord { + private final ControlRecordType recordType; + private final ApiMessage message; + + private static void throwIllegalArgument(ControlRecordType recordType, ApiMessage message) { + throw new IllegalArgumentException( + String.format( + "Record type %s doesn't match message class %s", + recordType, + message.getClass() + ) + ); + } + + public ControlRecord(ControlRecordType recordType, ApiMessage message) { + switch (recordType) { + case LEADER_CHANGE: + if (!(message instanceof LeaderChangeMessage)) { + throwIllegalArgument(recordType, message); + } + break; + case SNAPSHOT_HEADER: + if (!(message instanceof SnapshotHeaderRecord)) { + throwIllegalArgument(recordType, message); + } + break; + case SNAPSHOT_FOOTER: + if (!(message instanceof SnapshotFooterRecord)) { + throwIllegalArgument(recordType, message); + } + break; + default: + throw new IllegalArgumentException(String.format("Unknown control record type %s", recordType)); + } + + this.recordType = recordType; + this.message = message; + } + + public ControlRecordType type() { + return recordType; + } + + public short version() { + switch (recordType) { + case LEADER_CHANGE: + return ((LeaderChangeMessage) message).version(); + case SNAPSHOT_HEADER: + return ((SnapshotHeaderRecord) message).version(); + case SNAPSHOT_FOOTER: + return ((SnapshotFooterRecord) message).version(); + default: + throw new IllegalStateException(String.format("Unknown control record type %s", recordType)); + } + } + + public ApiMessage message() { + return message; + } + + @Override + public boolean equals(Object other) { + if (this == other) return true; + if (other == null || getClass() != other.getClass()) return false; + ControlRecord that = (ControlRecord) other; + return Objects.equals(recordType, that.recordType) && + Objects.equals(message, that.message); + } + + @Override + public int hashCode() { + return Objects.hash(recordType, message); + } + + @Override + public String toString() { + return String.format("ControlRecord(recordType=%s, message=%s)", recordType, message); + } +} diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java index efb1d69a34f..cb8f5762b9d 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java @@ -26,8 +26,11 @@ import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.Optional; - +import java.util.function.BiFunction; +import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.ControlRecordUtils; import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; @@ -37,6 +40,7 @@ import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteUtils; import org.apache.kafka.common.utils.Utils; import 
org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.ControlRecord; import org.apache.kafka.server.common.serialization.RecordSerde; public final class RecordsIterator implements Iterator>, AutoCloseable { @@ -199,45 +203,56 @@ public final class RecordsIterator implements Iterator>, AutoCloseab batch.ensureValid(); } - final Batch result; - if (batch.isControlBatch()) { - result = Batch.control( - batch.baseOffset(), - batch.partitionLeaderEpoch(), - batch.maxTimestamp(), - batch.sizeInBytes(), - batch.lastOffset() - ); - } else { - Integer numRecords = batch.countOrNull(); - if (numRecords == null) { - throw new IllegalStateException("Expected a record count for the records batch"); - } + Integer numRecords = batch.countOrNull(); + if (numRecords == null) { + throw new IllegalStateException("Expected a record count for the records batch"); + } - List records = new ArrayList<>(numRecords); - DataInputStream input = new DataInputStream(batch.recordInputStream(bufferSupplier)); - try { + DataInputStream input = new DataInputStream(batch.recordInputStream(bufferSupplier)); + + final Batch result; + try { + if (batch.isControlBatch()) { + List records = new ArrayList<>(numRecords); for (int i = 0; i < numRecords; i++) { - T record = readRecord(input, batch.sizeInBytes()); + ControlRecord record = readRecord(input, batch.sizeInBytes(), RecordsIterator::decodeControlRecord); records.add(record); } - } finally { - Utils.closeQuietly(input, "DataInputStream"); - } - result = Batch.data( - batch.baseOffset(), - batch.partitionLeaderEpoch(), - batch.maxTimestamp(), - batch.sizeInBytes(), - records - ); + result = Batch.control( + batch.baseOffset(), + batch.partitionLeaderEpoch(), + batch.maxTimestamp(), + batch.sizeInBytes(), + records + ); + } else { + List records = new ArrayList<>(numRecords); + for (int i = 0; i < numRecords; i++) { + T record = readRecord(input, batch.sizeInBytes(), this::decodeDataRecord); + records.add(record); + } + + result = Batch.data( + batch.baseOffset(), + batch.partitionLeaderEpoch(), + batch.maxTimestamp(), + batch.sizeInBytes(), + records + ); + } + } finally { + Utils.closeQuietly(input, "DataInputStream"); } return result; } - private T readRecord(DataInputStream stream, int totalBatchSize) { + private U readRecord( + DataInputStream stream, + int totalBatchSize, + BiFunction, Optional, U> decoder + ) { // Read size of body in bytes int size; try { @@ -281,20 +296,22 @@ public final class RecordsIterator implements Iterator>, AutoCloseab // Read offset delta input.readVarint(); + // Read the key int keySize = input.readVarint(); - if (keySize != -1) { - throw new IllegalArgumentException("Got key size of " + keySize + ", but this is invalid because it " + - "is not -1 as expected."); + Optional key = Optional.empty(); + if (keySize >= 0) { + key = Optional.of(input.readByteBuffer(keySize)); } + // Read the value int valueSize = input.readVarint(); - if (valueSize < 1) { - throw new IllegalArgumentException("Got payload size of " + valueSize + ", but this is invalid because " + - "it is less than 1."); + Optional value = Optional.empty(); + if (valueSize >= 0) { + value = Optional.of(input.readByteBuffer(valueSize)); } // Read the metadata record body from the file input reader - T record = serde.read(input, valueSize); + U record = decoder.apply(key, value); // Read the number of headers. Currently, this must be a single byte set to 0. 
int numHeaders = buf.array()[size - 1]; @@ -302,9 +319,59 @@ public final class RecordsIterator implements Iterator>, AutoCloseab throw new IllegalArgumentException("Got numHeaders of " + numHeaders + ", but this is invalid because " + "it is not 0 as expected."); } + return record; } finally { bufferSupplier.release(buf); } } + + private T decodeDataRecord(Optional key, Optional value) { + if (key.isPresent()) { + throw new IllegalArgumentException("Got key in the record when no key was expected"); + } + + if (!value.isPresent()) { + throw new IllegalArgumentException("Missing value in the record when a value was expected"); + } else if (value.get().remaining() == 0) { + throw new IllegalArgumentException("Got an unexpected empty value in the record"); + } + + ByteBuffer valueBuffer = value.get(); + + return serde.read(new ByteBufferAccessor(valueBuffer), valueBuffer.remaining()); + } + + private static ControlRecord decodeControlRecord(Optional key, Optional value) { + if (!key.isPresent()) { + throw new IllegalArgumentException("Missing key in the record when a key was expected"); + } else if (key.get().remaining() == 0) { + throw new IllegalArgumentException("Got an unexpected empty key in the record"); + } + + if (!value.isPresent()) { + throw new IllegalArgumentException("Missing value in the record when a value was expected"); + } else if (value.get().remaining() == 0) { + throw new IllegalArgumentException("Got an unexpected empty value in the record"); + } + + ControlRecordType type = ControlRecordType.parse(key.get()); + + final ApiMessage message; + switch (type) { + case LEADER_CHANGE: + message = ControlRecordUtils.deserializeLeaderChangeMessage(value.get()); + break; + case SNAPSHOT_HEADER: + message = ControlRecordUtils.deserializeSnapshotHeaderRecord(value.get()); + break; + case SNAPSHOT_FOOTER: + message = ControlRecordUtils.deserializeSnapshotFooterRecord(value.get()); + break; + default: + throw new IllegalArgumentException(String.format("Unknown control record type %s", type)); + } + + return new ControlRecord(type, message); + } } diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java index 71380038ad7..afd1928cfb6 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java @@ -20,12 +20,13 @@ package org.apache.kafka.snapshot; import java.util.NoSuchElementException; import java.util.Optional; import java.util.OptionalLong; - +import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.raft.Batch; import org.apache.kafka.raft.OffsetAndEpoch; -import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.raft.internals.RecordsIterator; +import org.apache.kafka.server.common.serialization.RecordSerde; public final class RecordsSnapshotReader implements SnapshotReader { private final OffsetAndEpoch snapshotId; @@ -121,9 +122,22 @@ public final class RecordsSnapshotReader implements SnapshotReader { Batch batch = iterator.next(); if (!lastContainedLogTimestamp.isPresent()) { - // The Batch type doesn't support returning control batches. 
For now lets just use - // the append time of the first batch - lastContainedLogTimestamp = OptionalLong.of(batch.appendTimestamp()); + // This must be the first batch which is expected to be a control batch with one record for + // the snapshot header. + if (batch.controlRecords().isEmpty()) { + throw new IllegalStateException("First batch is not a control batch with at least one record"); + } else if (!ControlRecordType.SNAPSHOT_HEADER.equals(batch.controlRecords().get(0).type())) { + throw new IllegalStateException( + String.format( + "First control record is not a snapshot header (%s)", + batch.controlRecords().get(0).type() + ) + ); + } + + lastContainedLogTimestamp = OptionalLong.of( + ((SnapshotHeaderRecord) batch.controlRecords().get(0).message()).lastContainedLogTimestamp() + ); } if (!batch.records().isEmpty()) { diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java index 05d3fde09d0..eeacf608a9f 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java @@ -131,7 +131,7 @@ final public class RecordsSnapshotWriter implements SnapshotWriter { ); } - public static SnapshotWriter createWithHeader( + public static RecordsSnapshotWriter createWithHeader( RawSnapshotWriter rawSnapshotWriter, int maxBatchSize, MemoryPool memoryPool, diff --git a/raft/src/test/java/org/apache/kafka/ControlRecordTest.java b/raft/src/test/java/org/apache/kafka/ControlRecordTest.java new file mode 100644 index 00000000000..5ed500995e2 --- /dev/null +++ b/raft/src/test/java/org/apache/kafka/ControlRecordTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.message.LeaderChangeMessage; +import org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.record.ControlRecordType; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public final class ControlRecordTest { + @Test + void testCtr() { + // Valid constructions + new ControlRecord(ControlRecordType.LEADER_CHANGE, new LeaderChangeMessage()); + new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()); + new ControlRecord(ControlRecordType.SNAPSHOT_FOOTER, new SnapshotFooterRecord()); + + // Invalid constructions + assertThrows( + IllegalArgumentException.class, + () -> new ControlRecord(ControlRecordType.ABORT, new SnapshotFooterRecord()) + ); + assertThrows( + IllegalArgumentException.class, + () -> new ControlRecord(ControlRecordType.LEADER_CHANGE, new SnapshotHeaderRecord()) + ); + assertThrows( + IllegalArgumentException.class, + () -> new ControlRecord(ControlRecordType.SNAPSHOT_FOOTER, Mockito.mock(ApiMessage.class)) + ); + } + + @Test + void testControlRecordTypeValues() { + // If this test fails then it means that ControlRecordType was changed. Please review the + // implementation for ControlRecord to see if it needs to be updated based on the changes + // to ControlRecordType. + assertEquals(6, ControlRecordType.values().length); + } +} diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java index 23fe3fd0694..4e9a377d5fd 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java @@ -1756,14 +1756,14 @@ final public class KafkaRaftClientSnapshotTest { private static SnapshotWriter snapshotWriter(RaftClientTestContext context, RawSnapshotWriter snapshot) { return RecordsSnapshotWriter.createWithHeader( - () -> Optional.of(snapshot), + snapshot, 4 * 1024, MemoryPool.NONE, context.time, 0, CompressionType.NONE, new StringSerde() - ).get(); + ); } private final static class MemorySnapshotWriter implements RawSnapshotWriter { diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java index 9676831b30e..67f16c9ac8f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.raft.internals; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.IdentityHashMap; import java.util.List; @@ -26,20 +27,30 @@ import java.util.NoSuchElementException; import java.util.Objects; import java.util.Random; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; import net.jqwik.api.ForAll; import net.jqwik.api.Property; import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.memory.MemoryPool; +import 
org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.message.SnapshotHeaderRecord; import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.snapshot.MockRawSnapshotWriter; +import org.apache.kafka.snapshot.RecordsSnapshotWriter; import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -129,6 +140,62 @@ public final class RecordsIteratorTest { moreFileRecords.close(); } + @Test + public void testControlRecordIteration() { + AtomicReference buffer = new AtomicReference<>(null); + try (RecordsSnapshotWriter snapshot = RecordsSnapshotWriter.createWithHeader( + new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10), snapshotBuf -> buffer.set(snapshotBuf)), + 4 * 1024, + MemoryPool.NONE, + new MockTime(), + 0, + CompressionType.NONE, + STRING_SERDE + ) + ) { + snapshot.append(Arrays.asList("a", "b", "c")); + snapshot.append(Arrays.asList("d", "e", "f")); + snapshot.append(Arrays.asList("g", "h", "i")); + snapshot.freeze(); + } + + try (RecordsIterator iterator = createIterator( + MemoryRecords.readableRecords(buffer.get()), + BufferSupplier.NO_CACHING, + true + ) + ) { + // Check snapshot header control record + Batch batch = iterator.next(); + + assertEquals(1, batch.controlRecords().size()); + assertEquals(ControlRecordType.SNAPSHOT_HEADER, batch.controlRecords().get(0).type()); + assertEquals(new SnapshotHeaderRecord(), batch.controlRecords().get(0).message()); + + // Consume the iterator until we find a control record + do { + batch = iterator.next(); + } + while (batch.controlRecords().isEmpty()); + + // Check snapshot footer control record + assertEquals(1, batch.controlRecords().size()); + assertEquals(ControlRecordType.SNAPSHOT_FOOTER, batch.controlRecords().get(0).type()); + assertEquals(new SnapshotFooterRecord(), batch.controlRecords().get(0).message()); + + // Snapshot footer must be last record + assertFalse(iterator.hasNext()); + } + } + + @Test + void testControlRecordTypeValues() { + // If this test fails then it means that ControlRecordType was changed. Please review the + // implementation for RecordsIterator to see if it needs to be updated based on the changes + // to ControlRecordType. 
+ assertEquals(6, ControlRecordType.values().length); + } + private void testIterator( List> expectedBatches, Records records, @@ -136,21 +203,21 @@ public final class RecordsIteratorTest { ) { Set allocatedBuffers = Collections.newSetFromMap(new IdentityHashMap<>()); - RecordsIterator iterator = createIterator( - records, - mockBufferSupplier(allocatedBuffers), - validateCrc - ); + try (RecordsIterator iterator = createIterator( + records, + mockBufferSupplier(allocatedBuffers), + validateCrc + ) + ) { + for (TestBatch batch : expectedBatches) { + assertTrue(iterator.hasNext()); + assertEquals(batch, TestBatch.from(iterator.next())); + } - for (TestBatch batch : expectedBatches) { - assertTrue(iterator.hasNext()); - assertEquals(batch, TestBatch.from(iterator.next())); + assertFalse(iterator.hasNext()); + assertThrows(NoSuchElementException.class, iterator::next); } - assertFalse(iterator.hasNext()); - assertThrows(NoSuchElementException.class, iterator::next); - - iterator.close(); assertEquals(Collections.emptySet(), allocatedBuffers); } diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java index bd8258cb708..9417e5d44cd 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java @@ -71,6 +71,8 @@ final public class SnapshotWriterReaderTest { // Verify that an empty snapshot has only the Header and Footer try (SnapshotReader reader = readSnapshot(context, id, Integer.MAX_VALUE)) { + assertEquals(magicTimestamp, reader.lastContainedLogTimestamp()); + RawSnapshotReader snapshot = context.log.readSnapshot(id).get(); int recordCount = validateDelimiters(snapshot, magicTimestamp); assertEquals((recordsPerBatch * batches) + delimiterCount, recordCount); @@ -218,7 +220,7 @@ final public class SnapshotWriterReaderTest { Record record = records.next(); countRecords += 1; - SnapshotHeaderRecord headerRecord = ControlRecordUtils.deserializedSnapshotHeaderRecord(record); + SnapshotHeaderRecord headerRecord = ControlRecordUtils.deserializeSnapshotHeaderRecord(record); assertEquals(headerRecord.version(), ControlRecordUtils.SNAPSHOT_HEADER_CURRENT_VERSION); assertEquals(headerRecord.lastContainedLogTimestamp(), lastContainedLogTime); @@ -238,7 +240,7 @@ final public class SnapshotWriterReaderTest { // Verify existence of the footer record in the end assertTrue(batch.isControlBatch()); - SnapshotFooterRecord footerRecord = ControlRecordUtils.deserializedSnapshotFooterRecord(record); + SnapshotFooterRecord footerRecord = ControlRecordUtils.deserializeSnapshotFooterRecord(record); assertEquals(footerRecord.version(), ControlRecordUtils.SNAPSHOT_FOOTER_CURRENT_VERSION); return countRecords;
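
For reference, the sketch below (not part of the patch) shows how a caller can obtain the last contained log timestamp through the updated read path, mirroring the KafkaMetadataLog.readSnapshotTimestamp change above: RecordsSnapshotReader reads the first (control) batch, verifies it begins with a SNAPSHOT_HEADER record, and exposes that header's lastContainedLogTimestamp instead of the batch append time. The helper name and the choice of ApiMessageAndVersion as the record type are illustrative assumptions, not part of this change.

import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.raft.KafkaRaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.apache.kafka.snapshot.SnapshotReader;

public final class SnapshotTimestampExample {
    // Hypothetical helper: returns the log append time of the last record contained in the
    // snapshot, as stored in the snapshot's header control record (the first record).
    public static long lastContainedLogTimestamp(
        RawSnapshotReader rawSnapshot,
        RecordSerde<ApiMessageAndVersion> serde
    ) {
        try (SnapshotReader<ApiMessageAndVersion> reader = RecordsSnapshotReader.of(
                rawSnapshot,
                serde,
                new BufferSupplier.GrowableBufferSupplier(),
                KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
                true // validate batch CRCs while iterating
            )
        ) {
            // With this change, the value comes from the decoded SnapshotHeaderRecord rather
            // than from the append timestamp of the snapshot's first batch.
            return reader.lastContainedLogTimestamp();
        }
    }
}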