KAFKA-16915; LeaderChangeMessage supports directory id (#16668)

Extend LeaderChangeMessage schema to support version 1 of the message. The leader will continue to write version 0 of the schema. This is needed so that in the future the leader can write version 1 of the message and be guaranteed that all of the replicas in the cluster support version 1 of the schema.

Reviewers: José Armando García Sancio <jsancio@apache.org>
This commit is contained in:
Alyssa Huang 2024-07-28 08:12:42 -07:00 committed by GitHub
parent da32dcab2c
commit da8fe6355b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 17 additions and 14 deletions

View File

@ -16,7 +16,8 @@
{
"type": "data",
"name": "LeaderChangeMessage",
"validVersions": "0",
// Version 1 adds VoterDirectoryId (KIP-853)
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{"name": "Version", "type": "int16", "versions": "0+",
@ -30,7 +31,9 @@
],
"commonStructs": [
{ "name": "Voter", "versions": "0+", "fields": [
{"name": "VoterId", "type": "int32", "versions": "0+"}
{"name": "VoterId", "type": "int32", "versions": "0+"},
{"name": "VoterDirectoryId", "type": "uuid", "versions": "1+",
"about": "The directory id of the voter"}
]}
]
}

View File

@ -17,11 +17,10 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.message.VotersRecord.Voter;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
@ -29,6 +28,7 @@ import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -40,7 +40,7 @@ public class ControlRecordUtilsTest {
// If any of these asserts fail, please make sure that Kafka supports reading and
// writing the latest version for these records.
assertEquals(
LeaderChangeMessage.HIGHEST_SUPPORTED_VERSION,
(short) 0,
ControlRecordUtils.LEADER_CHANGE_CURRENT_VERSION
);
assertEquals(
@ -68,22 +68,22 @@ public class ControlRecordUtilsTest {
() -> testDeserializeRecord(ControlRecordType.COMMIT)
);
assertEquals(
"Expected LEADER_CHANGE control record type(2), but found COMMIT",
"Expected KRAFT_VOTERS control record type(6), but found COMMIT",
thrown.getMessage()
);
}
@Test
public void testDeserializeByteData() {
testDeserializeRecord(ControlRecordType.LEADER_CHANGE);
testDeserializeRecord(ControlRecordType.KRAFT_VOTERS);
}
private void testDeserializeRecord(ControlRecordType controlRecordType) {
final int leaderId = 1;
final int voterId = 2;
LeaderChangeMessage data = new LeaderChangeMessage()
.setLeaderId(leaderId)
.setVoters(Collections.singletonList(new Voter().setVoterId(voterId)));
final int voterId = 0;
final List<Voter> voters = Collections.singletonList(
new Voter().setVoterId(voterId)
);
VotersRecord data = new VotersRecord().setVoters(voters);
ByteBuffer valueBuffer = ByteBuffer.allocate(256);
data.write(new ByteBufferAccessor(valueBuffer), new ObjectSerializationCache(), data.highestSupportedVersion());
@ -95,9 +95,9 @@ public class ControlRecordUtilsTest {
256, (byte) 0, 0, 0L, 0, ByteBuffer.wrap(keyData), valueBuffer, null
);
LeaderChangeMessage deserializedData = ControlRecordUtils.deserializeLeaderChangeMessage(record);
VotersRecord deserializedData = ControlRecordUtils.deserializeVotersRecord(record);
assertEquals(leaderId, deserializedData.leaderId());
assertEquals(voters, deserializedData.voters());
assertEquals(Collections.singletonList(
new Voter().setVoterId(voterId)), deserializedData.voters());
}