KAFKA-13845: Add support for reading KRaft snapshots in kafka-dump-log (#12084)

The kafka-dump-log command should accept files with the ".checkpoint" suffix. It should also decode the snapshot header and footer control records and print them as JSON.
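As a usage sketch (the flags are the ones exercised by the new tests below; the directory placeholder and the snapshot file name, which follows the <endOffset>-<epoch>.checkpoint pattern used in the test, are hypothetical), a KRaft metadata snapshot could then be dumped with something like:

    bin/kafka-dump-log.sh --cluster-metadata-decoder --files <metadata-log-dir>/00000000000000000000-0000000000.checkpoint

Adding --skip-record-metadata suppresses the SnapshotHeader and SnapshotFooter control-record output, as the new test asserts.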

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
Author: dengziming, 2022-06-02 05:49:00 +08:00 (committed by GitHub)
parent 7d1b0926fa
commit 1d6e3d6cb3
5 changed files with 106 additions and 21 deletions

ControlRecordUtils.java

@@ -28,9 +28,9 @@ import java.nio.ByteBuffer;
*/
public class ControlRecordUtils {
- public static final short LEADER_CHANGE_SCHEMA_HIGHEST_VERSION = new LeaderChangeMessage().highestSupportedVersion();
- public static final short SNAPSHOT_HEADER_HIGHEST_VERSION = new SnapshotHeaderRecord().highestSupportedVersion();
- public static final short SNAPSHOT_FOOTER_HIGHEST_VERSION = new SnapshotFooterRecord().highestSupportedVersion();
+ public static final short LEADER_CHANGE_SCHEMA_HIGHEST_VERSION = LeaderChangeMessage.HIGHEST_SUPPORTED_VERSION;
+ public static final short SNAPSHOT_HEADER_HIGHEST_VERSION = SnapshotHeaderRecord.HIGHEST_SUPPORTED_VERSION;
+ public static final short SNAPSHOT_FOOTER_HIGHEST_VERSION = SnapshotFooterRecord.HIGHEST_SUPPORTED_VERSION;
public static LeaderChangeMessage deserializeLeaderChangeMessage(Record record) {
ControlRecordType recordType = ControlRecordType.parse(record.key());

DumpLogSegments.scala

@@ -18,7 +18,6 @@
package kafka.tools
import java.io._
import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode}
import kafka.coordinator.group.GroupMetadataManager
import kafka.coordinator.transaction.TransactionLog
@@ -26,11 +25,13 @@ import kafka.log._
import kafka.serializer.Decoder
import kafka.utils._
import kafka.utils.Implicits._
import org.apache.kafka.common.message.{SnapshotFooterRecordJsonConverter, SnapshotHeaderRecordJsonConverter}
import org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordType}
import org.apache.kafka.common.protocol.ByteBufferAccessor
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.snapshot.Snapshots
import scala.jdk.CollectionConverters._
import scala.collection.mutable
@@ -57,7 +58,7 @@ object DumpLogSegments {
val filename = file.getName
val suffix = filename.substring(filename.lastIndexOf("."))
suffix match {
- case UnifiedLog.LogFileSuffix =>
+ case UnifiedLog.LogFileSuffix | Snapshots.SUFFIX =>
dumpLog(file, opts.shouldPrintDataLog, nonConsecutivePairsForLogFilesMap, opts.isDeepIteration,
opts.messageParser, opts.skipRecordMetadata, opts.maxBytes)
case UnifiedLog.IndexFileSuffix =>
@@ -248,8 +249,13 @@ object DumpLogSegments {
parser: MessageParser[_, _],
skipRecordMetadata: Boolean,
maxBytes: Int): Unit = {
- val startOffset = file.getName.split("\\.")(0).toLong
- println("Starting offset: " + startOffset)
+ if (file.getName.endsWith(UnifiedLog.LogFileSuffix)) {
+   val startOffset = file.getName.split("\\.")(0).toLong
+   println(s"Log starting offset: $startOffset")
+ } else if (file.getName.endsWith(Snapshots.SUFFIX)) {
+   val path = Snapshots.parse(file.toPath).get()
+   println(s"Snapshot end offset: ${path.snapshotId.offset}, epoch: ${path.snapshotId.epoch}")
+ }
val fileRecords = FileRecords.open(file, false).slice(0, maxBytes)
try {
var validBytes = 0L
@@ -288,6 +294,12 @@ object DumpLogSegments {
case ControlRecordType.ABORT | ControlRecordType.COMMIT =>
val endTxnMarker = EndTransactionMarker.deserialize(record)
print(s" endTxnMarker: ${endTxnMarker.controlType} coordinatorEpoch: ${endTxnMarker.coordinatorEpoch}")
case ControlRecordType.SNAPSHOT_HEADER =>
val header = ControlRecordUtils.deserializedSnapshotHeaderRecord(record)
print(s" SnapshotHeader ${SnapshotHeaderRecordJsonConverter.write(header, header.version())}")
case ControlRecordType.SNAPSHOT_FOOTER =>
val footer = ControlRecordUtils.deserializedSnapshotFooterRecord(record)
print(s" SnapshotFooter ${SnapshotFooterRecordJsonConverter.write(footer, footer.version())}")
case controlType =>
print(s" controlType: $controlType($controlTypeId)")
}
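For reference, here is a minimal standalone sketch (an illustration, not the tool's exact code; the snapshot file name is hypothetical) that walks a .checkpoint file with the same APIs used above and prints its snapshot header and footer control records as JSON:

import java.io.File
import org.apache.kafka.common.message.{SnapshotFooterRecordJsonConverter, SnapshotHeaderRecordJsonConverter}
import org.apache.kafka.common.record.{ControlRecordType, ControlRecordUtils, FileRecords}
import scala.jdk.CollectionConverters._

object SnapshotControlRecordDump {
  def main(args: Array[String]): Unit = {
    // Hypothetical snapshot file; any KRaft ".checkpoint" file works.
    val file = new File("00000000000000000000-0000000000.checkpoint")
    val fileRecords = FileRecords.open(file, false)
    try {
      // Only control batches can carry the snapshot header/footer records.
      for (batch <- fileRecords.batches.asScala if batch.isControlBatch; record <- batch.asScala) {
        ControlRecordType.parse(record.key) match {
          case ControlRecordType.SNAPSHOT_HEADER =>
            val header = ControlRecordUtils.deserializedSnapshotHeaderRecord(record)
            println(s"SnapshotHeader ${SnapshotHeaderRecordJsonConverter.write(header, header.version())}")
          case ControlRecordType.SNAPSHOT_FOOTER =>
            val footer = ControlRecordUtils.deserializedSnapshotFooterRecord(record)
            println(s"SnapshotFooter ${SnapshotFooterRecordJsonConverter.write(footer, footer.version())}")
          case _ => // other control records (e.g. leader change) are not printed here
        }
      }
    } finally {
      fileRecords.close()
    }
  }
}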

DumpLogSegmentsTest.scala

@@ -22,17 +22,21 @@ import java.nio.ByteBuffer
import java.util
import java.util.Properties
- import kafka.log.{AppendOrigin, UnifiedLog, LogConfig, LogManager, LogTestUtils}
- import kafka.server.{BrokerTopicStats, FetchLogEnd, LogDirFailureChannel}
+ import kafka.log.{AppendOrigin, Defaults, LogConfig, LogManager, LogTestUtils, UnifiedLog}
+ import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
+ import kafka.server.{BrokerTopicStats, FetchLogEnd, KafkaRaftServer, LogDirFailureChannel}
import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord}
import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache}
import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, MemoryRecords, RecordVersion, SimpleRecord}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.snapshot.RecordsSnapshotWriter
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -49,6 +53,7 @@ class DumpLogSegmentsTest {
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
val segmentName = "00000000000000000000"
val logFilePath = s"$logDir/$segmentName.log"
val snapshotPath = s"$logDir/00000000000000000000-0000000000.checkpoint"
val indexFilePath = s"$logDir/$segmentName.index"
val timeIndexFilePath = s"$logDir/$segmentName.timeindex"
val time = new MockTime(0, 0)
@@ -256,13 +261,14 @@ class DumpLogSegmentsTest {
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, records:_*), leaderEpoch = 1)
log.flush(false)
- var output = runDumpLogSegments(Array("--cluster-metadata-decoder", "false", "--files", logFilePath))
- assert(output.contains("TOPIC_RECORD"))
- assert(output.contains("BROKER_RECORD"))
+ var output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--files", logFilePath))
+ assertTrue(output.contains("Log starting offset: 0"))
+ assertTrue(output.contains("TOPIC_RECORD"))
+ assertTrue(output.contains("BROKER_RECORD"))
- output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--skip-record-metadata", "false", "--files", logFilePath))
- assert(output.contains("TOPIC_RECORD"))
- assert(output.contains("BROKER_RECORD"))
+ output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--skip-record-metadata", "--files", logFilePath))
+ assertTrue(output.contains("TOPIC_RECORD"))
+ assertTrue(output.contains("BROKER_RECORD"))
// Bogus metadata record
val buf = ByteBuffer.allocate(4)
@@ -272,10 +278,77 @@ class DumpLogSegmentsTest {
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(null, buf.array)), leaderEpoch = 2)
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, records:_*), leaderEpoch = 2)
- output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--skip-record-metadata", "false", "--files", logFilePath))
- assert(output.contains("TOPIC_RECORD"))
- assert(output.contains("BROKER_RECORD"))
- assert(output.contains("skipping"))
+ output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--skip-record-metadata", "--files", logFilePath))
+ assertTrue(output.contains("TOPIC_RECORD"))
+ assertTrue(output.contains("BROKER_RECORD"))
+ assertTrue(output.contains("skipping"))
}
@Test
def testDumpMetadataSnapshot(): Unit = {
val metadataRecords = Seq(
new ApiMessageAndVersion(
new RegisterBrokerRecord().setBrokerId(0).setBrokerEpoch(10), 0.toShort),
new ApiMessageAndVersion(
new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(20), 0.toShort),
new ApiMessageAndVersion(
new TopicRecord().setName("test-topic").setTopicId(Uuid.randomUuid()), 0.toShort),
new ApiMessageAndVersion(
new PartitionChangeRecord().setTopicId(Uuid.randomUuid()).setLeader(1).
setPartitionId(0).setIsr(util.Arrays.asList(0, 1, 2)), 0.toShort)
)
val metadataLog = KafkaMetadataLog(
KafkaRaftServer.MetadataPartition,
KafkaRaftServer.MetadataTopicId,
logDir,
time,
time.scheduler,
MetadataLogConfig(
logSegmentBytes = 100 * 1024,
logSegmentMinBytes = 100 * 1024,
logSegmentMillis = 10 * 1000,
retentionMaxBytes = 100 * 1024,
retentionMillis = 60 * 1000,
maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
fileDeleteDelayMs = Defaults.FileDeleteDelayMs,
nodeId = 1
)
)
val lastContainedLogTimestamp = 10000
TestUtils.resource(
RecordsSnapshotWriter.createWithHeader(
() => metadataLog.createNewSnapshot(new OffsetAndEpoch(0, 0)),
1024,
MemoryPool.NONE,
new MockTime,
lastContainedLogTimestamp,
CompressionType.NONE,
new MetadataRecordSerde
).get()
) { snapshotWriter =>
snapshotWriter.append(metadataRecords.asJava)
snapshotWriter.freeze()
}
var output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--files", snapshotPath))
assertTrue(output.contains("Snapshot end offset: 0, epoch: 0"))
assertTrue(output.contains("TOPIC_RECORD"))
assertTrue(output.contains("BROKER_RECORD"))
assertTrue(output.contains("SnapshotHeader"))
assertTrue(output.contains("SnapshotFooter"))
assertTrue(output.contains(s""""lastContainedLogTimestamp":$lastContainedLogTimestamp"""))
output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--skip-record-metadata", "--files", snapshotPath))
assertTrue(output.contains("Snapshot end offset: 0, epoch: 0"))
assertTrue(output.contains("TOPIC_RECORD"))
assertTrue(output.contains("BROKER_RECORD"))
assertFalse(output.contains("SnapshotHeader"))
assertFalse(output.contains("SnapshotFooter"))
assertFalse(output.contains(s""""lastContainedLogTimestamp": $lastContainedLogTimestamp"""))
}
@Test

Snapshots.java

@@ -30,7 +30,7 @@ import java.util.Optional;
public final class Snapshots {
private static final Logger log = LoggerFactory.getLogger(Snapshots.class);
- private static final String SUFFIX = ".checkpoint";
+ public static final String SUFFIX = ".checkpoint";
private static final String PARTIAL_SUFFIX = String.format("%s.part", SUFFIX);
private static final String DELETE_SUFFIX = String.format("%s.deleted", SUFFIX);
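Since SUFFIX is now public, callers such as the dump tool can both match on it and parse the file name. A small illustrative sketch (the file name is hypothetical) mirroring the usage shown earlier:

import java.nio.file.Paths
import org.apache.kafka.snapshot.Snapshots

object CheckpointNameProbe {
  def main(args: Array[String]): Unit = {
    // Hypothetical snapshot file name: <endOffset>-<epoch> plus the ".checkpoint" suffix.
    val path = Paths.get("00000000000000000000-0000000000.checkpoint")
    if (path.getFileName.toString.endsWith(Snapshots.SUFFIX)) {
      val snapshotPath = Snapshots.parse(path).get()
      println(s"Snapshot end offset: ${snapshotPath.snapshotId.offset}, epoch: ${snapshotPath.snapshotId.epoch}")
    }
  }
}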

SnapshotWriterReaderTest.java

@@ -238,7 +238,7 @@ final public class SnapshotWriterReaderTest {
assertTrue(batch.isControlBatch());
SnapshotFooterRecord footerRecord = ControlRecordUtils.deserializedSnapshotFooterRecord(record);
- assertEquals(footerRecord.version(), ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION);
+ assertEquals(footerRecord.version(), ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
return countRecords;
}