KAFKA-13020; Implement reading Snapshot log append timestamp (#13345)

The SnapshotReader exposes the "last contained log time", which is mainly used during snapshot cleanup. The previous implementation used the append time of the snapshot record itself. That is inaccurate: it is the time at which the snapshot was created, not the log append time of the last record included in the snapshot.

The log append time of the last record included in the snapshot is stored in the snapshot's header control record, which is the first record of the snapshot.

To be able to read this record, this change extends RecordsIterator to decode the control records in the Records type and expose them to callers.
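
For illustration only (not part of the commit): a minimal sketch of how a caller could recover the last contained log timestamp once control records are decoded and exposed on a Batch. It assumes a batch obtained from the extended RecordsIterator; RecordsSnapshotReader in this change does essentially this for the first batch of a snapshot.

import java.util.OptionalLong;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.ControlRecord;

final class SnapshotHeaderTimestamp {
    // Returns the lastContainedLogTimestamp from a snapshot header batch, if present.
    static OptionalLong lastContainedLogTimestamp(Batch<?> firstBatch) {
        for (ControlRecord record : firstBatch.controlRecords()) {
            if (record.type() == ControlRecordType.SNAPSHOT_HEADER) {
                SnapshotHeaderRecord header = (SnapshotHeaderRecord) record.message();
                return OptionalLong.of(header.lastContainedLogTimestamp());
            }
        }
        // The first batch of a well-formed snapshot is a control batch whose first
        // record is the snapshot header; anything else is treated as an error by callers.
        return OptionalLong.empty();
    }
}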

Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
Author: José Armando García Sancio, 2023-04-07 09:25:54 -07:00 (committed via GitHub)
Commit: 672dd3ab6a (parent: d5e216d618)
17 changed files with 482 additions and 102 deletions

View File

@ -251,6 +251,7 @@
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.image" />

View File

@ -37,39 +37,39 @@ public class ControlRecordUtils {
throw new IllegalArgumentException(
"Expected LEADER_CHANGE control record type(2), but found " + recordType.toString());
}
return deserializeLeaderChangeMessage(record.value().duplicate());
return deserializeLeaderChangeMessage(record.value());
}
public static LeaderChangeMessage deserializeLeaderChangeMessage(ByteBuffer data) {
ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.duplicate());
ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice());
return new LeaderChangeMessage(byteBufferAccessor, LEADER_CHANGE_CURRENT_VERSION);
}
public static SnapshotHeaderRecord deserializedSnapshotHeaderRecord(Record record) {
public static SnapshotHeaderRecord deserializeSnapshotHeaderRecord(Record record) {
ControlRecordType recordType = ControlRecordType.parse(record.key());
if (recordType != ControlRecordType.SNAPSHOT_HEADER) {
throw new IllegalArgumentException(
"Expected SNAPSHOT_HEADER control record type(3), but found " + recordType.toString());
}
return deserializedSnapshotHeaderRecord(record.value().duplicate());
return deserializeSnapshotHeaderRecord(record.value());
}
public static SnapshotHeaderRecord deserializedSnapshotHeaderRecord(ByteBuffer data) {
ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.duplicate());
public static SnapshotHeaderRecord deserializeSnapshotHeaderRecord(ByteBuffer data) {
ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice());
return new SnapshotHeaderRecord(byteBufferAccessor, SNAPSHOT_HEADER_CURRENT_VERSION);
}
public static SnapshotFooterRecord deserializedSnapshotFooterRecord(Record record) {
public static SnapshotFooterRecord deserializeSnapshotFooterRecord(Record record) {
ControlRecordType recordType = ControlRecordType.parse(record.key());
if (recordType != ControlRecordType.SNAPSHOT_FOOTER) {
throw new IllegalArgumentException(
"Expected SNAPSHOT_FOOTER control record type(4), but found " + recordType.toString());
}
return deserializedSnapshotFooterRecord(record.value().duplicate());
return deserializeSnapshotFooterRecord(record.value());
}
public static SnapshotFooterRecord deserializedSnapshotFooterRecord(ByteBuffer data) {
ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.duplicate());
public static SnapshotFooterRecord deserializeSnapshotFooterRecord(ByteBuffer data) {
ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice());
return new SnapshotFooterRecord(byteBufferAccessor, SNAPSHOT_FOOTER_CURRENT_VERSION);
}
}
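
A usage sketch for the renamed helpers (illustrative, not part of the commit). MessageUtil.toByteBuffer is assumed here only to produce a serialized body buffer for the round trip; in the committed code the bytes come from a control record's value.

import java.nio.ByteBuffer;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.record.ControlRecordUtils;

final class SnapshotHeaderRoundTrip {
    public static void main(String[] args) {
        SnapshotHeaderRecord header = new SnapshotHeaderRecord()
            .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_CURRENT_VERSION)
            .setLastContainedLogTimestamp(1_680_000_000_000L);

        // Serialize the record body at the current snapshot header version.
        ByteBuffer body = MessageUtil.toByteBuffer(header, ControlRecordUtils.SNAPSHOT_HEADER_CURRENT_VERSION);

        // The helper slices the buffer, so the caller's position and limit are left alone.
        SnapshotHeaderRecord decoded = ControlRecordUtils.deserializeSnapshotHeaderRecord(body);
        System.out.println(decoded.lastContainedLogTimestamp()); // 1680000000000
    }
}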

View File

@ -22,12 +22,15 @@ import kafka.server.{BrokerTopicStats, KafkaConfig, RequestLocal}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.config.{AbstractConfig, TopicConfig}
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords, Records}
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter,
RecordsSnapshotReader, SnapshotPath, Snapshots}
import org.apache.kafka.storage.internals
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, LogConfig, LogDirFailureChannel, LogStartOffsetIncrementReason, ProducerStateManagerConfig}
@ -40,6 +43,7 @@ import scala.compat.java8.OptionConverters._
final class KafkaMetadataLog private (
val log: UnifiedLog,
recordSerde: RecordSerde[_],
time: Time,
scheduler: Scheduler,
// Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the
@ -363,17 +367,19 @@ final class KafkaMetadataLog private (
* Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
*/
private def readSnapshotTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
readSnapshot(snapshotId).asScala.flatMap { reader =>
val batchIterator = reader.records().batchIterator()
readSnapshot(snapshotId).asScala.map { reader =>
val recordsSnapshotReader = RecordsSnapshotReader.of(
reader,
recordSerde,
new GrowableBufferSupplier(),
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
true
)
val firstBatch = batchIterator.next()
val records = firstBatch.streamingIterator(new BufferSupplier.GrowableBufferSupplier())
if (firstBatch.isControlBatch) {
val header = ControlRecordUtils.deserializedSnapshotHeaderRecord(records.next())
Some(header.lastContainedLogTimestamp())
} else {
warn("Did not find control record at beginning of snapshot")
None
try {
recordsSnapshotReader.lastContainedLogTimestamp
} finally {
recordsSnapshotReader.close()
}
}
}
@ -548,6 +554,7 @@ object KafkaMetadataLog extends Logging {
topicPartition: TopicPartition,
topicId: Uuid,
dataDir: File,
recordSerde: RecordSerde[_],
time: Time,
scheduler: Scheduler,
config: MetadataLogConfig
@ -597,6 +604,7 @@ object KafkaMetadataLog extends Logging {
val metadataLog = new KafkaMetadataLog(
log,
recordSerde,
time,
scheduler,
recoverSnapshots(log),

View File

@ -264,6 +264,7 @@ class KafkaRaftManager[T](
topicPartition,
topicId,
dataDir,
recordSerde,
time,
scheduler,
config = MetadataLogConfig(config, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)

View File

@ -302,10 +302,10 @@ object DumpLogSegments {
val endTxnMarker = EndTransactionMarker.deserialize(record)
print(s" endTxnMarker: ${endTxnMarker.controlType} coordinatorEpoch: ${endTxnMarker.coordinatorEpoch}")
case ControlRecordType.SNAPSHOT_HEADER =>
val header = ControlRecordUtils.deserializedSnapshotHeaderRecord(record)
val header = ControlRecordUtils.deserializeSnapshotHeaderRecord(record)
print(s" SnapshotHeader ${SnapshotHeaderRecordJsonConverter.write(header, header.version())}")
case ControlRecordType.SNAPSHOT_FOOTER =>
val footer = ControlRecordUtils.deserializedSnapshotFooterRecord(record)
val footer = ControlRecordUtils.deserializeSnapshotFooterRecord(record)
print(s" SnapshotFooter ${SnapshotFooterRecordJsonConverter.write(footer, footer.version())}")
case controlType =>
print(s" controlType: $controlType($controlTypeId)")

View File

@ -1041,6 +1041,7 @@ object KafkaMetadataLogTest {
KafkaRaftServer.MetadataPartition,
KafkaRaftServer.MetadataTopicId,
logDir,
new ByteArraySerde,
time,
time.scheduler,
metadataLogConfig

View File

@ -250,7 +250,7 @@ class DumpLogSegmentsTest {
)
val records: Array[SimpleRecord] = metadataRecords.map(message => {
val serde = new MetadataRecordSerde()
val serde = MetadataRecordSerde.INSTANCE
val cache = new ObjectSerializationCache
val size = serde.recordSize(message, cache)
val buf = ByteBuffer.allocate(size)
@ -303,6 +303,7 @@ class DumpLogSegmentsTest {
KafkaRaftServer.MetadataPartition,
KafkaRaftServer.MetadataTopicId,
logDir,
MetadataRecordSerde.INSTANCE,
time,
time.scheduler,
MetadataLogConfig(
@ -328,7 +329,7 @@ class DumpLogSegmentsTest {
new MockTime,
lastContainedLogTimestamp,
CompressionType.NONE,
new MetadataRecordSerde
MetadataRecordSerde.INSTANCE,
).get()
) { snapshotWriter =>
snapshotWriter.append(metadataRecords.asJava)

View File

@ -18,9 +18,11 @@
package org.apache.kafka.image.loader;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@ -28,6 +30,7 @@ import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.ControlRecord;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@ -237,7 +240,16 @@ public class MetadataLoaderTest {
if (loadSnapshot) {
MockSnapshotReader snapshotReader = new MockSnapshotReader(
new MetadataProvenance(200, 100, 4000),
asList(Batch.control(200, 100, 4000, 10, 200)));
asList(
Batch.control(
200,
100,
4000,
10,
asList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
)
)
);
loader.handleSnapshot(snapshotReader);
}
loader.waitForAllEventsToBeHandled();
@ -330,7 +342,16 @@ public class MetadataLoaderTest {
) throws Exception {
MockSnapshotReader snapshotReader = new MockSnapshotReader(
new MetadataProvenance(offset, 100, 4000),
asList(Batch.control(200, 100, 4000, 10, 200)));
asList(
Batch.control(
200,
100,
4000,
10,
asList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
)
)
);
if (loader.time() instanceof MockTime) {
snapshotReader.setTime((MockTime) loader.time());
}
@ -409,16 +430,25 @@ public class MetadataLoaderTest {
loader.installPublishers(publishers).get();
loadTestSnapshot(loader, 200);
publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
MockBatchReader batchReader = new MockBatchReader(300, asList(
Batch.control(300, 100, 4000, 10, 400))).
setTime(time);
MockBatchReader batchReader = new MockBatchReader(
300,
asList(
Batch.control(
300,
100,
4000,
10,
asList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
)
)
).setTime(time);
loader.handleCommit(batchReader);
loader.waitForAllEventsToBeHandled();
assertTrue(batchReader.closed);
assertEquals(400L, loader.lastAppliedOffset());
assertEquals(300L, loader.lastAppliedOffset());
}
assertTrue(publishers.get(0).closed);
assertEquals(new LogDeltaManifest(new MetadataProvenance(400, 100, 4000), LeaderAndEpoch.UNKNOWN, 1,
assertEquals(new LogDeltaManifest(new MetadataProvenance(300, 100, 4000), LeaderAndEpoch.UNKNOWN, 1,
3000000L, 10),
publishers.get(0).latestLogDeltaManifest);
assertEquals(MetadataVersion.IBP_3_3_IV1,

View File

@ -33,6 +33,7 @@ public final class Batch<T> implements Iterable<T> {
private final int sizeInBytes;
private final long lastOffset;
private final List<T> records;
private final List<ControlRecord> controlRecords;
private Batch(
long baseOffset,
@ -40,7 +41,8 @@ public final class Batch<T> implements Iterable<T> {
long appendTimestamp,
int sizeInBytes,
long lastOffset,
List<T> records
List<T> records,
List<ControlRecord> controlRecords
) {
this.baseOffset = baseOffset;
this.epoch = epoch;
@ -48,6 +50,7 @@ public final class Batch<T> implements Iterable<T> {
this.sizeInBytes = sizeInBytes;
this.lastOffset = lastOffset;
this.records = records;
this.controlRecords = controlRecords;
}
/**
@ -78,6 +81,13 @@ public final class Batch<T> implements Iterable<T> {
return records;
}
/**
* The list of control records in the batch.
*/
public List<ControlRecord> controlRecords() {
return controlRecords;
}
/**
* The epoch of the leader that appended the record batch.
*/
@ -106,6 +116,7 @@ public final class Batch<T> implements Iterable<T> {
", sizeInBytes=" + sizeInBytes +
", lastOffset=" + lastOffset +
", records=" + records +
", controlRecords=" + controlRecords +
')';
}
@ -119,7 +130,8 @@ public final class Batch<T> implements Iterable<T> {
appendTimestamp == batch.appendTimestamp &&
sizeInBytes == batch.sizeInBytes &&
lastOffset == batch.lastOffset &&
Objects.equals(records, batch.records);
Objects.equals(records, batch.records) &&
Objects.equals(controlRecords, batch.controlRecords);
}
@Override
@ -130,7 +142,8 @@ public final class Batch<T> implements Iterable<T> {
appendTimestamp,
sizeInBytes,
lastOffset,
records
records,
controlRecords
);
}
@ -150,15 +163,26 @@ public final class Batch<T> implements Iterable<T> {
int epoch,
long appendTimestamp,
int sizeInBytes,
long lastOffset
List<ControlRecord> records
) {
if (records.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"Control batch must contain at least one record; baseOffset = %s; epoch = %s",
baseOffset,
epoch
)
);
}
return new Batch<>(
baseOffset,
epoch,
appendTimestamp,
sizeInBytes,
lastOffset,
Collections.emptyList()
baseOffset + records.size() - 1,
Collections.emptyList(),
records
);
}
@ -194,7 +218,8 @@ public final class Batch<T> implements Iterable<T> {
appendTimestamp,
sizeInBytes,
baseOffset + records.size() - 1,
records
records,
Collections.emptyList()
);
}
}
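
To show how the reworked factory is meant to be used (a sketch, not part of the commit): the control batch below derives its lastOffset from the record count and keeps the data record list empty.

import java.util.Collections;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.ControlRecord;

final class ControlBatchExample {
    public static void main(String[] args) {
        Batch<String> batch = Batch.control(
            200L,   // baseOffset
            3,      // epoch
            1000L,  // appendTimestamp
            64,     // sizeInBytes
            Collections.singletonList(
                new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord())
            )
        );

        System.out.println(batch.controlRecords().size()); // 1
        System.out.println(batch.lastOffset());            // 200 (baseOffset + records.size() - 1)
        System.out.println(batch.records().isEmpty());     // true: data records stay empty for control batches
    }
}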

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft;
import java.util.Objects;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.record.ControlRecordType;
public final class ControlRecord {
private final ControlRecordType recordType;
private final ApiMessage message;
private static void throwIllegalArgument(ControlRecordType recordType, ApiMessage message) {
throw new IllegalArgumentException(
String.format(
"Record type %s doesn't match message class %s",
recordType,
message.getClass()
)
);
}
public ControlRecord(ControlRecordType recordType, ApiMessage message) {
switch (recordType) {
case LEADER_CHANGE:
if (!(message instanceof LeaderChangeMessage)) {
throwIllegalArgument(recordType, message);
}
break;
case SNAPSHOT_HEADER:
if (!(message instanceof SnapshotHeaderRecord)) {
throwIllegalArgument(recordType, message);
}
break;
case SNAPSHOT_FOOTER:
if (!(message instanceof SnapshotFooterRecord)) {
throwIllegalArgument(recordType, message);
}
break;
default:
throw new IllegalArgumentException(String.format("Unknown control record type %s", recordType));
}
this.recordType = recordType;
this.message = message;
}
public ControlRecordType type() {
return recordType;
}
public short version() {
switch (recordType) {
case LEADER_CHANGE:
return ((LeaderChangeMessage) message).version();
case SNAPSHOT_HEADER:
return ((SnapshotHeaderRecord) message).version();
case SNAPSHOT_FOOTER:
return ((SnapshotFooterRecord) message).version();
default:
throw new IllegalStateException(String.format("Unknown control record type %s", recordType));
}
}
public ApiMessage message() {
return message;
}
@Override
public boolean equals(Object other) {
if (this == other) return true;
if (other == null || getClass() != other.getClass()) return false;
ControlRecord that = (ControlRecord) other;
return Objects.equals(recordType, that.recordType) &&
Objects.equals(message, that.message);
}
@Override
public int hashCode() {
return Objects.hash(recordType, message);
}
@Override
public String toString() {
return String.format("ControlRecord(recordType=%s, message=%s)", recordType, message);
}
}

View File

@ -26,8 +26,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.BiFunction;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.ControlRecordUtils;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
@ -37,6 +40,7 @@ import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.ControlRecord;
import org.apache.kafka.server.common.serialization.RecordSerde;
public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
@ -199,30 +203,34 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
batch.ensureValid();
}
final Batch<T> result;
if (batch.isControlBatch()) {
result = Batch.control(
batch.baseOffset(),
batch.partitionLeaderEpoch(),
batch.maxTimestamp(),
batch.sizeInBytes(),
batch.lastOffset()
);
} else {
Integer numRecords = batch.countOrNull();
if (numRecords == null) {
throw new IllegalStateException("Expected a record count for the records batch");
}
List<T> records = new ArrayList<>(numRecords);
DataInputStream input = new DataInputStream(batch.recordInputStream(bufferSupplier));
final Batch<T> result;
try {
if (batch.isControlBatch()) {
List<ControlRecord> records = new ArrayList<>(numRecords);
for (int i = 0; i < numRecords; i++) {
T record = readRecord(input, batch.sizeInBytes());
ControlRecord record = readRecord(input, batch.sizeInBytes(), RecordsIterator::decodeControlRecord);
records.add(record);
}
} finally {
Utils.closeQuietly(input, "DataInputStream");
result = Batch.control(
batch.baseOffset(),
batch.partitionLeaderEpoch(),
batch.maxTimestamp(),
batch.sizeInBytes(),
records
);
} else {
List<T> records = new ArrayList<>(numRecords);
for (int i = 0; i < numRecords; i++) {
T record = readRecord(input, batch.sizeInBytes(), this::decodeDataRecord);
records.add(record);
}
result = Batch.data(
@ -233,11 +241,18 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
records
);
}
} finally {
Utils.closeQuietly(input, "DataInputStream");
}
return result;
}
private T readRecord(DataInputStream stream, int totalBatchSize) {
private <U> U readRecord(
DataInputStream stream,
int totalBatchSize,
BiFunction<Optional<ByteBuffer>, Optional<ByteBuffer>, U> decoder
) {
// Read size of body in bytes
int size;
try {
@ -281,20 +296,22 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
// Read offset delta
input.readVarint();
// Read the key
int keySize = input.readVarint();
if (keySize != -1) {
throw new IllegalArgumentException("Got key size of " + keySize + ", but this is invalid because it " +
"is not -1 as expected.");
Optional<ByteBuffer> key = Optional.empty();
if (keySize >= 0) {
key = Optional.of(input.readByteBuffer(keySize));
}
// Read the value
int valueSize = input.readVarint();
if (valueSize < 1) {
throw new IllegalArgumentException("Got payload size of " + valueSize + ", but this is invalid because " +
"it is less than 1.");
Optional<ByteBuffer> value = Optional.empty();
if (valueSize >= 0) {
value = Optional.of(input.readByteBuffer(valueSize));
}
// Read the metadata record body from the file input reader
T record = serde.read(input, valueSize);
U record = decoder.apply(key, value);
// Read the number of headers. Currently, this must be a single byte set to 0.
int numHeaders = buf.array()[size - 1];
@ -302,9 +319,59 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
throw new IllegalArgumentException("Got numHeaders of " + numHeaders + ", but this is invalid because " +
"it is not 0 as expected.");
}
return record;
} finally {
bufferSupplier.release(buf);
}
}
private T decodeDataRecord(Optional<ByteBuffer> key, Optional<ByteBuffer> value) {
if (key.isPresent()) {
throw new IllegalArgumentException("Got key in the record when no key was expected");
}
if (!value.isPresent()) {
throw new IllegalArgumentException("Missing value in the record when a value was expected");
} else if (value.get().remaining() == 0) {
throw new IllegalArgumentException("Got an unexpected empty value in the record");
}
ByteBuffer valueBuffer = value.get();
return serde.read(new ByteBufferAccessor(valueBuffer), valueBuffer.remaining());
}
private static ControlRecord decodeControlRecord(Optional<ByteBuffer> key, Optional<ByteBuffer> value) {
if (!key.isPresent()) {
throw new IllegalArgumentException("Missing key in the record when a key was expected");
} else if (key.get().remaining() == 0) {
throw new IllegalArgumentException("Got an unexpected empty key in the record");
}
if (!value.isPresent()) {
throw new IllegalArgumentException("Missing value in the record when a value was expected");
} else if (value.get().remaining() == 0) {
throw new IllegalArgumentException("Got an unexpected empty value in the record");
}
ControlRecordType type = ControlRecordType.parse(key.get());
final ApiMessage message;
switch (type) {
case LEADER_CHANGE:
message = ControlRecordUtils.deserializeLeaderChangeMessage(value.get());
break;
case SNAPSHOT_HEADER:
message = ControlRecordUtils.deserializeSnapshotHeaderRecord(value.get());
break;
case SNAPSHOT_FOOTER:
message = ControlRecordUtils.deserializeSnapshotFooterRecord(value.get());
break;
default:
throw new IllegalArgumentException(String.format("Unknown control record type %s", type));
}
return new ControlRecord(type, message);
}
}

View File

@ -20,12 +20,13 @@ package org.apache.kafka.snapshot;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalLong;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.raft.internals.RecordsIterator;
import org.apache.kafka.server.common.serialization.RecordSerde;
public final class RecordsSnapshotReader<T> implements SnapshotReader<T> {
private final OffsetAndEpoch snapshotId;
@ -121,9 +122,22 @@ public final class RecordsSnapshotReader<T> implements SnapshotReader<T> {
Batch<T> batch = iterator.next();
if (!lastContainedLogTimestamp.isPresent()) {
// The Batch type doesn't support returning control batches. For now lets just use
// the append time of the first batch
lastContainedLogTimestamp = OptionalLong.of(batch.appendTimestamp());
// This must be the first batch which is expected to be a control batch with one record for
// the snapshot header.
if (batch.controlRecords().isEmpty()) {
throw new IllegalStateException("First batch is not a control batch with at least one record");
} else if (!ControlRecordType.SNAPSHOT_HEADER.equals(batch.controlRecords().get(0).type())) {
throw new IllegalStateException(
String.format(
"First control record is not a snapshot header (%s)",
batch.controlRecords().get(0).type()
)
);
}
lastContainedLogTimestamp = OptionalLong.of(
((SnapshotHeaderRecord) batch.controlRecords().get(0).message()).lastContainedLogTimestamp()
);
}
if (!batch.records().isEmpty()) {

View File

@ -131,7 +131,7 @@ final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
);
}
public static <T> SnapshotWriter<T> createWithHeader(
public static <T> RecordsSnapshotWriter<T> createWithHeader(
RawSnapshotWriter rawSnapshotWriter,
int maxBatchSize,
MemoryPool memoryPool,

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.record.ControlRecordType;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
public final class ControlRecordTest {
@Test
void testCtr() {
// Valid constructions
new ControlRecord(ControlRecordType.LEADER_CHANGE, new LeaderChangeMessage());
new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord());
new ControlRecord(ControlRecordType.SNAPSHOT_FOOTER, new SnapshotFooterRecord());
// Invalid constructions
assertThrows(
IllegalArgumentException.class,
() -> new ControlRecord(ControlRecordType.ABORT, new SnapshotFooterRecord())
);
assertThrows(
IllegalArgumentException.class,
() -> new ControlRecord(ControlRecordType.LEADER_CHANGE, new SnapshotHeaderRecord())
);
assertThrows(
IllegalArgumentException.class,
() -> new ControlRecord(ControlRecordType.SNAPSHOT_FOOTER, Mockito.mock(ApiMessage.class))
);
}
@Test
void testControlRecordTypeValues() {
// If this test fails then it means that ControlRecordType was changed. Please review the
// implementation for ControlRecord to see if it needs to be updated based on the changes
// to ControlRecordType.
assertEquals(6, ControlRecordType.values().length);
}
}

View File

@ -1756,14 +1756,14 @@ final public class KafkaRaftClientSnapshotTest {
private static SnapshotWriter<String> snapshotWriter(RaftClientTestContext context, RawSnapshotWriter snapshot) {
return RecordsSnapshotWriter.createWithHeader(
() -> Optional.of(snapshot),
snapshot,
4 * 1024,
MemoryPool.NONE,
context.time,
0,
CompressionType.NONE,
new StringSerde()
).get();
);
}
private final static class MemorySnapshotWriter implements RawSnapshotWriter {

View File

@ -19,6 +19,7 @@ package org.apache.kafka.raft.internals;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
@ -26,20 +27,30 @@ import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.jqwik.api.ForAll;
import net.jqwik.api.Property;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.MockRawSnapshotWriter;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@ -129,6 +140,62 @@ public final class RecordsIteratorTest {
moreFileRecords.close();
}
@Test
public void testControlRecordIteration() {
AtomicReference<ByteBuffer> buffer = new AtomicReference<>(null);
try (RecordsSnapshotWriter<String> snapshot = RecordsSnapshotWriter.createWithHeader(
new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10), snapshotBuf -> buffer.set(snapshotBuf)),
4 * 1024,
MemoryPool.NONE,
new MockTime(),
0,
CompressionType.NONE,
STRING_SERDE
)
) {
snapshot.append(Arrays.asList("a", "b", "c"));
snapshot.append(Arrays.asList("d", "e", "f"));
snapshot.append(Arrays.asList("g", "h", "i"));
snapshot.freeze();
}
try (RecordsIterator<String> iterator = createIterator(
MemoryRecords.readableRecords(buffer.get()),
BufferSupplier.NO_CACHING,
true
)
) {
// Check snapshot header control record
Batch<String> batch = iterator.next();
assertEquals(1, batch.controlRecords().size());
assertEquals(ControlRecordType.SNAPSHOT_HEADER, batch.controlRecords().get(0).type());
assertEquals(new SnapshotHeaderRecord(), batch.controlRecords().get(0).message());
// Consume the iterator until we find a control record
do {
batch = iterator.next();
}
while (batch.controlRecords().isEmpty());
// Check snapshot footer control record
assertEquals(1, batch.controlRecords().size());
assertEquals(ControlRecordType.SNAPSHOT_FOOTER, batch.controlRecords().get(0).type());
assertEquals(new SnapshotFooterRecord(), batch.controlRecords().get(0).message());
// Snapshot footer must be last record
assertFalse(iterator.hasNext());
}
}
@Test
void testControlRecordTypeValues() {
// If this test fails then it means that ControlRecordType was changed. Please review the
// implementation for RecordsIterator to see if it needs to be updated based on the changes
// to ControlRecordType.
assertEquals(6, ControlRecordType.values().length);
}
private void testIterator(
List<TestBatch<String>> expectedBatches,
Records records,
@ -136,12 +203,12 @@ public final class RecordsIteratorTest {
) {
Set<ByteBuffer> allocatedBuffers = Collections.newSetFromMap(new IdentityHashMap<>());
RecordsIterator<String> iterator = createIterator(
try (RecordsIterator<String> iterator = createIterator(
records,
mockBufferSupplier(allocatedBuffers),
validateCrc
);
)
) {
for (TestBatch<String> batch : expectedBatches) {
assertTrue(iterator.hasNext());
assertEquals(batch, TestBatch.from(iterator.next()));
@ -149,8 +216,8 @@ public final class RecordsIteratorTest {
assertFalse(iterator.hasNext());
assertThrows(NoSuchElementException.class, iterator::next);
}
iterator.close();
assertEquals(Collections.emptySet(), allocatedBuffers);
}

View File

@ -71,6 +71,8 @@ final public class SnapshotWriterReaderTest {
// Verify that an empty snapshot has only the Header and Footer
try (SnapshotReader<String> reader = readSnapshot(context, id, Integer.MAX_VALUE)) {
assertEquals(magicTimestamp, reader.lastContainedLogTimestamp());
RawSnapshotReader snapshot = context.log.readSnapshot(id).get();
int recordCount = validateDelimiters(snapshot, magicTimestamp);
assertEquals((recordsPerBatch * batches) + delimiterCount, recordCount);
@ -218,7 +220,7 @@ final public class SnapshotWriterReaderTest {
Record record = records.next();
countRecords += 1;
SnapshotHeaderRecord headerRecord = ControlRecordUtils.deserializedSnapshotHeaderRecord(record);
SnapshotHeaderRecord headerRecord = ControlRecordUtils.deserializeSnapshotHeaderRecord(record);
assertEquals(headerRecord.version(), ControlRecordUtils.SNAPSHOT_HEADER_CURRENT_VERSION);
assertEquals(headerRecord.lastContainedLogTimestamp(), lastContainedLogTime);
@ -238,7 +240,7 @@ final public class SnapshotWriterReaderTest {
// Verify existence of the footer record in the end
assertTrue(batch.isControlBatch());
SnapshotFooterRecord footerRecord = ControlRecordUtils.deserializedSnapshotFooterRecord(record);
SnapshotFooterRecord footerRecord = ControlRecordUtils.deserializeSnapshotFooterRecord(record);
assertEquals(footerRecord.version(), ControlRecordUtils.SNAPSHOT_FOOTER_CURRENT_VERSION);
return countRecords;