KAFKA-16733: Add share group record support to OffsetsMessageParser (#17282)

This patch adds support for decoding the new KIP-932 record schemas in kafka-dump-log.sh

Reviewers: David Arthur <mumrah@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Andrew Schofield 2024-10-02 15:39:37 +01:00 committed by GitHub
parent 2547d750a3
commit 12a16ecf28
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 29 additions and 1 deletions

View File

@ -38,7 +38,7 @@ import org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordT
import org.apache.kafka.common.protocol.{ByteBufferAccessor, Message}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.generated.{ConsumerGroupCurrentMemberAssignmentKey, ConsumerGroupCurrentMemberAssignmentKeyJsonConverter, ConsumerGroupCurrentMemberAssignmentValue, ConsumerGroupCurrentMemberAssignmentValueJsonConverter, ConsumerGroupMemberMetadataKey, ConsumerGroupMemberMetadataKeyJsonConverter, ConsumerGroupMemberMetadataValue, ConsumerGroupMemberMetadataValueJsonConverter, ConsumerGroupMetadataKey, ConsumerGroupMetadataKeyJsonConverter, ConsumerGroupMetadataValue, ConsumerGroupMetadataValueJsonConverter, ConsumerGroupPartitionMetadataKey, ConsumerGroupPartitionMetadataKeyJsonConverter, ConsumerGroupPartitionMetadataValue, ConsumerGroupPartitionMetadataValueJsonConverter, ConsumerGroupTargetAssignmentMemberKey, ConsumerGroupTargetAssignmentMemberKeyJsonConverter, ConsumerGroupTargetAssignmentMemberValue, ConsumerGroupTargetAssignmentMemberValueJsonConverter, ConsumerGroupTargetAssignmentMetadataKey, ConsumerGroupTargetAssignmentMetadataKeyJsonConverter, ConsumerGroupTargetAssignmentMetadataValue, ConsumerGroupTargetAssignmentMetadataValueJsonConverter, GroupMetadataKey, GroupMetadataKeyJsonConverter, GroupMetadataValue, GroupMetadataValueJsonConverter, OffsetCommitKey, OffsetCommitKeyJsonConverter, OffsetCommitValue, OffsetCommitValueJsonConverter}
import org.apache.kafka.coordinator.group.generated.{ConsumerGroupCurrentMemberAssignmentKey, ConsumerGroupCurrentMemberAssignmentKeyJsonConverter, ConsumerGroupCurrentMemberAssignmentValue, ConsumerGroupCurrentMemberAssignmentValueJsonConverter, ConsumerGroupMemberMetadataKey, ConsumerGroupMemberMetadataKeyJsonConverter, ConsumerGroupMemberMetadataValue, ConsumerGroupMemberMetadataValueJsonConverter, ConsumerGroupMetadataKey, ConsumerGroupMetadataKeyJsonConverter, ConsumerGroupMetadataValue, ConsumerGroupMetadataValueJsonConverter, ConsumerGroupPartitionMetadataKey, ConsumerGroupPartitionMetadataKeyJsonConverter, ConsumerGroupPartitionMetadataValue, ConsumerGroupPartitionMetadataValueJsonConverter, ConsumerGroupTargetAssignmentMemberKey, ConsumerGroupTargetAssignmentMemberKeyJsonConverter, ConsumerGroupTargetAssignmentMemberValue, ConsumerGroupTargetAssignmentMemberValueJsonConverter, ConsumerGroupTargetAssignmentMetadataKey, ConsumerGroupTargetAssignmentMetadataKeyJsonConverter, ConsumerGroupTargetAssignmentMetadataValue, ConsumerGroupTargetAssignmentMetadataValueJsonConverter, GroupMetadataKey, GroupMetadataKeyJsonConverter, GroupMetadataValue, GroupMetadataValueJsonConverter, OffsetCommitKey, OffsetCommitKeyJsonConverter, OffsetCommitValue, OffsetCommitValueJsonConverter, ShareGroupCurrentMemberAssignmentKey, ShareGroupCurrentMemberAssignmentKeyJsonConverter, ShareGroupCurrentMemberAssignmentValue, ShareGroupCurrentMemberAssignmentValueJsonConverter, ShareGroupMemberMetadataKey, ShareGroupMemberMetadataKeyJsonConverter, ShareGroupMemberMetadataValue, ShareGroupMemberMetadataValueJsonConverter, ShareGroupMetadataKey, ShareGroupMetadataKeyJsonConverter, ShareGroupMetadataValue, ShareGroupMetadataValueJsonConverter, ShareGroupPartitionMetadataKey, ShareGroupPartitionMetadataKeyJsonConverter, ShareGroupPartitionMetadataValue, ShareGroupPartitionMetadataValueJsonConverter, ShareGroupStatePartitionMetadataKey, ShareGroupStatePartitionMetadataKeyJsonConverter, ShareGroupStatePartitionMetadataValue, ShareGroupStatePartitionMetadataValueJsonConverter, ShareGroupTargetAssignmentMemberKey, ShareGroupTargetAssignmentMemberKeyJsonConverter, ShareGroupTargetAssignmentMemberValue, ShareGroupTargetAssignmentMemberValueJsonConverter, ShareGroupTargetAssignmentMetadataKey, ShareGroupTargetAssignmentMetadataKeyJsonConverter, ShareGroupTargetAssignmentMetadataValue, ShareGroupTargetAssignmentMetadataValueJsonConverter}
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
import org.apache.kafka.metadata.MetadataRecordSerde
@ -439,6 +439,20 @@ object DumpLogSegments {
ConsumerGroupTargetAssignmentMemberKeyJsonConverter.write(m, version)
case m: ConsumerGroupCurrentMemberAssignmentKey =>
ConsumerGroupCurrentMemberAssignmentKeyJsonConverter.write(m, version)
case m: ShareGroupMetadataKey =>
ShareGroupMetadataKeyJsonConverter.write(m, version)
case m: ShareGroupPartitionMetadataKey =>
ShareGroupPartitionMetadataKeyJsonConverter.write(m, version)
case m: ShareGroupMemberMetadataKey =>
ShareGroupMemberMetadataKeyJsonConverter.write(m, version)
case m: ShareGroupTargetAssignmentMetadataKey =>
ShareGroupTargetAssignmentMetadataKeyJsonConverter.write(m, version)
case m: ShareGroupTargetAssignmentMemberKey =>
ShareGroupTargetAssignmentMemberKeyJsonConverter.write(m, version)
case m: ShareGroupCurrentMemberAssignmentKey =>
ShareGroupCurrentMemberAssignmentKeyJsonConverter.write(m, version)
case m: ShareGroupStatePartitionMetadataKey =>
ShareGroupStatePartitionMetadataKeyJsonConverter.write(m, version)
case _ => throw new UnknownRecordTypeException(version)
}
@ -519,6 +533,20 @@ object DumpLogSegments {
ConsumerGroupTargetAssignmentMemberValueJsonConverter.write(m, version)
case m: ConsumerGroupCurrentMemberAssignmentValue =>
ConsumerGroupCurrentMemberAssignmentValueJsonConverter.write(m, version)
case m: ShareGroupMetadataValue =>
ShareGroupMetadataValueJsonConverter.write(m, version)
case m: ShareGroupPartitionMetadataValue =>
ShareGroupPartitionMetadataValueJsonConverter.write(m, version)
case m: ShareGroupMemberMetadataValue =>
ShareGroupMemberMetadataValueJsonConverter.write(m, version)
case m: ShareGroupTargetAssignmentMetadataValue =>
ShareGroupTargetAssignmentMetadataValueJsonConverter.write(m, version)
case m: ShareGroupTargetAssignmentMemberValue =>
ShareGroupTargetAssignmentMemberValueJsonConverter.write(m, version)
case m: ShareGroupCurrentMemberAssignmentValue =>
ShareGroupCurrentMemberAssignmentValueJsonConverter.write(m, version)
case m: ShareGroupStatePartitionMetadataValue =>
ShareGroupStatePartitionMetadataValueJsonConverter.write(m, version)
case _ => throw new IllegalStateException(s"Message value ${message.getClass.getSimpleName} is not supported.")
}