KAFKA-18308; Update CoordinatorSerde (#18455)

This patch updates the GroupCoordinatorSerde and the ShareGroupCoordinatorSerde to leverage the CoordinatorRecordType to deserialize records. With this, newly added records are automatically picked up. In other words, the serdes work with all defined records without requiring any further changes.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
David Jacot 2025-01-10 11:17:30 +01:00 committed by GitHub
parent 2b7c039971
commit 87334e6c2e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 90 additions and 195 deletions

View File

@ -43,7 +43,7 @@ public abstract class CoordinatorRecordSerde implements Serializer<CoordinatorRe
@Override @Override
public byte[] serializeKey(CoordinatorRecord record) { public byte[] serializeKey(CoordinatorRecord record) {
// Record does not accept a null key. // Record does not accept a null key.
return MessageUtil.toVersionPrefixedBytes( return MessageUtil.toCoordinatorTypePrefixedBytes(
record.key().version(), record.key().version(),
record.key().message() record.key().message()
); );
@ -106,10 +106,10 @@ public abstract class CoordinatorRecordSerde implements Serializer<CoordinatorRe
* Concrete child class must provide implementation which returns appropriate * Concrete child class must provide implementation which returns appropriate
* type of {@link ApiMessage} objects representing the key. * type of {@link ApiMessage} objects representing the key.
* *
* @param recordVersion - short representing version * @param recordType - short representing type
* @return ApiMessage object * @return ApiMessage object
*/ */
protected abstract ApiMessage apiMessageKeyFor(short recordVersion); protected abstract ApiMessage apiMessageKeyFor(short recordType);
/** /**
* Concrete child class must provide implementation which returns appropriate * Concrete child class must provide implementation which returns appropriate

View File

@ -97,6 +97,10 @@ public class CoordinatorRecordTypeGenerator implements TypeClassGenerator {
buffer.printf("%n"); buffer.printf("%n");
generateFromApiKey(); generateFromApiKey();
buffer.printf("%n"); buffer.printf("%n");
generateNewRecordKey();
buffer.printf("%n");
generateNewRecordValue();
buffer.printf("%n");
generateAccessor("id", "short"); generateAccessor("id", "short");
buffer.printf("%n"); buffer.printf("%n");
generateAccessor("lowestSupportedVersion", "short"); generateAccessor("lowestSupportedVersion", "short");
@ -171,7 +175,57 @@ public class CoordinatorRecordTypeGenerator implements TypeClassGenerator {
buffer.printf("default:%n"); buffer.printf("default:%n");
buffer.incrementIndent(); buffer.incrementIndent();
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS); headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
buffer.printf("throw new UnsupportedVersionException(\"Unknown metadata id \"" + buffer.printf("throw new UnsupportedVersionException(\"Unknown record id \"" +
" + id);%n");
buffer.decrementIndent();
buffer.decrementIndent();
buffer.printf("}%n");
buffer.decrementIndent();
buffer.printf("}%n");
}
// Emits the newRecordKey() factory method on the generated CoordinatorRecordType
// enum: a switch over the record id that instantiates the matching generated key
// ApiMessage class. This lets serdes obtain key objects for any defined record
// without a hand-written id-to-class mapping.
// NOTE(review): this block comes from a diff view; the original file's leading
// indentation is not preserved here.
private void generateNewRecordKey() {
// The generated method returns ApiMessage, so register that import up front.
headerGenerator.addImport(MessageGenerator.API_MESSAGE_CLASS);
buffer.printf("public ApiMessage newRecordKey() {%n");
buffer.incrementIndent();
buffer.printf("switch (id) {%n");
buffer.incrementIndent();
// One case per declared coordinator record: the case label is the record's id
// and the body instantiates the record's key message class.
for (Map.Entry<Short, CoordinatorRecord> entry : records.entrySet()) {
buffer.printf("case %d:%n", entry.getKey());
buffer.incrementIndent();
buffer.printf("return new %s();%n",
MessageGenerator.capitalizeFirst(entry.getValue().key.name()));
buffer.decrementIndent();
}
// Unknown ids surface as UnsupportedVersionException in the generated code;
// the import is only registered because the default branch references it.
buffer.printf("default:%n");
buffer.incrementIndent();
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
buffer.printf("throw new UnsupportedVersionException(\"Unknown record id \"" +
" + id);%n");
buffer.decrementIndent();
buffer.decrementIndent();
buffer.printf("}%n");
buffer.decrementIndent();
buffer.printf("}%n");
}
private void generateNewRecordValue() {
headerGenerator.addImport(MessageGenerator.API_MESSAGE_CLASS);
buffer.printf("public ApiMessage newRecordValue() {%n");
buffer.incrementIndent();
buffer.printf("switch (id) {%n");
buffer.incrementIndent();
for (Map.Entry<Short, CoordinatorRecord> entry : records.entrySet()) {
buffer.printf("case %d:%n", entry.getKey());
buffer.incrementIndent();
buffer.printf("return new %s();%n",
MessageGenerator.capitalizeFirst(entry.getValue().value.name()));
buffer.decrementIndent();
}
buffer.printf("default:%n");
buffer.incrementIndent();
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
buffer.printf("throw new UnsupportedVersionException(\"Unknown record id \"" +
" + id);%n"); " + id);%n");
buffer.decrementIndent(); buffer.decrementIndent();
buffer.decrementIndent(); buffer.decrementIndent();

View File

@ -16,138 +16,31 @@
*/ */
package org.apache.kafka.coordinator.group; package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
/** /**
* Please ensure any new record added here stays in sync with DumpLogSegments. * Please ensure any new record added here stays in sync with DumpLogSegments.
*/ */
public class GroupCoordinatorRecordSerde extends CoordinatorRecordSerde { public class GroupCoordinatorRecordSerde extends CoordinatorRecordSerde {
// This method is temporary until the share coordinator is converted to
// using the new coordinator records.
@Override @Override
public byte[] serializeKey(CoordinatorRecord record) { protected ApiMessage apiMessageKeyFor(short recordType) {
// Record does not accept a null key. try {
return MessageUtil.toCoordinatorTypePrefixedBytes( return CoordinatorRecordType.fromId(recordType).newRecordKey();
record.key().version(), } catch (UnsupportedVersionException ex) {
record.key().message() throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
);
}
@Override
protected ApiMessage apiMessageKeyFor(short recordVersion) {
switch (recordVersion) {
case 0:
case 1:
return new OffsetCommitKey();
case 2:
return new GroupMetadataKey();
case 3:
return new ConsumerGroupMetadataKey();
case 4:
return new ConsumerGroupPartitionMetadataKey();
case 5:
return new ConsumerGroupMemberMetadataKey();
case 6:
return new ConsumerGroupTargetAssignmentMetadataKey();
case 7:
return new ConsumerGroupTargetAssignmentMemberKey();
case 8:
return new ConsumerGroupCurrentMemberAssignmentKey();
case 9:
return new ShareGroupPartitionMetadataKey();
case 10:
return new ShareGroupMemberMetadataKey();
case 11:
return new ShareGroupMetadataKey();
case 12:
return new ShareGroupTargetAssignmentMetadataKey();
case 13:
return new ShareGroupTargetAssignmentMemberKey();
case 14:
return new ShareGroupCurrentMemberAssignmentKey();
case 15:
return new ShareGroupStatePartitionMetadataKey();
case 16:
return new ConsumerGroupRegularExpressionKey();
default:
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
} }
} }
@Override @Override
protected ApiMessage apiMessageValueFor(short recordVersion) { protected ApiMessage apiMessageValueFor(short recordVersion) {
switch (recordVersion) { try {
case 0: return CoordinatorRecordType.fromId(recordVersion).newRecordValue();
case 1: } catch (UnsupportedVersionException ex) {
return new OffsetCommitValue(); throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
case 2:
return new GroupMetadataValue();
case 3:
return new ConsumerGroupMetadataValue();
case 4:
return new ConsumerGroupPartitionMetadataValue();
case 5:
return new ConsumerGroupMemberMetadataValue();
case 6:
return new ConsumerGroupTargetAssignmentMetadataValue();
case 7:
return new ConsumerGroupTargetAssignmentMemberValue();
case 8:
return new ConsumerGroupCurrentMemberAssignmentValue();
case 9:
return new ShareGroupPartitionMetadataValue();
case 10:
return new ShareGroupMemberMetadataValue();
case 11:
return new ShareGroupMetadataValue();
case 12:
return new ShareGroupTargetAssignmentMetadataValue();
case 13:
return new ShareGroupTargetAssignmentMemberValue();
case 14:
return new ShareGroupCurrentMemberAssignmentValue();
case 15:
return new ShareGroupStatePartitionMetadataValue();
case 16:
return new ConsumerGroupRegularExpressionValue();
default:
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
} }
} }
} }

View File

@ -20,38 +20,9 @@ import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -245,23 +216,9 @@ public class GroupCoordinatorRecordSerdeTest {
@Test @Test
public void testDeserializeAllRecordTypes() { public void testDeserializeAllRecordTypes() {
roundTrip((short) 0, new OffsetCommitKey(), new OffsetCommitValue()); for (CoordinatorRecordType record : CoordinatorRecordType.values()) {
roundTrip((short) 1, new OffsetCommitKey(), new OffsetCommitValue()); roundTrip(record.id(), record.newRecordKey(), record.newRecordValue());
roundTrip((short) 2, new GroupMetadataKey(), new GroupMetadataValue()); }
roundTrip((short) 3, new ConsumerGroupMetadataKey(), new ConsumerGroupMetadataValue());
roundTrip((short) 4, new ConsumerGroupPartitionMetadataKey(), new ConsumerGroupPartitionMetadataValue());
roundTrip((short) 5, new ConsumerGroupMemberMetadataKey(), new ConsumerGroupMemberMetadataValue());
roundTrip((short) 6, new ConsumerGroupTargetAssignmentMetadataKey(), new ConsumerGroupTargetAssignmentMetadataValue());
roundTrip((short) 7, new ConsumerGroupTargetAssignmentMemberKey(), new ConsumerGroupTargetAssignmentMemberValue());
roundTrip((short) 8, new ConsumerGroupCurrentMemberAssignmentKey(), new ConsumerGroupCurrentMemberAssignmentValue());
roundTrip((short) 9, new ShareGroupPartitionMetadataKey(), new ShareGroupPartitionMetadataValue());
roundTrip((short) 10, new ShareGroupMemberMetadataKey(), new ShareGroupMemberMetadataValue());
roundTrip((short) 11, new ShareGroupMetadataKey(), new ShareGroupMetadataValue());
roundTrip((short) 12, new ShareGroupTargetAssignmentMetadataKey(), new ShareGroupTargetAssignmentMetadataValue());
roundTrip((short) 13, new ShareGroupTargetAssignmentMemberKey(), new ShareGroupTargetAssignmentMemberValue());
roundTrip((short) 14, new ShareGroupCurrentMemberAssignmentKey(), new ShareGroupCurrentMemberAssignmentValue());
roundTrip((short) 15, new ShareGroupStatePartitionMetadataKey(), new ShareGroupStatePartitionMetadataValue());
roundTrip((short) 16, new ConsumerGroupRegularExpressionKey(), new ConsumerGroupRegularExpressionValue());
} }
private void roundTrip( private void roundTrip(

View File

@ -17,36 +17,28 @@
package org.apache.kafka.coordinator.share; package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey; import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
public class ShareCoordinatorRecordSerde extends CoordinatorRecordSerde { public class ShareCoordinatorRecordSerde extends CoordinatorRecordSerde {
@Override @Override
protected ApiMessage apiMessageKeyFor(short recordVersion) { protected ApiMessage apiMessageKeyFor(short recordType) {
switch (recordVersion) { try {
case 0: return CoordinatorRecordType.fromId(recordType).newRecordKey();
return new ShareSnapshotKey(); } catch (UnsupportedVersionException ex) {
case 1: throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
return new ShareUpdateKey();
default:
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
} }
} }
@Override @Override
protected ApiMessage apiMessageValueFor(short recordVersion) { protected ApiMessage apiMessageValueFor(short recordVersion) {
switch (recordVersion) { try {
case 0: return CoordinatorRecordType.fromId(recordVersion).newRecordValue();
return new ShareSnapshotValue(); } catch (UnsupportedVersionException ex) {
case 1: throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
return new ShareUpdateValue();
default:
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
} }
} }
} }

View File

@ -24,8 +24,6 @@ import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType; import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey; import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue; import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -199,8 +197,9 @@ public class ShareCoordinatorRecordSerdeTest {
@Test @Test
public void testDeserializeAllRecordTypes() { public void testDeserializeAllRecordTypes() {
roundTrip((short) 0, new ShareSnapshotKey(), new ShareSnapshotValue()); for (CoordinatorRecordType record : CoordinatorRecordType.values()) {
roundTrip((short) 1, new ShareUpdateKey(), new ShareUpdateValue()); roundTrip(record.id(), record.newRecordKey(), record.newRecordValue());
}
} }
private void roundTrip( private void roundTrip(

View File

@ -25,12 +25,12 @@ import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
public class TransactionCoordinatorRecordSerde extends CoordinatorRecordSerde { public class TransactionCoordinatorRecordSerde extends CoordinatorRecordSerde {
@Override @Override
protected ApiMessage apiMessageKeyFor(short recordVersion) { protected ApiMessage apiMessageKeyFor(short recordType) {
switch (recordVersion) { switch (recordType) {
case 0: case 0:
return new TransactionLogKey(); return new TransactionLogKey();
default: default:
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion); throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
} }
} }