From 12a16ecf289a6495f36bd9ed4c7bc4db73c62a99 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Wed, 2 Oct 2024 15:39:37 +0100 Subject: [PATCH] KAFKA-16733: Add share group record support to OffsetsMessageParser (#17282) This patch adds support for decoding the new KIP-932 record schemas in kafka-dump-log.sh Reviewers: David Arthur , Chia-Ping Tsai --- .../scala/kafka/tools/DumpLogSegments.scala | 30 ++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 92cdec988ec..57f84e39825 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -38,7 +38,7 @@ import org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordT import org.apache.kafka.common.protocol.{ByteBufferAccessor, Message} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils -import org.apache.kafka.coordinator.group.generated.{ConsumerGroupCurrentMemberAssignmentKey, ConsumerGroupCurrentMemberAssignmentKeyJsonConverter, ConsumerGroupCurrentMemberAssignmentValue, ConsumerGroupCurrentMemberAssignmentValueJsonConverter, ConsumerGroupMemberMetadataKey, ConsumerGroupMemberMetadataKeyJsonConverter, ConsumerGroupMemberMetadataValue, ConsumerGroupMemberMetadataValueJsonConverter, ConsumerGroupMetadataKey, ConsumerGroupMetadataKeyJsonConverter, ConsumerGroupMetadataValue, ConsumerGroupMetadataValueJsonConverter, ConsumerGroupPartitionMetadataKey, ConsumerGroupPartitionMetadataKeyJsonConverter, ConsumerGroupPartitionMetadataValue, ConsumerGroupPartitionMetadataValueJsonConverter, ConsumerGroupTargetAssignmentMemberKey, ConsumerGroupTargetAssignmentMemberKeyJsonConverter, ConsumerGroupTargetAssignmentMemberValue, ConsumerGroupTargetAssignmentMemberValueJsonConverter, ConsumerGroupTargetAssignmentMetadataKey, ConsumerGroupTargetAssignmentMetadataKeyJsonConverter, ConsumerGroupTargetAssignmentMetadataValue, ConsumerGroupTargetAssignmentMetadataValueJsonConverter, GroupMetadataKey, GroupMetadataKeyJsonConverter, GroupMetadataValue, GroupMetadataValueJsonConverter, OffsetCommitKey, OffsetCommitKeyJsonConverter, OffsetCommitValue, OffsetCommitValueJsonConverter} +import org.apache.kafka.coordinator.group.generated.{ConsumerGroupCurrentMemberAssignmentKey, ConsumerGroupCurrentMemberAssignmentKeyJsonConverter, ConsumerGroupCurrentMemberAssignmentValue, ConsumerGroupCurrentMemberAssignmentValueJsonConverter, ConsumerGroupMemberMetadataKey, ConsumerGroupMemberMetadataKeyJsonConverter, ConsumerGroupMemberMetadataValue, ConsumerGroupMemberMetadataValueJsonConverter, ConsumerGroupMetadataKey, ConsumerGroupMetadataKeyJsonConverter, ConsumerGroupMetadataValue, ConsumerGroupMetadataValueJsonConverter, ConsumerGroupPartitionMetadataKey, ConsumerGroupPartitionMetadataKeyJsonConverter, ConsumerGroupPartitionMetadataValue, ConsumerGroupPartitionMetadataValueJsonConverter, ConsumerGroupTargetAssignmentMemberKey, ConsumerGroupTargetAssignmentMemberKeyJsonConverter, ConsumerGroupTargetAssignmentMemberValue, ConsumerGroupTargetAssignmentMemberValueJsonConverter, ConsumerGroupTargetAssignmentMetadataKey, ConsumerGroupTargetAssignmentMetadataKeyJsonConverter, ConsumerGroupTargetAssignmentMetadataValue, ConsumerGroupTargetAssignmentMetadataValueJsonConverter, GroupMetadataKey, GroupMetadataKeyJsonConverter, GroupMetadataValue, GroupMetadataValueJsonConverter, OffsetCommitKey, OffsetCommitKeyJsonConverter, OffsetCommitValue, OffsetCommitValueJsonConverter, ShareGroupCurrentMemberAssignmentKey, ShareGroupCurrentMemberAssignmentKeyJsonConverter, ShareGroupCurrentMemberAssignmentValue, ShareGroupCurrentMemberAssignmentValueJsonConverter, ShareGroupMemberMetadataKey, ShareGroupMemberMetadataKeyJsonConverter, ShareGroupMemberMetadataValue, ShareGroupMemberMetadataValueJsonConverter, ShareGroupMetadataKey, ShareGroupMetadataKeyJsonConverter, ShareGroupMetadataValue, ShareGroupMetadataValueJsonConverter, ShareGroupPartitionMetadataKey, ShareGroupPartitionMetadataKeyJsonConverter, ShareGroupPartitionMetadataValue, ShareGroupPartitionMetadataValueJsonConverter, ShareGroupStatePartitionMetadataKey, ShareGroupStatePartitionMetadataKeyJsonConverter, ShareGroupStatePartitionMetadataValue, ShareGroupStatePartitionMetadataValueJsonConverter, ShareGroupTargetAssignmentMemberKey, ShareGroupTargetAssignmentMemberKeyJsonConverter, ShareGroupTargetAssignmentMemberValue, ShareGroupTargetAssignmentMemberValueJsonConverter, ShareGroupTargetAssignmentMetadataKey, ShareGroupTargetAssignmentMetadataKeyJsonConverter, ShareGroupTargetAssignmentMetadataValue, ShareGroupTargetAssignmentMetadataValueJsonConverter} import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde import org.apache.kafka.metadata.MetadataRecordSerde @@ -439,6 +439,20 @@ object DumpLogSegments { ConsumerGroupTargetAssignmentMemberKeyJsonConverter.write(m, version) case m: ConsumerGroupCurrentMemberAssignmentKey => ConsumerGroupCurrentMemberAssignmentKeyJsonConverter.write(m, version) + case m: ShareGroupMetadataKey => + ShareGroupMetadataKeyJsonConverter.write(m, version) + case m: ShareGroupPartitionMetadataKey => + ShareGroupPartitionMetadataKeyJsonConverter.write(m, version) + case m: ShareGroupMemberMetadataKey => + ShareGroupMemberMetadataKeyJsonConverter.write(m, version) + case m: ShareGroupTargetAssignmentMetadataKey => + ShareGroupTargetAssignmentMetadataKeyJsonConverter.write(m, version) + case m: ShareGroupTargetAssignmentMemberKey => + ShareGroupTargetAssignmentMemberKeyJsonConverter.write(m, version) + case m: ShareGroupCurrentMemberAssignmentKey => + ShareGroupCurrentMemberAssignmentKeyJsonConverter.write(m, version) + case m: ShareGroupStatePartitionMetadataKey => + ShareGroupStatePartitionMetadataKeyJsonConverter.write(m, version) case _ => throw new UnknownRecordTypeException(version) } @@ -519,6 +533,20 @@ object DumpLogSegments { ConsumerGroupTargetAssignmentMemberValueJsonConverter.write(m, version) case m: ConsumerGroupCurrentMemberAssignmentValue => ConsumerGroupCurrentMemberAssignmentValueJsonConverter.write(m, version) + case m: ShareGroupMetadataValue => + ShareGroupMetadataValueJsonConverter.write(m, version) + case m: ShareGroupPartitionMetadataValue => + ShareGroupPartitionMetadataValueJsonConverter.write(m, version) + case m: ShareGroupMemberMetadataValue => + ShareGroupMemberMetadataValueJsonConverter.write(m, version) + case m: ShareGroupTargetAssignmentMetadataValue => + ShareGroupTargetAssignmentMetadataValueJsonConverter.write(m, version) + case m: ShareGroupTargetAssignmentMemberValue => + ShareGroupTargetAssignmentMemberValueJsonConverter.write(m, version) + case m: ShareGroupCurrentMemberAssignmentValue => + ShareGroupCurrentMemberAssignmentValueJsonConverter.write(m, version) + case m: ShareGroupStatePartitionMetadataValue => + ShareGroupStatePartitionMetadataValueJsonConverter.write(m, version) case _ => throw new IllegalStateException(s"Message value ${message.getClass.getSimpleName} is not supported.") }