KAFKA-19068 Eliminate the duplicate type check in creating ControlRecord (#19346)
CI / build (push) Waiting to run Details

jira: https://issues.apache.org/jira/browse/KAFKA-19068

`RecordsIterator#decodeControlRecord` do the type check and then
`ControlRecord` constructor does that again.

we should add a static method to ControlRecord to create `ControlRecord`
with type check, and then `ControlRecord` constructor should be changed
to private to ensure all instance is created by the static method.

Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
This commit is contained in:
Nick Guo 2025-05-11 00:07:00 +08:00 committed by GitHub
parent 61cb33f347
commit 707a44a6cb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 48 additions and 80 deletions

View File

@ -26,7 +26,6 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@ -253,7 +252,7 @@ public class MetadataLoaderTest {
100,
4000,
10,
List.of(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
List.of(ControlRecord.of(new SnapshotHeaderRecord()))
),
Batch.data(0, 0, 0, 0,
List.of(new ApiMessageAndVersion(new FeatureLevelRecord().
@ -386,7 +385,7 @@ public class MetadataLoaderTest {
100,
4000,
10,
List.of(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
List.of(ControlRecord.of(new SnapshotHeaderRecord()))
)
)
);
@ -485,7 +484,7 @@ public class MetadataLoaderTest {
100,
4000,
10,
List.of(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
List.of(ControlRecord.of(new SnapshotHeaderRecord()))
)
)
).setTime(time);

View File

@ -23,54 +23,49 @@ import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.ControlRecordUtils;
import java.nio.ByteBuffer;
import java.util.Objects;
public final class ControlRecord {
private final ControlRecordType recordType;
private final ApiMessage message;
private static void throwIllegalArgument(ControlRecordType recordType, ApiMessage message) {
throw new IllegalArgumentException(
String.format(
"Record type %s doesn't match message class %s",
recordType,
message.getClass()
)
);
public static ControlRecord of(ByteBuffer key, ByteBuffer value) {
ControlRecordType recordType = ControlRecordType.parse(key);
final ApiMessage message = switch (recordType) {
case LEADER_CHANGE -> ControlRecordUtils.deserializeLeaderChangeMessage(value);
case SNAPSHOT_HEADER -> ControlRecordUtils.deserializeSnapshotHeaderRecord(value);
case SNAPSHOT_FOOTER -> ControlRecordUtils.deserializeSnapshotFooterRecord(value);
case KRAFT_VERSION -> ControlRecordUtils.deserializeKRaftVersionRecord(value);
case KRAFT_VOTERS -> ControlRecordUtils.deserializeVotersRecord(value);
default -> throw new IllegalArgumentException(String.format("Unknown control record type %s", recordType));
};
return new ControlRecord(recordType, message);
}
public ControlRecord(ControlRecordType recordType, ApiMessage message) {
switch (recordType) {
case LEADER_CHANGE:
if (!(message instanceof LeaderChangeMessage)) {
throwIllegalArgument(recordType, message);
}
break;
case SNAPSHOT_HEADER:
if (!(message instanceof SnapshotHeaderRecord)) {
throwIllegalArgument(recordType, message);
}
break;
case SNAPSHOT_FOOTER:
if (!(message instanceof SnapshotFooterRecord)) {
throwIllegalArgument(recordType, message);
}
break;
case KRAFT_VERSION:
if (!(message instanceof KRaftVersionRecord)) {
throwIllegalArgument(recordType, message);
}
break;
case KRAFT_VOTERS:
if (!(message instanceof VotersRecord)) {
throwIllegalArgument(recordType, message);
}
break;
default:
throw new IllegalArgumentException(String.format("Unknown control record type %s", recordType));
//this is for test only
public static ControlRecord of(ApiMessage message) {
ControlRecordType recordType;
if (message instanceof LeaderChangeMessage) {
recordType = ControlRecordType.LEADER_CHANGE;
} else if (message instanceof SnapshotHeaderRecord) {
recordType = ControlRecordType.SNAPSHOT_HEADER;
} else if (message instanceof SnapshotFooterRecord) {
recordType = ControlRecordType.SNAPSHOT_FOOTER;
} else if (message instanceof KRaftVersionRecord) {
recordType = ControlRecordType.KRAFT_VERSION;
} else if (message instanceof VotersRecord) {
recordType = ControlRecordType.KRAFT_VOTERS;
} else {
throw new IllegalArgumentException(String.format("Unknown control record type %s", message.getClass()));
}
return new ControlRecord(recordType, message);
}
private ControlRecord(ControlRecordType recordType, ApiMessage message) {
this.recordType = recordType;
this.message = message;
}

View File

@ -16,10 +16,7 @@
*/
package org.apache.kafka.raft.internals;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.ControlRecordUtils;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
@ -370,17 +367,6 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
throw new IllegalArgumentException("Got an unexpected empty value in the record");
}
ControlRecordType type = ControlRecordType.parse(key.get());
final ApiMessage message = switch (type) {
case LEADER_CHANGE -> ControlRecordUtils.deserializeLeaderChangeMessage(value.get());
case SNAPSHOT_HEADER -> ControlRecordUtils.deserializeSnapshotHeaderRecord(value.get());
case SNAPSHOT_FOOTER -> ControlRecordUtils.deserializeSnapshotFooterRecord(value.get());
case KRAFT_VERSION -> ControlRecordUtils.deserializeKRaftVersionRecord(value.get());
case KRAFT_VOTERS -> ControlRecordUtils.deserializeVotersRecord(value.get());
default -> throw new IllegalArgumentException(String.format("Unknown control record type %s", type));
};
return new ControlRecord(type, message);
return ControlRecord.of(key.get(), value.get());
}
}

View File

@ -34,24 +34,16 @@ public final class ControlRecordTest {
@Test
void testCtr() {
// Valid constructions
new ControlRecord(ControlRecordType.LEADER_CHANGE, new LeaderChangeMessage());
new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord());
new ControlRecord(ControlRecordType.SNAPSHOT_FOOTER, new SnapshotFooterRecord());
new ControlRecord(ControlRecordType.KRAFT_VERSION, new KRaftVersionRecord());
new ControlRecord(ControlRecordType.KRAFT_VOTERS, new VotersRecord());
ControlRecord.of(new LeaderChangeMessage());
ControlRecord.of(new SnapshotHeaderRecord());
ControlRecord.of(new SnapshotFooterRecord());
ControlRecord.of(new KRaftVersionRecord());
ControlRecord.of(new VotersRecord());
// Invalid constructions
assertThrows(
IllegalArgumentException.class,
() -> new ControlRecord(ControlRecordType.ABORT, new SnapshotFooterRecord())
);
assertThrows(
IllegalArgumentException.class,
() -> new ControlRecord(ControlRecordType.LEADER_CHANGE, new SnapshotHeaderRecord())
);
assertThrows(
IllegalArgumentException.class,
() -> new ControlRecord(ControlRecordType.SNAPSHOT_FOOTER, Mockito.mock(ApiMessage.class))
() -> ControlRecord.of(Mockito.mock(ApiMessage.class))
);
}

View File

@ -95,26 +95,22 @@ public class KafkaRaftClientReconfigTest {
List<List<ControlRecord>> expectedBootstrapRecords = List.of(
List.of(
new ControlRecord(
ControlRecordType.SNAPSHOT_HEADER,
ControlRecord.of(
new SnapshotHeaderRecord()
.setVersion((short) 0)
.setLastContainedLogTimestamp(0)
),
new ControlRecord(
ControlRecordType.KRAFT_VERSION,
ControlRecord.of(
new KRaftVersionRecord()
.setVersion(ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION)
.setKRaftVersion((short) 1)
),
new ControlRecord(
ControlRecordType.KRAFT_VOTERS,
ControlRecord.of(
voters.toVotersRecord(ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION)
)
),
List.of(
new ControlRecord(
ControlRecordType.SNAPSHOT_FOOTER,
ControlRecord.of(
new SnapshotFooterRecord()
.setVersion((short) 0)
)

View File

@ -80,7 +80,7 @@ class RecordsBatchReaderTest {
public void testLeaderChangeControlBatch() {
// Confirm that the RecordsBatchReader is able to iterate over control batches
MemoryRecords records = RecordsIteratorTest.buildControlRecords(ControlRecordType.LEADER_CHANGE);
ControlRecord expectedRecord = new ControlRecord(ControlRecordType.LEADER_CHANGE, new LeaderChangeMessage());
ControlRecord expectedRecord = ControlRecord.of(new LeaderChangeMessage());
try (RecordsBatchReader<String> reader = RecordsBatchReader.of(
0,

View File

@ -278,7 +278,7 @@ public final class RecordsIteratorTest {
try (RecordsIterator<String> iterator = createIterator(records, BufferSupplier.NO_CACHING, true)) {
assertTrue(iterator.hasNext());
assertEquals(
List.of(new ControlRecord(type, expectedMessage)),
List.of(ControlRecord.of(expectedMessage)),
iterator.next().controlRecords()
);
assertFalse(iterator.hasNext());