From 80d2a8a42d0976ff859fccc76fd10b7cb4411491 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Fri, 24 Jan 2025 13:59:30 +0100 Subject: [PATCH] KAFKA-18616; Refactor DumpLogSegments's MessageParsers (#18688) All the work that we have done to automate and to simplify the coordinator records allows us to simplify the related MessageParsers in DumpLogSegments. They can all share the same based implementation. This is nice because it ensures that we handle all those records similarly. Reviewers: Chia-Ping Tsai --- .../scala/kafka/tools/DumpLogSegments.scala | 187 ++++++------------ .../kafka/tools/DumpLogSegmentsTest.scala | 6 +- 2 files changed, 67 insertions(+), 126 deletions(-) diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index e63be08bfb4..dfa357943c5 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -36,8 +36,9 @@ import org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordT import org.apache.kafka.common.protocol.{ApiMessage, ByteBufferAccessor} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils -import org.apache.kafka.coordinator.group.generated.{CoordinatorRecordJsonConverters => GroupCoordinatorRecordJsonConverters, CoordinatorRecordType => GroupCoordinatorRecordType, GroupMetadataValue, GroupMetadataValueJsonConverter} +import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, GroupMetadataValueJsonConverter, CoordinatorRecordJsonConverters => GroupCoordinatorRecordJsonConverters, CoordinatorRecordType => GroupCoordinatorRecordType} import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde import org.apache.kafka.coordinator.share.generated.{CoordinatorRecordJsonConverters => ShareCoordinatorRecordJsonConverters} @@ -419,17 +420,65 @@ object DumpLogSegments { } } - // Package private for testing. - class OffsetsMessageParser extends MessageParser[String, String] { - private val serde = new GroupCoordinatorRecordSerde() + abstract class CoordinatorRecordMessageParser(serde: CoordinatorRecordSerde) extends MessageParser[String, String] { + override def parse(record: Record): (Option[String], Option[String]) = { + if (!record.hasKey) + throw new RuntimeException(s"Failed to decode message at offset ${record.offset} using the " + + "specified decoder (message had a missing key)") + + try { + val r = serde.deserialize(record.key, record.value) + ( + Some(prepareKey(r.key)), + Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("")) + ) + } catch { + case e: UnknownRecordTypeException => + ( + Some(s"Unknown record type ${e.unknownType} at offset ${record.offset}, skipping."), + None + ) + + case e: Throwable => + ( + Some(s"Error at offset ${record.offset}, skipping. ${e.getMessage}"), + None + ) + } + } private def prepareKey(message: ApiMessage): String = { val json = new ObjectNode(JsonNodeFactory.instance) json.set("type", new TextNode(message.apiKey.toString)) - json.set("data", GroupCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message)) + json.set("data", keyAsJson(message)) json.toString } + private def prepareValue(message: ApiMessage, version: Short): String = { + val json = new ObjectNode(JsonNodeFactory.instance) + json.set("version", new TextNode(version.toString)) + json.set("data", valueAsJson(message, version)) + json.toString + } + + protected def keyAsJson(message: ApiMessage): JsonNode + protected def valueAsJson(message: ApiMessage, version: Short): JsonNode + } + + // Package private for testing. + class OffsetsMessageParser extends CoordinatorRecordMessageParser(new GroupCoordinatorRecordSerde()) { + protected def keyAsJson(message: ApiMessage): JsonNode = { + GroupCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message) + } + + protected def valueAsJson(message: ApiMessage, version: Short): JsonNode = { + if (message.apiKey == GroupCoordinatorRecordType.GROUP_METADATA.id) { + prepareGroupMetadataValue(message.asInstanceOf[GroupMetadataValue], version) + } else { + GroupCoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version) + } + } + private def prepareGroupMetadataValue(message: GroupMetadataValue, version: Short): JsonNode = { val json = GroupMetadataValueJsonConverter.write(message, version) @@ -482,90 +531,16 @@ object DumpLogSegments { json } - - private def prepareValue(message: ApiMessage, version: Short): String = { - val messageAsJson = if (message.apiKey == GroupCoordinatorRecordType.GROUP_METADATA.id) { - prepareGroupMetadataValue(message.asInstanceOf[GroupMetadataValue], version) - } else { - GroupCoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version) - } - - val json = new ObjectNode(JsonNodeFactory.instance) - json.set("version", new TextNode(version.toString)) - json.set("data", messageAsJson) - json.toString - } - - override def parse(record: Record): (Option[String], Option[String]) = { - if (!record.hasKey) - throw new RuntimeException(s"Failed to decode message at offset ${record.offset} using offset " + - "topic decoder (message had a missing key)") - - try { - val r = serde.deserialize(record.key, record.value) - ( - Some(prepareKey(r.key)), - Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("")) - ) - } catch { - case e: UnknownRecordTypeException => - ( - Some(s"Unknown record type ${e.unknownType} at offset ${record.offset}, skipping."), - None - ) - - case e: Throwable => - ( - Some(s"Error at offset ${record.offset}, skipping. ${e.getMessage}"), - None - ) - } - } } // Package private for testing. - class TransactionLogMessageParser extends MessageParser[String, String] { - private val serde = new TransactionCoordinatorRecordSerde() - - private def prepareKey(message: ApiMessage): String = { - val json = new ObjectNode(JsonNodeFactory.instance) - json.set("type", new TextNode(message.apiKey.toString)) - json.set("data", TransactionCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message)) - json.toString + class TransactionLogMessageParser extends CoordinatorRecordMessageParser(new TransactionCoordinatorRecordSerde()) { + override protected def keyAsJson(message: ApiMessage): JsonNode = { + TransactionCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message) } - private def prepareValue(message: ApiMessage, version: Short): String = { - val json = new ObjectNode(JsonNodeFactory.instance) - json.set("version", new TextNode(version.toString)) - json.set("data", TransactionCoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version)) - json.toString - } - - override def parse(record: Record): (Option[String], Option[String]) = { - if (!record.hasKey) - throw new RuntimeException(s"Failed to decode message at offset ${record.offset} using offset " + - "transaction-log decoder (message had a missing key)") - - try { - val r = serde.deserialize(record.key, record.value) - ( - Some(prepareKey(r.key)), - Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("")) - ) - } catch { - case e: UnknownRecordTypeException => - ( - Some(s"Unknown record type ${e.unknownType} at offset ${record.offset}, skipping."), - None - ) - - case e: Throwable => - e.printStackTrace() - ( - Some(s"Error at offset ${record.offset}, skipping. ${e.getMessage}"), - None - ) - } + override protected def valueAsJson(message: ApiMessage, version: Short): JsonNode = { + TransactionCoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version) } } @@ -610,47 +585,13 @@ object DumpLogSegments { } // for test visibility - class ShareGroupStateMessageParser extends MessageParser[String, String] { - private val serde = new ShareCoordinatorRecordSerde() - - private def prepareKey(message: ApiMessage): String = { - val json = new ObjectNode(JsonNodeFactory.instance) - json.set("type", new TextNode(message.apiKey.toString)) - json.set("data", ShareCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message)) - json.toString + class ShareGroupStateMessageParser extends CoordinatorRecordMessageParser(new ShareCoordinatorRecordSerde()) { + override protected def keyAsJson(message: ApiMessage): JsonNode = { + ShareCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message) } - private def prepareValue(message: ApiMessage, version: Short): String = { - val json = new ObjectNode(JsonNodeFactory.instance) - json.set("version", new TextNode(version.toString)) - json.set("data", ShareCoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version)) - json.toString - } - - override def parse(record: Record): (Option[String], Option[String]) = { - if (!record.hasKey) - throw new RuntimeException(s"Failed to decode message at offset ${record.offset} using share group state " + - "topic decoder (message had a missing key)") - - try { - val r = serde.deserialize(record.key, record.value) - ( - Some(prepareKey(r.key)), - Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("")) - ) - } catch { - case e: UnknownRecordTypeException => - ( - Some(s"Unknown record type ${e.unknownType} at offset ${record.offset}, skipping."), - None - ) - - case e: Throwable => - ( - Some(s"Error at offset ${record.offset}, skipping. ${e.getMessage}"), - None - ) - } + override protected def valueAsJson(message: ApiMessage, version: Short): JsonNode = { + ShareCoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version) } } diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index 2e6c1abe114..53d153ea273 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -655,7 +655,7 @@ class DumpLogSegmentsTest { // The key is mandatory. assertEquals( - "Failed to decode message at offset 0 using offset topic decoder (message had a missing key)", + "Failed to decode message at offset 0 using the specified decoder (message had a missing key)", assertThrows( classOf[RuntimeException], () => parser.parse(TestUtils.singletonRecords(key = null, value = null).records.iterator.next) @@ -813,7 +813,7 @@ class DumpLogSegmentsTest { // The key is mandatory. assertEquals( - "Failed to decode message at offset 0 using offset transaction-log decoder (message had a missing key)", + "Failed to decode message at offset 0 using the specified decoder (message had a missing key)", assertThrows( classOf[RuntimeException], () => parser.parse(TestUtils.singletonRecords(key = null, value = null).records.iterator.next) @@ -1044,7 +1044,7 @@ class DumpLogSegmentsTest { // The key is mandatory. assertEquals( - "Failed to decode message at offset 0 using share group state topic decoder (message had a missing key)", + "Failed to decode message at offset 0 using the specified decoder (message had a missing key)", assertThrows( classOf[RuntimeException], () => parser.parse(TestUtils.singletonRecords(key = null, value = null).records.iterator.next)