From 707a44a6cb700e4c029a8a386dc1edde1c5c7253 Mon Sep 17 00:00:00 2001 From: Nick Guo Date: Sun, 11 May 2025 00:07:00 +0800 Subject: [PATCH] KAFKA-19068 Eliminate the duplicate type check in creating ControlRecord (#19346) jira: https://issues.apache.org/jira/browse/KAFKA-19068 `RecordsIterator#decodeControlRecord` do the type check and then `ControlRecord` constructor does that again. we should add a static method to ControlRecord to create `ControlRecord` with type check, and then `ControlRecord` constructor should be changed to private to ensure all instance is created by the static method. Reviewers: PoAn Yang , Chia-Ping Tsai --- .../image/loader/MetadataLoaderTest.java | 7 +- .../org/apache/kafka/raft/ControlRecord.java | 69 +++++++++---------- .../kafka/raft/internals/RecordsIterator.java | 16 +---- .../apache/kafka/raft/ControlRecordTest.java | 20 ++---- .../raft/KafkaRaftClientReconfigTest.java | 12 ++-- .../internals/RecordsBatchReaderTest.java | 2 +- .../raft/internals/RecordsIteratorTest.java | 2 +- 7 files changed, 48 insertions(+), 80 deletions(-) diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java index beaeefdc38c..31960252bd8 100644 --- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java @@ -26,7 +26,6 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.metadata.PartitionRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; import org.apache.kafka.common.metadata.TopicRecord; -import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; @@ -253,7 +252,7 @@ public class MetadataLoaderTest { 100, 4000, 10, - List.of(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord())) + List.of(ControlRecord.of(new SnapshotHeaderRecord())) ), Batch.data(0, 0, 0, 0, List.of(new ApiMessageAndVersion(new FeatureLevelRecord(). @@ -386,7 +385,7 @@ public class MetadataLoaderTest { 100, 4000, 10, - List.of(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord())) + List.of(ControlRecord.of(new SnapshotHeaderRecord())) ) ) ); @@ -485,7 +484,7 @@ public class MetadataLoaderTest { 100, 4000, 10, - List.of(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord())) + List.of(ControlRecord.of(new SnapshotHeaderRecord())) ) ) ).setTime(time); diff --git a/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java b/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java index 9588f3c1138..16a0a95ffa7 100644 --- a/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java +++ b/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java @@ -23,54 +23,49 @@ import org.apache.kafka.common.message.SnapshotHeaderRecord; import org.apache.kafka.common.message.VotersRecord; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.ControlRecordUtils; +import java.nio.ByteBuffer; import java.util.Objects; + public final class ControlRecord { private final ControlRecordType recordType; private final ApiMessage message; - private static void throwIllegalArgument(ControlRecordType recordType, ApiMessage message) { - throw new IllegalArgumentException( - String.format( - "Record type %s doesn't match message class %s", - recordType, - message.getClass() - ) - ); + public static ControlRecord of(ByteBuffer key, ByteBuffer value) { + ControlRecordType recordType = ControlRecordType.parse(key); + final ApiMessage message = switch (recordType) { + case LEADER_CHANGE -> ControlRecordUtils.deserializeLeaderChangeMessage(value); + case SNAPSHOT_HEADER -> ControlRecordUtils.deserializeSnapshotHeaderRecord(value); + case SNAPSHOT_FOOTER -> ControlRecordUtils.deserializeSnapshotFooterRecord(value); + case KRAFT_VERSION -> ControlRecordUtils.deserializeKRaftVersionRecord(value); + case KRAFT_VOTERS -> ControlRecordUtils.deserializeVotersRecord(value); + default -> throw new IllegalArgumentException(String.format("Unknown control record type %s", recordType)); + }; + return new ControlRecord(recordType, message); } - public ControlRecord(ControlRecordType recordType, ApiMessage message) { - switch (recordType) { - case LEADER_CHANGE: - if (!(message instanceof LeaderChangeMessage)) { - throwIllegalArgument(recordType, message); - } - break; - case SNAPSHOT_HEADER: - if (!(message instanceof SnapshotHeaderRecord)) { - throwIllegalArgument(recordType, message); - } - break; - case SNAPSHOT_FOOTER: - if (!(message instanceof SnapshotFooterRecord)) { - throwIllegalArgument(recordType, message); - } - break; - case KRAFT_VERSION: - if (!(message instanceof KRaftVersionRecord)) { - throwIllegalArgument(recordType, message); - } - break; - case KRAFT_VOTERS: - if (!(message instanceof VotersRecord)) { - throwIllegalArgument(recordType, message); - } - break; - default: - throw new IllegalArgumentException(String.format("Unknown control record type %s", recordType)); + //this is for test only + public static ControlRecord of(ApiMessage message) { + ControlRecordType recordType; + if (message instanceof LeaderChangeMessage) { + recordType = ControlRecordType.LEADER_CHANGE; + } else if (message instanceof SnapshotHeaderRecord) { + recordType = ControlRecordType.SNAPSHOT_HEADER; + } else if (message instanceof SnapshotFooterRecord) { + recordType = ControlRecordType.SNAPSHOT_FOOTER; + } else if (message instanceof KRaftVersionRecord) { + recordType = ControlRecordType.KRAFT_VERSION; + } else if (message instanceof VotersRecord) { + recordType = ControlRecordType.KRAFT_VOTERS; + } else { + throw new IllegalArgumentException(String.format("Unknown control record type %s", message.getClass())); } + return new ControlRecord(recordType, message); + } + private ControlRecord(ControlRecordType recordType, ApiMessage message) { this.recordType = recordType; this.message = message; } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java index b912d748bca..b1e711bf5e4 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java @@ -16,10 +16,7 @@ */ package org.apache.kafka.raft.internals; -import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.ByteBufferAccessor; -import org.apache.kafka.common.record.ControlRecordType; -import org.apache.kafka.common.record.ControlRecordUtils; import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; @@ -370,17 +367,6 @@ public final class RecordsIterator implements Iterator>, AutoCloseab throw new IllegalArgumentException("Got an unexpected empty value in the record"); } - ControlRecordType type = ControlRecordType.parse(key.get()); - - final ApiMessage message = switch (type) { - case LEADER_CHANGE -> ControlRecordUtils.deserializeLeaderChangeMessage(value.get()); - case SNAPSHOT_HEADER -> ControlRecordUtils.deserializeSnapshotHeaderRecord(value.get()); - case SNAPSHOT_FOOTER -> ControlRecordUtils.deserializeSnapshotFooterRecord(value.get()); - case KRAFT_VERSION -> ControlRecordUtils.deserializeKRaftVersionRecord(value.get()); - case KRAFT_VOTERS -> ControlRecordUtils.deserializeVotersRecord(value.get()); - default -> throw new IllegalArgumentException(String.format("Unknown control record type %s", type)); - }; - - return new ControlRecord(type, message); + return ControlRecord.of(key.get(), value.get()); } } diff --git a/raft/src/test/java/org/apache/kafka/raft/ControlRecordTest.java b/raft/src/test/java/org/apache/kafka/raft/ControlRecordTest.java index fed7bf21664..b6416c69be7 100644 --- a/raft/src/test/java/org/apache/kafka/raft/ControlRecordTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/ControlRecordTest.java @@ -34,24 +34,16 @@ public final class ControlRecordTest { @Test void testCtr() { // Valid constructions - new ControlRecord(ControlRecordType.LEADER_CHANGE, new LeaderChangeMessage()); - new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()); - new ControlRecord(ControlRecordType.SNAPSHOT_FOOTER, new SnapshotFooterRecord()); - new ControlRecord(ControlRecordType.KRAFT_VERSION, new KRaftVersionRecord()); - new ControlRecord(ControlRecordType.KRAFT_VOTERS, new VotersRecord()); + ControlRecord.of(new LeaderChangeMessage()); + ControlRecord.of(new SnapshotHeaderRecord()); + ControlRecord.of(new SnapshotFooterRecord()); + ControlRecord.of(new KRaftVersionRecord()); + ControlRecord.of(new VotersRecord()); // Invalid constructions assertThrows( IllegalArgumentException.class, - () -> new ControlRecord(ControlRecordType.ABORT, new SnapshotFooterRecord()) - ); - assertThrows( - IllegalArgumentException.class, - () -> new ControlRecord(ControlRecordType.LEADER_CHANGE, new SnapshotHeaderRecord()) - ); - assertThrows( - IllegalArgumentException.class, - () -> new ControlRecord(ControlRecordType.SNAPSHOT_FOOTER, Mockito.mock(ApiMessage.class)) + () -> ControlRecord.of(Mockito.mock(ApiMessage.class)) ); } diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java index cdc351d3181..b000fbbcd59 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java @@ -95,26 +95,22 @@ public class KafkaRaftClientReconfigTest { List> expectedBootstrapRecords = List.of( List.of( - new ControlRecord( - ControlRecordType.SNAPSHOT_HEADER, + ControlRecord.of( new SnapshotHeaderRecord() .setVersion((short) 0) .setLastContainedLogTimestamp(0) ), - new ControlRecord( - ControlRecordType.KRAFT_VERSION, + ControlRecord.of( new KRaftVersionRecord() .setVersion(ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION) .setKRaftVersion((short) 1) ), - new ControlRecord( - ControlRecordType.KRAFT_VOTERS, + ControlRecord.of( voters.toVotersRecord(ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION) ) ), List.of( - new ControlRecord( - ControlRecordType.SNAPSHOT_FOOTER, + ControlRecord.of( new SnapshotFooterRecord() .setVersion((short) 0) ) diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java index a32575e93b2..ee59d918dc3 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java @@ -80,7 +80,7 @@ class RecordsBatchReaderTest { public void testLeaderChangeControlBatch() { // Confirm that the RecordsBatchReader is able to iterate over control batches MemoryRecords records = RecordsIteratorTest.buildControlRecords(ControlRecordType.LEADER_CHANGE); - ControlRecord expectedRecord = new ControlRecord(ControlRecordType.LEADER_CHANGE, new LeaderChangeMessage()); + ControlRecord expectedRecord = ControlRecord.of(new LeaderChangeMessage()); try (RecordsBatchReader reader = RecordsBatchReader.of( 0, diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java index 8016a1e5381..0d0ce4f127c 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java @@ -278,7 +278,7 @@ public final class RecordsIteratorTest { try (RecordsIterator iterator = createIterator(records, BufferSupplier.NO_CACHING, true)) { assertTrue(iterator.hasNext()); assertEquals( - List.of(new ControlRecord(type, expectedMessage)), + List.of(ControlRecord.of(expectedMessage)), iterator.next().controlRecords() ); assertFalse(iterator.hasNext());