KAFKA-18616; Refactor DumpLogSegments's MessageParsers (#18688)

All the work that we have done to automate and to simplify the coordinator records allows us to simplify the related MessageParsers in DumpLogSegments. They can all share the same based implementation. This is nice because it ensures that we handle all those records similarly.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
David Jacot 2025-01-24 13:59:30 +01:00 committed by GitHub
parent 17846fe743
commit 80d2a8a42d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 67 additions and 126 deletions

View File

@ -36,8 +36,9 @@ import org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordT
import org.apache.kafka.common.protocol.{ApiMessage, ByteBufferAccessor} import org.apache.kafka.common.protocol.{ApiMessage, ByteBufferAccessor}
import org.apache.kafka.common.record._ import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.generated.{CoordinatorRecordJsonConverters => GroupCoordinatorRecordJsonConverters, CoordinatorRecordType => GroupCoordinatorRecordType, GroupMetadataValue, GroupMetadataValueJsonConverter} import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, GroupMetadataValueJsonConverter, CoordinatorRecordJsonConverters => GroupCoordinatorRecordJsonConverters, CoordinatorRecordType => GroupCoordinatorRecordType}
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde
import org.apache.kafka.coordinator.share.generated.{CoordinatorRecordJsonConverters => ShareCoordinatorRecordJsonConverters} import org.apache.kafka.coordinator.share.generated.{CoordinatorRecordJsonConverters => ShareCoordinatorRecordJsonConverters}
@ -419,17 +420,65 @@ object DumpLogSegments {
} }
} }
// Package private for testing. abstract class CoordinatorRecordMessageParser(serde: CoordinatorRecordSerde) extends MessageParser[String, String] {
class OffsetsMessageParser extends MessageParser[String, String] { override def parse(record: Record): (Option[String], Option[String]) = {
private val serde = new GroupCoordinatorRecordSerde() if (!record.hasKey)
throw new RuntimeException(s"Failed to decode message at offset ${record.offset} using the " +
"specified decoder (message had a missing key)")
try {
val r = serde.deserialize(record.key, record.value)
(
Some(prepareKey(r.key)),
Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("<DELETE>"))
)
} catch {
case e: UnknownRecordTypeException =>
(
Some(s"Unknown record type ${e.unknownType} at offset ${record.offset}, skipping."),
None
)
case e: Throwable =>
(
Some(s"Error at offset ${record.offset}, skipping. ${e.getMessage}"),
None
)
}
}
private def prepareKey(message: ApiMessage): String = { private def prepareKey(message: ApiMessage): String = {
val json = new ObjectNode(JsonNodeFactory.instance) val json = new ObjectNode(JsonNodeFactory.instance)
json.set("type", new TextNode(message.apiKey.toString)) json.set("type", new TextNode(message.apiKey.toString))
json.set("data", GroupCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message)) json.set("data", keyAsJson(message))
json.toString json.toString
} }
private def prepareValue(message: ApiMessage, version: Short): String = {
val json = new ObjectNode(JsonNodeFactory.instance)
json.set("version", new TextNode(version.toString))
json.set("data", valueAsJson(message, version))
json.toString
}
protected def keyAsJson(message: ApiMessage): JsonNode
protected def valueAsJson(message: ApiMessage, version: Short): JsonNode
}
// Package private for testing.
class OffsetsMessageParser extends CoordinatorRecordMessageParser(new GroupCoordinatorRecordSerde()) {
protected def keyAsJson(message: ApiMessage): JsonNode = {
GroupCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message)
}
protected def valueAsJson(message: ApiMessage, version: Short): JsonNode = {
if (message.apiKey == GroupCoordinatorRecordType.GROUP_METADATA.id) {
prepareGroupMetadataValue(message.asInstanceOf[GroupMetadataValue], version)
} else {
GroupCoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version)
}
}
private def prepareGroupMetadataValue(message: GroupMetadataValue, version: Short): JsonNode = { private def prepareGroupMetadataValue(message: GroupMetadataValue, version: Short): JsonNode = {
val json = GroupMetadataValueJsonConverter.write(message, version) val json = GroupMetadataValueJsonConverter.write(message, version)
@ -482,90 +531,16 @@ object DumpLogSegments {
json json
} }
private def prepareValue(message: ApiMessage, version: Short): String = {
val messageAsJson = if (message.apiKey == GroupCoordinatorRecordType.GROUP_METADATA.id) {
prepareGroupMetadataValue(message.asInstanceOf[GroupMetadataValue], version)
} else {
GroupCoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version)
}
val json = new ObjectNode(JsonNodeFactory.instance)
json.set("version", new TextNode(version.toString))
json.set("data", messageAsJson)
json.toString
}
override def parse(record: Record): (Option[String], Option[String]) = {
if (!record.hasKey)
throw new RuntimeException(s"Failed to decode message at offset ${record.offset} using offset " +
"topic decoder (message had a missing key)")
try {
val r = serde.deserialize(record.key, record.value)
(
Some(prepareKey(r.key)),
Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("<DELETE>"))
)
} catch {
case e: UnknownRecordTypeException =>
(
Some(s"Unknown record type ${e.unknownType} at offset ${record.offset}, skipping."),
None
)
case e: Throwable =>
(
Some(s"Error at offset ${record.offset}, skipping. ${e.getMessage}"),
None
)
}
}
} }
// Package private for testing. // Package private for testing.
class TransactionLogMessageParser extends MessageParser[String, String] { class TransactionLogMessageParser extends CoordinatorRecordMessageParser(new TransactionCoordinatorRecordSerde()) {
private val serde = new TransactionCoordinatorRecordSerde() override protected def keyAsJson(message: ApiMessage): JsonNode = {
TransactionCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message)
private def prepareKey(message: ApiMessage): String = {
val json = new ObjectNode(JsonNodeFactory.instance)
json.set("type", new TextNode(message.apiKey.toString))
json.set("data", TransactionCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message))
json.toString
} }
private def prepareValue(message: ApiMessage, version: Short): String = { override protected def valueAsJson(message: ApiMessage, version: Short): JsonNode = {
val json = new ObjectNode(JsonNodeFactory.instance) TransactionCoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version)
json.set("version", new TextNode(version.toString))
json.set("data", TransactionCoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version))
json.toString
}
override def parse(record: Record): (Option[String], Option[String]) = {
if (!record.hasKey)
throw new RuntimeException(s"Failed to decode message at offset ${record.offset} using offset " +
"transaction-log decoder (message had a missing key)")
try {
val r = serde.deserialize(record.key, record.value)
(
Some(prepareKey(r.key)),
Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("<DELETE>"))
)
} catch {
case e: UnknownRecordTypeException =>
(
Some(s"Unknown record type ${e.unknownType} at offset ${record.offset}, skipping."),
None
)
case e: Throwable =>
e.printStackTrace()
(
Some(s"Error at offset ${record.offset}, skipping. ${e.getMessage}"),
None
)
}
} }
} }
@ -610,47 +585,13 @@ object DumpLogSegments {
} }
// for test visibility // for test visibility
class ShareGroupStateMessageParser extends MessageParser[String, String] { class ShareGroupStateMessageParser extends CoordinatorRecordMessageParser(new ShareCoordinatorRecordSerde()) {
private val serde = new ShareCoordinatorRecordSerde() override protected def keyAsJson(message: ApiMessage): JsonNode = {
ShareCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message)
private def prepareKey(message: ApiMessage): String = {
val json = new ObjectNode(JsonNodeFactory.instance)
json.set("type", new TextNode(message.apiKey.toString))
json.set("data", ShareCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message))
json.toString
} }
private def prepareValue(message: ApiMessage, version: Short): String = { override protected def valueAsJson(message: ApiMessage, version: Short): JsonNode = {
val json = new ObjectNode(JsonNodeFactory.instance) ShareCoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version)
json.set("version", new TextNode(version.toString))
json.set("data", ShareCoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version))
json.toString
}
override def parse(record: Record): (Option[String], Option[String]) = {
if (!record.hasKey)
throw new RuntimeException(s"Failed to decode message at offset ${record.offset} using share group state " +
"topic decoder (message had a missing key)")
try {
val r = serde.deserialize(record.key, record.value)
(
Some(prepareKey(r.key)),
Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("<DELETE>"))
)
} catch {
case e: UnknownRecordTypeException =>
(
Some(s"Unknown record type ${e.unknownType} at offset ${record.offset}, skipping."),
None
)
case e: Throwable =>
(
Some(s"Error at offset ${record.offset}, skipping. ${e.getMessage}"),
None
)
}
} }
} }

View File

@ -655,7 +655,7 @@ class DumpLogSegmentsTest {
// The key is mandatory. // The key is mandatory.
assertEquals( assertEquals(
"Failed to decode message at offset 0 using offset topic decoder (message had a missing key)", "Failed to decode message at offset 0 using the specified decoder (message had a missing key)",
assertThrows( assertThrows(
classOf[RuntimeException], classOf[RuntimeException],
() => parser.parse(TestUtils.singletonRecords(key = null, value = null).records.iterator.next) () => parser.parse(TestUtils.singletonRecords(key = null, value = null).records.iterator.next)
@ -813,7 +813,7 @@ class DumpLogSegmentsTest {
// The key is mandatory. // The key is mandatory.
assertEquals( assertEquals(
"Failed to decode message at offset 0 using offset transaction-log decoder (message had a missing key)", "Failed to decode message at offset 0 using the specified decoder (message had a missing key)",
assertThrows( assertThrows(
classOf[RuntimeException], classOf[RuntimeException],
() => parser.parse(TestUtils.singletonRecords(key = null, value = null).records.iterator.next) () => parser.parse(TestUtils.singletonRecords(key = null, value = null).records.iterator.next)
@ -1044,7 +1044,7 @@ class DumpLogSegmentsTest {
// The key is mandatory. // The key is mandatory.
assertEquals( assertEquals(
"Failed to decode message at offset 0 using share group state topic decoder (message had a missing key)", "Failed to decode message at offset 0 using the specified decoder (message had a missing key)",
assertThrows( assertThrows(
classOf[RuntimeException], classOf[RuntimeException],
() => parser.parse(TestUtils.singletonRecords(key = null, value = null).records.iterator.next) () => parser.parse(TestUtils.singletonRecords(key = null, value = null).records.iterator.next)