KAFKA-18457; Update DumpLogSegments to use coordinator record json converters (#18480)

This patch updates the ShareGroupStateMessageParser and OffsetsMessageParser used by the DumpLogSegments command-line tool to use the recently introduced JSON converters for those records. As a result, new record types are automatically supported.
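
For illustration, a minimal sketch of the converter-based pattern the patch adopts (see the diff below). It relies only on the generated CoordinatorRecordJsonConverters helpers and the "type"/"version" envelopes shown in the diff; the CoordinatorRecordJsonSketch object and its keyAsJson/valueAsJson names are illustrative, not part of the change.

// Illustrative sketch only (not part of the patch): wrap a coordinator record
// key/value in a small JSON envelope, delegating per-record serialization to
// the generated converters so new record types need no tool changes.
import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode, TextNode}
import org.apache.kafka.common.protocol.ApiMessage
import org.apache.kafka.coordinator.group.generated.{CoordinatorRecordJsonConverters => GroupCoordinatorRecordJsonConverters}

object CoordinatorRecordJsonSketch {
  // Keys carry the record type, taken from the message's apiKey.
  def keyAsJson(message: ApiMessage): String = {
    val json = new ObjectNode(JsonNodeFactory.instance)
    json.set("type", new TextNode(message.apiKey.toString))
    json.set("data", GroupCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message))
    json.toString
  }

  // Values additionally carry the version they were serialized with.
  def valueAsJson(message: ApiMessage, version: Short): String = {
    val json = new ObjectNode(JsonNodeFactory.instance)
    json.set("version", new TextNode(version.toString))
    json.set("data", GroupCoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version))
    json.toString
  }
}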

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
David Jacot 2025-01-13 11:28:54 +01:00 committed by GitHub
parent 33556aedc3
commit 273719227e
2 changed files with 25 additions and 108 deletions


@@ -33,14 +33,14 @@ import org.apache.kafka.common.message.SnapshotFooterRecordJsonConverter
 import org.apache.kafka.common.message.SnapshotHeaderRecordJsonConverter
 import org.apache.kafka.common.message.VotersRecordJsonConverter
 import org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordType}
-import org.apache.kafka.common.protocol.{ByteBufferAccessor, Message}
+import org.apache.kafka.common.protocol.{ApiMessage, ByteBufferAccessor, Message}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.group.generated.{ConsumerGroupCurrentMemberAssignmentKey, ConsumerGroupCurrentMemberAssignmentKeyJsonConverter, ConsumerGroupCurrentMemberAssignmentValue, ConsumerGroupCurrentMemberAssignmentValueJsonConverter, ConsumerGroupMemberMetadataKey, ConsumerGroupMemberMetadataKeyJsonConverter, ConsumerGroupMemberMetadataValue, ConsumerGroupMemberMetadataValueJsonConverter, ConsumerGroupMetadataKey, ConsumerGroupMetadataKeyJsonConverter, ConsumerGroupMetadataValue, ConsumerGroupMetadataValueJsonConverter, ConsumerGroupPartitionMetadataKey, ConsumerGroupPartitionMetadataKeyJsonConverter, ConsumerGroupPartitionMetadataValue, ConsumerGroupPartitionMetadataValueJsonConverter, ConsumerGroupRegularExpressionKey, ConsumerGroupRegularExpressionKeyJsonConverter, ConsumerGroupRegularExpressionValue, ConsumerGroupRegularExpressionValueJsonConverter, ConsumerGroupTargetAssignmentMemberKey, ConsumerGroupTargetAssignmentMemberKeyJsonConverter, ConsumerGroupTargetAssignmentMemberValue, ConsumerGroupTargetAssignmentMemberValueJsonConverter, ConsumerGroupTargetAssignmentMetadataKey, ConsumerGroupTargetAssignmentMetadataKeyJsonConverter, ConsumerGroupTargetAssignmentMetadataValue, ConsumerGroupTargetAssignmentMetadataValueJsonConverter, GroupMetadataKey, GroupMetadataKeyJsonConverter, GroupMetadataValue, GroupMetadataValueJsonConverter, OffsetCommitKey, OffsetCommitKeyJsonConverter, OffsetCommitValue, OffsetCommitValueJsonConverter, ShareGroupCurrentMemberAssignmentKey, ShareGroupCurrentMemberAssignmentKeyJsonConverter, ShareGroupCurrentMemberAssignmentValue, ShareGroupCurrentMemberAssignmentValueJsonConverter, ShareGroupMemberMetadataKey, ShareGroupMemberMetadataKeyJsonConverter, ShareGroupMemberMetadataValue, ShareGroupMemberMetadataValueJsonConverter, ShareGroupMetadataKey, ShareGroupMetadataKeyJsonConverter, ShareGroupMetadataValue, ShareGroupMetadataValueJsonConverter, ShareGroupPartitionMetadataKey, ShareGroupPartitionMetadataKeyJsonConverter, ShareGroupPartitionMetadataValue, ShareGroupPartitionMetadataValueJsonConverter, ShareGroupStatePartitionMetadataKey, ShareGroupStatePartitionMetadataKeyJsonConverter, ShareGroupStatePartitionMetadataValue, ShareGroupStatePartitionMetadataValueJsonConverter, ShareGroupTargetAssignmentMemberKey, ShareGroupTargetAssignmentMemberKeyJsonConverter, ShareGroupTargetAssignmentMemberValue, ShareGroupTargetAssignmentMemberValueJsonConverter, ShareGroupTargetAssignmentMetadataKey, ShareGroupTargetAssignmentMetadataKeyJsonConverter, ShareGroupTargetAssignmentMetadataValue, ShareGroupTargetAssignmentMetadataValueJsonConverter}
+import org.apache.kafka.coordinator.group.generated.{CoordinatorRecordJsonConverters => GroupCoordinatorRecordJsonConverters, CoordinatorRecordType => GroupCoordinatorRecordType, GroupMetadataValue, GroupMetadataValueJsonConverter}
 import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException
 import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
 import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde
-import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, ShareSnapshotKeyJsonConverter, ShareSnapshotValue, ShareSnapshotValueJsonConverter, ShareUpdateKey, ShareUpdateKeyJsonConverter, ShareUpdateValue, ShareUpdateValueJsonConverter}
+import org.apache.kafka.coordinator.share.generated.{CoordinatorRecordJsonConverters => ShareCoordinatorRecordJsonConverters}
 import org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde
 import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogKeyJsonConverter, TransactionLogValue, TransactionLogValueJsonConverter}
 import org.apache.kafka.metadata.MetadataRecordSerde
@@ -423,46 +423,10 @@ object DumpLogSegments {
 class OffsetsMessageParser extends MessageParser[String, String] {
 private val serde = new GroupCoordinatorRecordSerde()
-private def prepareKey(message: Message, recordType: Short): String = {
-val messageAsJson = message match {
-case m: OffsetCommitKey =>
-OffsetCommitKeyJsonConverter.write(m, 0.toShort)
-case m: GroupMetadataKey =>
-GroupMetadataKeyJsonConverter.write(m, 0.toShort)
-case m: ConsumerGroupMetadataKey =>
-ConsumerGroupMetadataKeyJsonConverter.write(m, 0.toShort)
-case m: ConsumerGroupPartitionMetadataKey =>
-ConsumerGroupPartitionMetadataKeyJsonConverter.write(m, 0.toShort)
-case m: ConsumerGroupMemberMetadataKey =>
-ConsumerGroupMemberMetadataKeyJsonConverter.write(m, 0.toShort)
-case m: ConsumerGroupTargetAssignmentMetadataKey =>
-ConsumerGroupTargetAssignmentMetadataKeyJsonConverter.write(m, 0.toShort)
-case m: ConsumerGroupTargetAssignmentMemberKey =>
-ConsumerGroupTargetAssignmentMemberKeyJsonConverter.write(m, 0.toShort)
-case m: ConsumerGroupCurrentMemberAssignmentKey =>
-ConsumerGroupCurrentMemberAssignmentKeyJsonConverter.write(m, 0.toShort)
-case m: ConsumerGroupRegularExpressionKey =>
-ConsumerGroupRegularExpressionKeyJsonConverter.write(m, 0.toShort)
-case m: ShareGroupMetadataKey =>
-ShareGroupMetadataKeyJsonConverter.write(m, 0.toShort)
-case m: ShareGroupPartitionMetadataKey =>
-ShareGroupPartitionMetadataKeyJsonConverter.write(m, 0.toShort)
-case m: ShareGroupMemberMetadataKey =>
-ShareGroupMemberMetadataKeyJsonConverter.write(m, 0.toShort)
-case m: ShareGroupTargetAssignmentMetadataKey =>
-ShareGroupTargetAssignmentMetadataKeyJsonConverter.write(m, 0.toShort)
-case m: ShareGroupTargetAssignmentMemberKey =>
-ShareGroupTargetAssignmentMemberKeyJsonConverter.write(m, 0.toShort)
-case m: ShareGroupCurrentMemberAssignmentKey =>
-ShareGroupCurrentMemberAssignmentKeyJsonConverter.write(m, 0.toShort)
-case m: ShareGroupStatePartitionMetadataKey =>
-ShareGroupStatePartitionMetadataKeyJsonConverter.write(m, 0.toShort)
-case _ => throw new UnknownRecordTypeException(recordType)
-}
+private def prepareKey(message: ApiMessage): String = {
 val json = new ObjectNode(JsonNodeFactory.instance)
-json.set("type", new TextNode(recordType.toString))
-json.set("data", messageAsJson)
+json.set("type", new TextNode(message.apiKey.toString))
+json.set("data", GroupCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message))
 json.toString
 }
@@ -519,41 +483,11 @@
 json
 }
-private def prepareValue(message: Message, version: Short): String = {
-val messageAsJson = message match {
-case m: OffsetCommitValue =>
-OffsetCommitValueJsonConverter.write(m, version)
-case m: GroupMetadataValue =>
-prepareGroupMetadataValue(m, version)
-case m: ConsumerGroupMetadataValue =>
-ConsumerGroupMetadataValueJsonConverter.write(m, version)
-case m: ConsumerGroupPartitionMetadataValue =>
-ConsumerGroupPartitionMetadataValueJsonConverter.write(m, version)
-case m: ConsumerGroupMemberMetadataValue =>
-ConsumerGroupMemberMetadataValueJsonConverter.write(m, version)
-case m: ConsumerGroupTargetAssignmentMetadataValue =>
-ConsumerGroupTargetAssignmentMetadataValueJsonConverter.write(m, version)
-case m: ConsumerGroupTargetAssignmentMemberValue =>
-ConsumerGroupTargetAssignmentMemberValueJsonConverter.write(m, version)
-case m: ConsumerGroupCurrentMemberAssignmentValue =>
-ConsumerGroupCurrentMemberAssignmentValueJsonConverter.write(m, version)
-case m: ConsumerGroupRegularExpressionValue =>
-ConsumerGroupRegularExpressionValueJsonConverter.write(m, version)
-case m: ShareGroupMetadataValue =>
-ShareGroupMetadataValueJsonConverter.write(m, version)
-case m: ShareGroupPartitionMetadataValue =>
-ShareGroupPartitionMetadataValueJsonConverter.write(m, version)
-case m: ShareGroupMemberMetadataValue =>
-ShareGroupMemberMetadataValueJsonConverter.write(m, version)
-case m: ShareGroupTargetAssignmentMetadataValue =>
-ShareGroupTargetAssignmentMetadataValueJsonConverter.write(m, version)
-case m: ShareGroupTargetAssignmentMemberValue =>
-ShareGroupTargetAssignmentMemberValueJsonConverter.write(m, version)
-case m: ShareGroupCurrentMemberAssignmentValue =>
-ShareGroupCurrentMemberAssignmentValueJsonConverter.write(m, version)
-case m: ShareGroupStatePartitionMetadataValue =>
-ShareGroupStatePartitionMetadataValueJsonConverter.write(m, version)
-case _ => throw new IllegalStateException(s"Message value ${message.getClass.getSimpleName} is not supported.")
+private def prepareValue(message: ApiMessage, version: Short): String = {
+val messageAsJson = if (message.apiKey == GroupCoordinatorRecordType.GROUP_METADATA.id) {
+prepareGroupMetadataValue(message.asInstanceOf[GroupMetadataValue], version)
+} else {
+GroupCoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version)
 }
 val json = new ObjectNode(JsonNodeFactory.instance)
@@ -570,7 +504,7 @@ object DumpLogSegments {
 try {
 val r = serde.deserialize(record.key, record.value)
 (
-Some(prepareKey(r.key.message, r.key.version)),
+Some(prepareKey(r.key.message)),
 Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("<DELETE>"))
 )
 } catch {
@@ -691,34 +625,17 @@ object DumpLogSegments {
 class ShareGroupStateMessageParser extends MessageParser[String, String] {
 private val serde = new ShareCoordinatorRecordSerde()
-private def prepareKey(message: Message, version: Short): String = {
-val messageAsJson = message match {
-case m: ShareSnapshotKey =>
-ShareSnapshotKeyJsonConverter.write(m, version)
-case m: ShareUpdateKey =>
-ShareUpdateKeyJsonConverter.write(m, version)
-case _ => throw new UnknownRecordTypeException(version)
-}
-jsonString(messageAsJson, version)
-}
-private def prepareValue(message: Message, version: Short): String = {
-val messageAsJson = message match {
-case m: ShareSnapshotValue =>
-ShareSnapshotValueJsonConverter.write(m, version)
-case m: ShareUpdateValue =>
-ShareUpdateValueJsonConverter.write(m, version)
-case _ => throw new IllegalStateException(s"Message value ${message.getClass.getSimpleName} is not supported.")
-}
-jsonString(messageAsJson, version)
-}
-private def jsonString(jsonNode: JsonNode, version: Short): String = {
+private def prepareKey(message: ApiMessage): String = {
 val json = new ObjectNode(JsonNodeFactory.instance)
-json.set("type", new TextNode(version.toString))
-json.set("data", jsonNode)
+json.set("type", new TextNode(message.apiKey.toString))
+json.set("data", ShareCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message))
 json.toString
 }
+private def prepareValue(message: ApiMessage, version: Short): String = {
+val json = new ObjectNode(JsonNodeFactory.instance)
+json.set("version", new TextNode(version.toString))
+json.set("data", ShareCoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version))
+json.toString
+}
@@ -730,7 +647,7 @@ object DumpLogSegments {
 try {
 val r = serde.deserialize(record.key, record.value)
 (
-Some(prepareKey(r.key.message, r.key.version)),
+Some(prepareKey(r.key.message)),
 Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("<DELETE>"))
 )
 } catch {


@@ -1112,7 +1112,7 @@ class DumpLogSegmentsTest {
 assertEquals(
 (
 Some("{\"type\":\"0\",\"data\":{\"groupId\":\"gs1\",\"topicId\":\"Uj5wn_FqTXirEASvVZRY1w\",\"partition\":0}}"),
-Some("{\"type\":\"0\",\"data\":{\"snapshotEpoch\":0,\"stateEpoch\":0,\"leaderEpoch\":0,\"startOffset\":0,\"stateBatches\":[{\"firstOffset\":0,\"lastOffset\":4,\"deliveryState\":2,\"deliveryCount\":1}]}}")
+Some("{\"version\":\"0\",\"data\":{\"snapshotEpoch\":0,\"stateEpoch\":0,\"leaderEpoch\":0,\"startOffset\":0,\"stateBatches\":[{\"firstOffset\":0,\"lastOffset\":4,\"deliveryState\":2,\"deliveryCount\":1}]}}")
 ),
 parser.parse(serializedRecord(
 new ApiMessageAndVersion(new ShareSnapshotKey()
@@ -1140,7 +1140,7 @@ class DumpLogSegmentsTest {
 assertEquals(
 (
 Some("{\"type\":\"1\",\"data\":{\"groupId\":\"gs1\",\"topicId\":\"Uj5wn_FqTXirEASvVZRY1w\",\"partition\":0}}"),
-Some("{\"type\":\"0\",\"data\":{\"snapshotEpoch\":0,\"leaderEpoch\":0,\"startOffset\":0,\"stateBatches\":[{\"firstOffset\":0,\"lastOffset\":4,\"deliveryState\":2,\"deliveryCount\":1}]}}")
+Some("{\"version\":\"0\",\"data\":{\"snapshotEpoch\":0,\"leaderEpoch\":0,\"startOffset\":0,\"stateBatches\":[{\"firstOffset\":0,\"lastOffset\":4,\"deliveryState\":2,\"deliveryCount\":1}]}}")
 ),
 parser.parse(serializedRecord(
 new ApiMessageAndVersion(new ShareUpdateKey()
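
For reference, a hedged sketch relating the test expectations above to the new key envelope: it builds the same ShareSnapshotKey and renders it through the share coordinator's generated converter. Only ShareSnapshotKey, Uuid, and ShareCoordinatorRecordJsonConverters.writeRecordKeyAsJson come from the diff; the ShareSnapshotKeyJsonSketch object and the exact setter names are assumptions based on Kafka's generated-message conventions.

// Illustrative sketch only: render a ShareSnapshotKey through the generated
// JSON converter and wrap it in the "type"/"data" envelope used by the parser.
import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode, TextNode}
import org.apache.kafka.common.Uuid
import org.apache.kafka.coordinator.share.generated.{CoordinatorRecordJsonConverters => ShareCoordinatorRecordJsonConverters, ShareSnapshotKey}

object ShareSnapshotKeyJsonSketch {
  def main(args: Array[String]): Unit = {
    // Setter names assumed from Kafka's generated-message conventions.
    val key = new ShareSnapshotKey()
      .setGroupId("gs1")
      .setTopicId(Uuid.fromString("Uj5wn_FqTXirEASvVZRY1w"))
      .setPartition(0)

    val json = new ObjectNode(JsonNodeFactory.instance)
    json.set("type", new TextNode(key.apiKey.toString))
    json.set("data", ShareCoordinatorRecordJsonConverters.writeRecordKeyAsJson(key))
    // Expected to resemble the test string above, e.g.
    // {"type":"0","data":{"groupId":"gs1","topicId":"Uj5wn_FqTXirEASvVZRY1w","partition":0}}
    println(json.toString)
  }
}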