diff --git a/build.gradle b/build.gradle index f9e905753ac..1bdf8f36ed7 100644 --- a/build.gradle +++ b/build.gradle @@ -1328,6 +1328,8 @@ project(':group-coordinator') { implementation project(':clients') implementation project(':metadata') implementation project(':storage') + implementation libs.jacksonDatabind + implementation libs.jacksonJDK8Datatypes implementation libs.slf4jApi implementation libs.metrics @@ -1364,7 +1366,7 @@ project(':group-coordinator') { args = [ "-p", "org.apache.kafka.coordinator.group.generated", "-o", "src/generated/java/org/apache/kafka/coordinator/group/generated", "-i", "src/main/resources/common/message", - "-m", "MessageDataGenerator" + "-m", "MessageDataGenerator", "JsonConverterGenerator" ] inputs.dir("src/main/resources/common/message") .withPropertyName("messages") diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 46ffe0f1864..fc378eb7dd2 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -260,6 +260,9 @@ + + + diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index b9dda91e25c..6ffe4d71996 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -32,7 +32,6 @@ import kafka.utils.CoreUtils.inLock import kafka.utils.Implicits._ import kafka.utils._ import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.{Metrics, Sensor} import org.apache.kafka.common.metrics.stats.{Avg, Max, Meter} @@ -42,7 +41,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse} import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.common.{KafkaException, MessageFormatter, TopicIdPartition, TopicPartition} +import org.apache.kafka.common.{MessageFormatter, TopicIdPartition, TopicPartition} import org.apache.kafka.coordinator.group.OffsetConfig import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData} import org.apache.kafka.server.common.MetadataVersion @@ -1286,83 +1285,6 @@ object GroupMetadataManager { } } - /** - * Exposed for printing records using [[kafka.tools.DumpLogSegments]] - */ - def formatRecordKeyAndValue(record: Record): (Option[String], Option[String]) = { - if (!record.hasKey) { - throw new KafkaException("Failed to decode message using offset topic decoder (message had a missing key)") - } else { - GroupMetadataManager.readMessageKey(record.key) match { - case offsetKey: OffsetKey => parseOffsets(offsetKey, record.value) - case groupMetadataKey: GroupMetadataKey => parseGroupMetadata(groupMetadataKey, record.value) - case unknownKey: UnknownKey => (Some(s"unknown::version=${unknownKey.version}"), None) - } - } - } - - private def parseOffsets(offsetKey: OffsetKey, payload: ByteBuffer): (Option[String], Option[String]) = { - val groupId = offsetKey.key.group - val topicPartition = offsetKey.key.topicPartition - val keyString = s"offset_commit::group=$groupId,partition=$topicPartition" - - val offset = GroupMetadataManager.readOffsetMessageValue(payload) - val valueString = if (offset == null) { - "" - } else { - if (offset.metadata.isEmpty) - s"offset=${offset.offset}" - else - s"offset=${offset.offset},metadata=${offset.metadata}" - } - - (Some(keyString), Some(valueString)) - } - - private def parseGroupMetadata(groupMetadataKey: GroupMetadataKey, payload: ByteBuffer): (Option[String], Option[String]) = { - val groupId = groupMetadataKey.key - val keyString = s"group_metadata::group=$groupId" - - val group = GroupMetadataManager.readGroupMessageValue(groupId, payload, Time.SYSTEM) - val valueString = if (group == null) - "" - else { - val protocolType = group.protocolType.getOrElse("") - - val assignment = group.allMemberMetadata.map { member => - if (protocolType == ConsumerProtocol.PROTOCOL_TYPE) { - val partitionAssignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) - val userData = Option(partitionAssignment.userData) - .map(Utils.toArray) - .map(hex) - .getOrElse("") - - if (userData.isEmpty) - s"${member.memberId}=${partitionAssignment.partitions}" - else - s"${member.memberId}=${partitionAssignment.partitions}:$userData" - } else { - s"${member.memberId}=${hex(member.assignment)}" - } - }.mkString("{", ",", "}") - - Json.encodeAsString(Map( - "protocolType" -> protocolType, - "protocol" -> group.protocolName.orNull, - "generationId" -> group.generationId, - "assignment" -> assignment - ).asJava) - } - (Some(keyString), Some(valueString)) - } - - private def hex(bytes: Array[Byte]): String = { - if (bytes.isEmpty) - "" - else - "%X".format(BigInt(1, bytes)) - } - def maybeConvertOffsetCommitError(error: Errors) : Errors = { error match { case Errors.NETWORK_EXCEPTION => diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 613bd0d95e1..34c650fed32 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -17,28 +17,38 @@ package kafka.tools +import com.fasterxml.jackson.databind.JsonNode + import java.io._ import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode} -import kafka.coordinator.group.GroupMetadataManager import kafka.coordinator.transaction.TransactionLog import kafka.log._ import kafka.serializer.Decoder import kafka.utils._ import kafka.utils.Implicits._ +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol +import org.apache.kafka.common.message.ConsumerProtocolAssignment +import org.apache.kafka.common.message.ConsumerProtocolAssignmentJsonConverter +import org.apache.kafka.common.message.ConsumerProtocolSubscription +import org.apache.kafka.common.message.ConsumerProtocolSubscriptionJsonConverter import org.apache.kafka.common.message.KRaftVersionRecordJsonConverter import org.apache.kafka.common.message.SnapshotFooterRecordJsonConverter import org.apache.kafka.common.message.SnapshotHeaderRecordJsonConverter import org.apache.kafka.common.message.VotersRecordJsonConverter import org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordType} -import org.apache.kafka.common.protocol.ByteBufferAccessor +import org.apache.kafka.common.protocol.{ByteBufferAccessor, Message} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils +import org.apache.kafka.coordinator.group.RecordSerde +import org.apache.kafka.coordinator.group.generated.{ConsumerGroupCurrentMemberAssignmentKey, ConsumerGroupCurrentMemberAssignmentKeyJsonConverter, ConsumerGroupCurrentMemberAssignmentValue, ConsumerGroupCurrentMemberAssignmentValueJsonConverter, ConsumerGroupMemberMetadataKey, ConsumerGroupMemberMetadataKeyJsonConverter, ConsumerGroupMemberMetadataValue, ConsumerGroupMemberMetadataValueJsonConverter, ConsumerGroupMetadataKey, ConsumerGroupMetadataKeyJsonConverter, ConsumerGroupMetadataValue, ConsumerGroupMetadataValueJsonConverter, ConsumerGroupPartitionMetadataKey, ConsumerGroupPartitionMetadataKeyJsonConverter, ConsumerGroupPartitionMetadataValue, ConsumerGroupPartitionMetadataValueJsonConverter, ConsumerGroupTargetAssignmentMemberKey, ConsumerGroupTargetAssignmentMemberKeyJsonConverter, ConsumerGroupTargetAssignmentMemberValue, ConsumerGroupTargetAssignmentMemberValueJsonConverter, ConsumerGroupTargetAssignmentMetadataKey, ConsumerGroupTargetAssignmentMetadataKeyJsonConverter, ConsumerGroupTargetAssignmentMetadataValue, ConsumerGroupTargetAssignmentMetadataValueJsonConverter, GroupMetadataKey, GroupMetadataKeyJsonConverter, GroupMetadataValue, GroupMetadataValueJsonConverter, OffsetCommitKey, OffsetCommitKeyJsonConverter, OffsetCommitValue, OffsetCommitValueJsonConverter} +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.UnknownRecordTypeException import org.apache.kafka.metadata.MetadataRecordSerde import org.apache.kafka.metadata.bootstrap.BootstrapDirectory import org.apache.kafka.snapshot.Snapshots import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import org.apache.kafka.storage.internals.log.{CorruptSnapshotException, LogFileUtils, OffsetIndex, ProducerStateManager, TimeIndex, TransactionIndex} +import java.nio.ByteBuffer import scala.jdk.CollectionConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -407,9 +417,141 @@ object DumpLogSegments { } } - private class OffsetsMessageParser extends MessageParser[String, String] { + // Package private for testing. + class OffsetsMessageParser extends MessageParser[String, String] { + private val serde = new RecordSerde() + + private def prepareKey(message: Message, version: Short): String = { + val messageAsJson = message match { + case m: OffsetCommitKey => + OffsetCommitKeyJsonConverter.write(m, version) + case m: GroupMetadataKey => + GroupMetadataKeyJsonConverter.write(m, version) + case m: ConsumerGroupMetadataKey => + ConsumerGroupMetadataKeyJsonConverter.write(m, version) + case m: ConsumerGroupPartitionMetadataKey => + ConsumerGroupPartitionMetadataKeyJsonConverter.write(m, version) + case m: ConsumerGroupMemberMetadataKey => + ConsumerGroupMemberMetadataKeyJsonConverter.write(m, version) + case m: ConsumerGroupTargetAssignmentMetadataKey => + ConsumerGroupTargetAssignmentMetadataKeyJsonConverter.write(m, version) + case m: ConsumerGroupTargetAssignmentMemberKey => + ConsumerGroupTargetAssignmentMemberKeyJsonConverter.write(m, version) + case m: ConsumerGroupCurrentMemberAssignmentKey => + ConsumerGroupCurrentMemberAssignmentKeyJsonConverter.write(m, version) + case _ => throw new UnknownRecordTypeException(version) + } + + val json = new ObjectNode(JsonNodeFactory.instance) + json.set("type", new TextNode(version.toString)) + json.set("data", messageAsJson) + json.toString + } + + private def prepareGroupMetadataValue(message: GroupMetadataValue, version: Short): JsonNode = { + val json = GroupMetadataValueJsonConverter.write(message, version) + + def replace[T]( + node: JsonNode, + field: String, + reader: (org.apache.kafka.common.protocol.Readable, Short) => T, + writer: (T, Short) => JsonNode + ): Unit = { + Option(node.get(field)).foreach { filedNode => + try { + val buffer = ByteBuffer.wrap(filedNode.binaryValue()) + val accessor = new ByteBufferAccessor(buffer) + val version = accessor.readShort + val data = reader(accessor, version) + node.asInstanceOf[ObjectNode].replace(field, writer(data, version)) + } catch { + case _: RuntimeException => // Swallow and keep the original bytes. + } + } + } + + Option(json.get("protocolType")).foreach { protocolTypeNode => + // If the group uses the consumer embedded protocol, we deserialize + // the subscription and the assignment of each member. + if (protocolTypeNode.asText() == ConsumerProtocol.PROTOCOL_TYPE) { + Option(json.get("members")).foreach { membersNode => + if (membersNode.isArray) { + membersNode.forEach { memberNode => + // Replace the subscription field by its deserialized version. + replace( + memberNode, + "subscription", + (readable, version) => new ConsumerProtocolSubscription(readable, version), + ConsumerProtocolSubscriptionJsonConverter.write + ) + + // Replace the assignment field by its deserialized version. + replace( + memberNode, + "assignment", + (readable, version) => new ConsumerProtocolAssignment(readable, version), + ConsumerProtocolAssignmentJsonConverter.write + ) + } + } + } + } + } + + json + } + + private def prepareValue(message: Message, version: Short): String = { + val messageAsJson = message match { + case m: OffsetCommitValue => + OffsetCommitValueJsonConverter.write(m, version) + case m: GroupMetadataValue => + prepareGroupMetadataValue(m, version) + case m: ConsumerGroupMetadataValue => + ConsumerGroupMetadataValueJsonConverter.write(m, version) + case m: ConsumerGroupPartitionMetadataValue => + ConsumerGroupPartitionMetadataValueJsonConverter.write(m, version) + case m: ConsumerGroupMemberMetadataValue => + ConsumerGroupMemberMetadataValueJsonConverter.write(m, version) + case m: ConsumerGroupTargetAssignmentMetadataValue => + ConsumerGroupTargetAssignmentMetadataValueJsonConverter.write(m, version) + case m: ConsumerGroupTargetAssignmentMemberValue => + ConsumerGroupTargetAssignmentMemberValueJsonConverter.write(m, version) + case m: ConsumerGroupCurrentMemberAssignmentValue => + ConsumerGroupCurrentMemberAssignmentValueJsonConverter.write(m, version) + case _ => throw new IllegalStateException(s"Message value ${message.getClass.getSimpleName} is not supported.") + } + + val json = new ObjectNode(JsonNodeFactory.instance) + json.set("version", new TextNode(version.toString)) + json.set("data", messageAsJson) + json.toString + } + override def parse(record: Record): (Option[String], Option[String]) = { - GroupMetadataManager.formatRecordKeyAndValue(record) + if (!record.hasKey) + throw new RuntimeException(s"Failed to decode message at offset ${record.offset} using offset " + + "topic decoder (message had a missing key)") + + try { + val r = serde.deserialize(record.key, record.value) + ( + Some(prepareKey(r.key.message, r.key.version)), + Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("")) + ) + } catch { + case e: UnknownRecordTypeException => + ( + Some(s"Unknown record type ${e.unknownType} at offset ${record.offset}, skipping."), + None + ) + + case e: Throwable => + ( + Some(s"Error at offset ${record.offset}, skipping. ${e.getMessage}"), + None + ) + } } } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 50e3c27069d..afcd2770ee8 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -2766,62 +2766,6 @@ class GroupMetadataManagerTest { } } - @Test - def testCommittedOffsetParsing(): Unit = { - val groupId = "group" - val topicPartition = new TopicPartition("topic", 0) - val offsetCommitRecord = TestUtils.records(Seq( - new SimpleRecord( - GroupMetadataManager.offsetCommitKey(groupId, topicPartition), - GroupMetadataManager.offsetCommitValue(OffsetAndMetadata(35L, "", time.milliseconds()), MetadataVersion.latestTesting) - ) - )).records.asScala.head - val (keyStringOpt, valueStringOpt) = GroupMetadataManager.formatRecordKeyAndValue(offsetCommitRecord) - assertEquals(Some(s"offset_commit::group=$groupId,partition=$topicPartition"), keyStringOpt) - assertEquals(Some("offset=35"), valueStringOpt) - } - - @Test - def testCommittedOffsetTombstoneParsing(): Unit = { - val groupId = "group" - val topicPartition = new TopicPartition("topic", 0) - val offsetCommitRecord = TestUtils.records(Seq( - new SimpleRecord(GroupMetadataManager.offsetCommitKey(groupId, topicPartition), null) - )).records.asScala.head - val (keyStringOpt, valueStringOpt) = GroupMetadataManager.formatRecordKeyAndValue(offsetCommitRecord) - assertEquals(Some(s"offset_commit::group=$groupId,partition=$topicPartition"), keyStringOpt) - assertEquals(Some(""), valueStringOpt) - } - - @Test - def testGroupMetadataParsingWithNullUserData(): Unit = { - val generation = 935 - val protocolType = "consumer" - val protocol = "range" - val memberId = "98098230493" - val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment( - new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null) - )) - val groupMetadataRecord = TestUtils.records(Seq( - buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes) - )).records.asScala.head - val (keyStringOpt, valueStringOpt) = GroupMetadataManager.formatRecordKeyAndValue(groupMetadataRecord) - assertEquals(Some(s"group_metadata::group=$groupId"), keyStringOpt) - assertEquals(Some("{\"protocolType\":\"consumer\",\"protocol\":\"range\"," + - "\"generationId\":935,\"assignment\":\"{98098230493=[topic-0]}\"}"), valueStringOpt) - } - - @Test - def testGroupMetadataTombstoneParsing(): Unit = { - val groupId = "group" - val groupMetadataRecord = TestUtils.records(Seq( - new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null) - )).records.asScala.head - val (keyStringOpt, valueStringOpt) = GroupMetadataManager.formatRecordKeyAndValue(groupMetadataRecord) - assertEquals(Some(s"group_metadata::group=$groupId"), keyStringOpt) - assertEquals(Some(""), valueStringOpt) - } - private def verifyAppendAndCaptureCallback(): ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = { val capturedArgument: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit]) verify(replicaManager).appendRecords(anyLong(), diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index 1d710be7aa5..19b54b0ebfa 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -20,20 +20,26 @@ package kafka.tools import java.io.{ByteArrayOutputStream, File, PrintWriter} import java.nio.ByteBuffer import java.util +import java.util.Collections import java.util.Optional import java.util.Arrays import java.util.Properties import kafka.log.{LogTestUtils, UnifiedLog} import kafka.raft.{KafkaMetadataLog, MetadataLogConfig} import kafka.server.{BrokerTopicStats, KafkaRaftServer} -import kafka.tools.DumpLogSegments.TimeIndexDumpErrors +import kafka.tools.DumpLogSegments.{OffsetsMessageParser, TimeIndexDumpErrors} import kafka.utils.TestUtils -import org.apache.kafka.common.Uuid +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.{Assignment, Subscription} +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol +import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord} import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache} -import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, MemoryRecords, RecordVersion, SimpleRecord} +import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, MemoryRecords, Record, RecordVersion, SimpleRecord} import org.apache.kafka.common.utils.Utils +import org.apache.kafka.coordinator.group +import org.apache.kafka.coordinator.group.RecordSerde +import org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey, GroupMetadataValue} import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.metadata.MetadataRecordSerde import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch} @@ -403,6 +409,194 @@ class DumpLogSegmentsTest { assertEquals(partialBatches, partialBatchesCount) } + @Test + def testOffsetsMessageParser(): Unit = { + val serde = new RecordSerde() + val parser = new OffsetsMessageParser() + + def serializedRecord(key: ApiMessageAndVersion, value: ApiMessageAndVersion): Record = { + val record = new group.Record(key, value) + TestUtils.singletonRecords( + key = serde.serializeKey(record), + value = serde.serializeValue(record) + ).records.iterator.next + } + + // The key is mandatory. + assertEquals( + "Failed to decode message at offset 0 using offset topic decoder (message had a missing key)", + assertThrows( + classOf[RuntimeException], + () => parser.parse(TestUtils.singletonRecords(key = null, value = null).records.iterator.next) + ).getMessage + ) + + // A valid key and value should work. + assertEquals( + ( + Some("{\"type\":\"3\",\"data\":{\"groupId\":\"group\"}}"), + Some("{\"version\":\"0\",\"data\":{\"epoch\":10}}") + ), + parser.parse(serializedRecord( + new ApiMessageAndVersion( + new ConsumerGroupMetadataKey() + .setGroupId("group"), + 3.toShort + ), + new ApiMessageAndVersion( + new ConsumerGroupMetadataValue() + .setEpoch(10), + 0.toShort + ) + )) + ) + + // Consumer embedded protocol is parsed if possible. + assertEquals( + ( + Some("{\"type\":\"2\",\"data\":{\"group\":\"group\"}}"), + Some("{\"version\":\"4\",\"data\":{\"protocolType\":\"consumer\",\"generation\":10,\"protocol\":\"range\"," + + "\"leader\":\"member\",\"currentStateTimestamp\":-1,\"members\":[{\"memberId\":\"member\"," + + "\"groupInstanceId\":\"instance\",\"clientId\":\"client\",\"clientHost\":\"host\"," + + "\"rebalanceTimeout\":1000,\"sessionTimeout\":100,\"subscription\":{\"topics\":[\"foo\"]," + + "\"userData\":null,\"ownedPartitions\":[{\"topic\":\"foo\",\"partitions\":[0]}]," + + "\"generationId\":0,\"rackId\":\"rack\"},\"assignment\":{\"assignedPartitions\":" + + "[{\"topic\":\"foo\",\"partitions\":[0]}],\"userData\":null}}]}}") + ), + parser.parse(serializedRecord( + new ApiMessageAndVersion( + new GroupMetadataKey() + .setGroup("group"), + 2.toShort + ), + new ApiMessageAndVersion( + new GroupMetadataValue() + .setProtocolType("consumer") + .setProtocol("range") + .setLeader("member") + .setGeneration(10) + .setMembers(Collections.singletonList( + new GroupMetadataValue.MemberMetadata() + .setMemberId("member") + .setClientId("client") + .setClientHost("host") + .setGroupInstanceId("instance") + .setSessionTimeout(100) + .setRebalanceTimeout(1000) + .setSubscription(Utils.toArray(ConsumerProtocol.serializeSubscription( + new Subscription( + Collections.singletonList("foo"), + null, + Collections.singletonList(new TopicPartition("foo", 0)), + 0, + Optional.of("rack"))))) + .setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment( + new Assignment(Collections.singletonList(new TopicPartition("foo", 0)))))) + )), + GroupMetadataValue.HIGHEST_SUPPORTED_VERSION + ) + )) + ) + + // Consumer embedded protocol is not parsed if malformed. + assertEquals( + ( + Some("{\"type\":\"2\",\"data\":{\"group\":\"group\"}}"), + Some("{\"version\":\"4\",\"data\":{\"protocolType\":\"consumer\",\"generation\":10,\"protocol\":\"range\"," + + "\"leader\":\"member\",\"currentStateTimestamp\":-1,\"members\":[{\"memberId\":\"member\"," + + "\"groupInstanceId\":\"instance\",\"clientId\":\"client\",\"clientHost\":\"host\"," + + "\"rebalanceTimeout\":1000,\"sessionTimeout\":100,\"subscription\":\"U3Vic2NyaXB0aW9u\"," + + "\"assignment\":\"QXNzaWdubWVudA==\"}]}}") + ), + parser.parse(serializedRecord( + new ApiMessageAndVersion( + new GroupMetadataKey() + .setGroup("group"), + 2.toShort + ), + new ApiMessageAndVersion( + new GroupMetadataValue() + .setProtocolType("consumer") + .setProtocol("range") + .setLeader("member") + .setGeneration(10) + .setMembers(Collections.singletonList( + new GroupMetadataValue.MemberMetadata() + .setMemberId("member") + .setClientId("client") + .setClientHost("host") + .setGroupInstanceId("instance") + .setSessionTimeout(100) + .setRebalanceTimeout(1000) + .setSubscription("Subscription".getBytes) + .setAssignment("Assignment".getBytes) + )), + GroupMetadataValue.HIGHEST_SUPPORTED_VERSION + ) + )) + ) + + // A valid key with a tombstone should work. + assertEquals( + ( + Some("{\"type\":\"3\",\"data\":{\"groupId\":\"group\"}}"), + Some("") + ), + parser.parse(serializedRecord( + new ApiMessageAndVersion( + new ConsumerGroupMetadataKey() + .setGroupId("group"), + 3.toShort + ), + null + )) + ) + + // An unknown record type should be handled and reported as such. + assertEquals( + ( + Some( + "Unknown record type 32767 at offset 0, skipping." + ), + None + ), + parser.parse(serializedRecord( + new ApiMessageAndVersion( + new ConsumerGroupMetadataKey() + .setGroupId("group"), + Short.MaxValue // Invalid record id. + ), + new ApiMessageAndVersion( + new ConsumerGroupMetadataValue() + .setEpoch(10), + 0.toShort + ) + )) + ) + + // Any parsing error is swallowed and reported. + assertEquals( + ( + Some( + "Error at offset 0, skipping. Could not read record with version 0 from value's buffer due to: " + + "Error reading byte array of 536870911 byte(s): only 1 byte(s) available." + ), + None + ), + parser.parse(serializedRecord( + new ApiMessageAndVersion( + new ConsumerGroupMetadataKey() + .setGroupId("group"), + 3.toShort + ), + new ApiMessageAndVersion( + new ConsumerGroupMemberMetadataValue(), // The value does correspond to the record id. + 0.toShort + ) + )) + ) + } + private def readBatchMetadata(lines: util.ListIterator[String]): Option[String] = { while (lines.hasNext) { val line = lines.next() @@ -536,5 +730,4 @@ class DumpLogSegmentsTest { } } } - }