From 87334e6c2e994e454dad8464a3c4e878a3189458 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Fri, 10 Jan 2025 11:17:30 +0100 Subject: [PATCH] KAFKA-18308; Update CoordinatorSerde (#18455) This patch updates the GroupCoordinatorSerde and the ShareGroupCoordinatorSerde to leverage the CoordinatorRecordType to deserialize records. With this, newly added record are automatically picked up. In other words, the serdes work with all defined records without doing anything. Reviewers: Andrew Schofield --- .../runtime/CoordinatorRecordSerde.java | 6 +- .../CoordinatorRecordTypeGenerator.java | 56 +++++++- .../group/GroupCoordinatorRecordSerde.java | 129 ++---------------- .../GroupCoordinatorRecordSerdeTest.java | 51 +------ .../share/ShareCoordinatorRecordSerde.java | 30 ++-- .../ShareCoordinatorRecordSerdeTest.java | 7 +- .../TransactionCoordinatorRecordSerde.java | 6 +- 7 files changed, 90 insertions(+), 195 deletions(-) diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java index 56f9a6cae13..9e5afc3b89f 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java @@ -43,7 +43,7 @@ public abstract class CoordinatorRecordSerde implements Serializer entry : records.entrySet()) { + buffer.printf("case %d:%n", entry.getKey()); + buffer.incrementIndent(); + buffer.printf("return new %s();%n", + MessageGenerator.capitalizeFirst(entry.getValue().key.name())); + buffer.decrementIndent(); + } + buffer.printf("default:%n"); + buffer.incrementIndent(); + headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS); + buffer.printf("throw new UnsupportedVersionException(\"Unknown record id \"" + + " + id);%n"); + buffer.decrementIndent(); + buffer.decrementIndent(); + buffer.printf("}%n"); + buffer.decrementIndent(); + buffer.printf("}%n"); + } + + private void generateNewRecordValue() { + headerGenerator.addImport(MessageGenerator.API_MESSAGE_CLASS); + buffer.printf("public ApiMessage newRecordValue() {%n"); + buffer.incrementIndent(); + buffer.printf("switch (id) {%n"); + buffer.incrementIndent(); + for (Map.Entry entry : records.entrySet()) { + buffer.printf("case %d:%n", entry.getKey()); + buffer.incrementIndent(); + buffer.printf("return new %s();%n", + MessageGenerator.capitalizeFirst(entry.getValue().value.name())); + buffer.decrementIndent(); + } + buffer.printf("default:%n"); + buffer.incrementIndent(); + headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS); + buffer.printf("throw new UnsupportedVersionException(\"Unknown record id \"" + " + id);%n"); buffer.decrementIndent(); buffer.decrementIndent(); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java index afa489502d8..9143079aa11 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java @@ -16,138 +16,31 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiMessage; -import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; -import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; -import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; -import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; -import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; /** * Please ensure any new record added here stays in sync with DumpLogSegments. */ public class GroupCoordinatorRecordSerde extends CoordinatorRecordSerde { - // This method is temporary until the share coordinator is converted to - // using the new coordinator records. @Override - public byte[] serializeKey(CoordinatorRecord record) { - // Record does not accept a null key. - return MessageUtil.toCoordinatorTypePrefixedBytes( - record.key().version(), - record.key().message() - ); - } - - @Override - protected ApiMessage apiMessageKeyFor(short recordVersion) { - switch (recordVersion) { - case 0: - case 1: - return new OffsetCommitKey(); - case 2: - return new GroupMetadataKey(); - case 3: - return new ConsumerGroupMetadataKey(); - case 4: - return new ConsumerGroupPartitionMetadataKey(); - case 5: - return new ConsumerGroupMemberMetadataKey(); - case 6: - return new ConsumerGroupTargetAssignmentMetadataKey(); - case 7: - return new ConsumerGroupTargetAssignmentMemberKey(); - case 8: - return new ConsumerGroupCurrentMemberAssignmentKey(); - case 9: - return new ShareGroupPartitionMetadataKey(); - case 10: - return new ShareGroupMemberMetadataKey(); - case 11: - return new ShareGroupMetadataKey(); - case 12: - return new ShareGroupTargetAssignmentMetadataKey(); - case 13: - return new ShareGroupTargetAssignmentMemberKey(); - case 14: - return new ShareGroupCurrentMemberAssignmentKey(); - case 15: - return new ShareGroupStatePartitionMetadataKey(); - case 16: - return new ConsumerGroupRegularExpressionKey(); - default: - throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion); + protected ApiMessage apiMessageKeyFor(short recordType) { + try { + return CoordinatorRecordType.fromId(recordType).newRecordKey(); + } catch (UnsupportedVersionException ex) { + throw new CoordinatorLoader.UnknownRecordTypeException(recordType); } } @Override protected ApiMessage apiMessageValueFor(short recordVersion) { - switch (recordVersion) { - case 0: - case 1: - return new OffsetCommitValue(); - case 2: - return new GroupMetadataValue(); - case 3: - return new ConsumerGroupMetadataValue(); - case 4: - return new ConsumerGroupPartitionMetadataValue(); - case 5: - return new ConsumerGroupMemberMetadataValue(); - case 6: - return new ConsumerGroupTargetAssignmentMetadataValue(); - case 7: - return new ConsumerGroupTargetAssignmentMemberValue(); - case 8: - return new ConsumerGroupCurrentMemberAssignmentValue(); - case 9: - return new ShareGroupPartitionMetadataValue(); - case 10: - return new ShareGroupMemberMetadataValue(); - case 11: - return new ShareGroupMetadataValue(); - case 12: - return new ShareGroupTargetAssignmentMetadataValue(); - case 13: - return new ShareGroupTargetAssignmentMemberValue(); - case 14: - return new ShareGroupCurrentMemberAssignmentValue(); - case 15: - return new ShareGroupStatePartitionMetadataValue(); - case 16: - return new ConsumerGroupRegularExpressionValue(); - default: - throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion); + try { + return CoordinatorRecordType.fromId(recordVersion).newRecordValue(); + } catch (UnsupportedVersionException ex) { + throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion); } } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java index 94eb326af38..319cc9358a2 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java @@ -20,38 +20,9 @@ import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; -import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; -import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; -import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Test; @@ -245,23 +216,9 @@ public class GroupCoordinatorRecordSerdeTest { @Test public void testDeserializeAllRecordTypes() { - roundTrip((short) 0, new OffsetCommitKey(), new OffsetCommitValue()); - roundTrip((short) 1, new OffsetCommitKey(), new OffsetCommitValue()); - roundTrip((short) 2, new GroupMetadataKey(), new GroupMetadataValue()); - roundTrip((short) 3, new ConsumerGroupMetadataKey(), new ConsumerGroupMetadataValue()); - roundTrip((short) 4, new ConsumerGroupPartitionMetadataKey(), new ConsumerGroupPartitionMetadataValue()); - roundTrip((short) 5, new ConsumerGroupMemberMetadataKey(), new ConsumerGroupMemberMetadataValue()); - roundTrip((short) 6, new ConsumerGroupTargetAssignmentMetadataKey(), new ConsumerGroupTargetAssignmentMetadataValue()); - roundTrip((short) 7, new ConsumerGroupTargetAssignmentMemberKey(), new ConsumerGroupTargetAssignmentMemberValue()); - roundTrip((short) 8, new ConsumerGroupCurrentMemberAssignmentKey(), new ConsumerGroupCurrentMemberAssignmentValue()); - roundTrip((short) 9, new ShareGroupPartitionMetadataKey(), new ShareGroupPartitionMetadataValue()); - roundTrip((short) 10, new ShareGroupMemberMetadataKey(), new ShareGroupMemberMetadataValue()); - roundTrip((short) 11, new ShareGroupMetadataKey(), new ShareGroupMetadataValue()); - roundTrip((short) 12, new ShareGroupTargetAssignmentMetadataKey(), new ShareGroupTargetAssignmentMetadataValue()); - roundTrip((short) 13, new ShareGroupTargetAssignmentMemberKey(), new ShareGroupTargetAssignmentMemberValue()); - roundTrip((short) 14, new ShareGroupCurrentMemberAssignmentKey(), new ShareGroupCurrentMemberAssignmentValue()); - roundTrip((short) 15, new ShareGroupStatePartitionMetadataKey(), new ShareGroupStatePartitionMetadataValue()); - roundTrip((short) 16, new ConsumerGroupRegularExpressionKey(), new ConsumerGroupRegularExpressionValue()); + for (CoordinatorRecordType record : CoordinatorRecordType.values()) { + roundTrip(record.id(), record.newRecordKey(), record.newRecordValue()); + } } private void roundTrip( diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java index 28f59e57d33..1fbfabb98f2 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java @@ -17,36 +17,28 @@ package org.apache.kafka.coordinator.share; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde; -import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey; -import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue; -import org.apache.kafka.coordinator.share.generated.ShareUpdateKey; -import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; +import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType; public class ShareCoordinatorRecordSerde extends CoordinatorRecordSerde { @Override - protected ApiMessage apiMessageKeyFor(short recordVersion) { - switch (recordVersion) { - case 0: - return new ShareSnapshotKey(); - case 1: - return new ShareUpdateKey(); - default: - throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion); + protected ApiMessage apiMessageKeyFor(short recordType) { + try { + return CoordinatorRecordType.fromId(recordType).newRecordKey(); + } catch (UnsupportedVersionException ex) { + throw new CoordinatorLoader.UnknownRecordTypeException(recordType); } } @Override protected ApiMessage apiMessageValueFor(short recordVersion) { - switch (recordVersion) { - case 0: - return new ShareSnapshotValue(); - case 1: - return new ShareUpdateValue(); - default: - throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion); + try { + return CoordinatorRecordType.fromId(recordVersion).newRecordValue(); + } catch (UnsupportedVersionException ex) { + throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion); } } } diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java index c11df0c19bb..3f40924028b 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java @@ -24,8 +24,6 @@ import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType; import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey; import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue; -import org.apache.kafka.coordinator.share.generated.ShareUpdateKey; -import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.BeforeEach; @@ -199,8 +197,9 @@ public class ShareCoordinatorRecordSerdeTest { @Test public void testDeserializeAllRecordTypes() { - roundTrip((short) 0, new ShareSnapshotKey(), new ShareSnapshotValue()); - roundTrip((short) 1, new ShareUpdateKey(), new ShareUpdateValue()); + for (CoordinatorRecordType record : CoordinatorRecordType.values()) { + roundTrip(record.id(), record.newRecordKey(), record.newRecordValue()); + } } private void roundTrip( diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerde.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerde.java index f8d1c72f4ef..07387948c6c 100644 --- a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerde.java +++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerde.java @@ -25,12 +25,12 @@ import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue; public class TransactionCoordinatorRecordSerde extends CoordinatorRecordSerde { @Override - protected ApiMessage apiMessageKeyFor(short recordVersion) { - switch (recordVersion) { + protected ApiMessage apiMessageKeyFor(short recordType) { + switch (recordType) { case 0: return new TransactionLogKey(); default: - throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion); + throw new CoordinatorLoader.UnknownRecordTypeException(recordType); } }