KAFKA-16470 kafka-dump-log --offsets-decoder should support new records (#15652)

When the new consumer group protocol is used in a cluster, it is currently impossible to see all of the records stored in the __consumer_offsets topic with kafka-dump-log --offsets-decoder, because the tool does not know how to handle the new record types.

This patch refactors the OffsetsMessageParser used internally by kafka-dump-log to rely on the RecordSerde used by the new group coordinator, which ensures that the tool always stays in sync with the coordinator implementation. The patch also changes the output format to the records' JSON representations (via the generated JSON converters) instead of maintaining custom logic to dump them, which ensures that all of the information is always dumped. The downside of the latter is that inner byte arrays (e.g. the assignment in the classic protocol) are no longer deserialized with custom logic, although the consumer embedded protocol's subscription and assignment are still decoded when possible. Personally, I feel that this is acceptable, and that it is actually better to stay as close as possible to the actual records in this tool. It also avoids issues like https://issues.apache.org/jira/browse/KAFKA-15603.
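For illustration, here is a minimal, self-contained Scala sketch (not part of the patch) of the approach taken by the refactored parser: a coordinator record is round-tripped through the group coordinator's RecordSerde and then rendered with the generated JSON converters, so any record type with a generated converter is dumped automatically. The object name and the record contents are hypothetical, and the sketch assumes the group-coordinator module and its generated classes are on the classpath.

import java.nio.ByteBuffer
import org.apache.kafka.coordinator.group.RecordSerde
import org.apache.kafka.coordinator.group.generated.{ConsumerGroupMetadataKey, ConsumerGroupMetadataKeyJsonConverter, ConsumerGroupMetadataValue, ConsumerGroupMetadataValueJsonConverter}
import org.apache.kafka.server.common.ApiMessageAndVersion

object OffsetsDecoderSketch extends App {
  val serde = new RecordSerde()

  // Build a hypothetical coordinator record, as the new group coordinator would
  // write it to __consumer_offsets.
  val written = new org.apache.kafka.coordinator.group.Record(
    new ApiMessageAndVersion(new ConsumerGroupMetadataKey().setGroupId("group"), 3.toShort),
    new ApiMessageAndVersion(new ConsumerGroupMetadataValue().setEpoch(10), 0.toShort)
  )

  // Round-trip through the serde, which is essentially what the refactored
  // OffsetsMessageParser does for every record read from a log segment.
  val read = serde.deserialize(
    ByteBuffer.wrap(serde.serializeKey(written)),
    ByteBuffer.wrap(serde.serializeValue(written))
  )

  // Render the key and, if present, the value with the generated JSON converters.
  read.key.message match {
    case k: ConsumerGroupMetadataKey =>
      println(ConsumerGroupMetadataKeyJsonConverter.write(k, read.key.version))
    case _ => // every other record type is handled with its own generated converter
  }
  Option(read.value).foreach { v =>
    v.message match {
      case m: ConsumerGroupMetadataValue =>
        println(ConsumerGroupMetadataValueJsonConverter.write(m, v.version))
      case _ =>
    }
  }
}

With the change below, kafka-dump-log prints the same kind of JSON strings for each record's key and payload (e.g. {"type":"3","data":{"groupId":"group"}} and {"version":"0","data":{"epoch":10}}), as exercised in the new DumpLogSegmentsTest case.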

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
David Jacot 2024-05-07 02:49:31 +02:00 committed by GitHub
parent fe8ccbc92c
commit 0df340d64d
GPG Key ID: B5690EEEBB952194
6 changed files with 350 additions and 144 deletions

View File

@@ -1328,6 +1328,8 @@ project(':group-coordinator') {
implementation project(':clients')
implementation project(':metadata')
implementation project(':storage')
implementation libs.jacksonDatabind
implementation libs.jacksonJDK8Datatypes
implementation libs.slf4jApi
implementation libs.metrics
@@ -1364,7 +1366,7 @@ project(':group-coordinator') {
args = [ "-p", "org.apache.kafka.coordinator.group.generated",
"-o", "src/generated/java/org/apache/kafka/coordinator/group/generated",
"-i", "src/main/resources/common/message",
"-m", "MessageDataGenerator"
"-m", "MessageDataGenerator", "JsonConverterGenerator"
]
inputs.dir("src/main/resources/common/message")
.withPropertyName("messages")

View File

@@ -260,6 +260,9 @@
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
<subpackage name="generated">
<allow pkg="com.fasterxml.jackson" />
</subpackage>
</subpackage>
</subpackage>

View File

@@ -32,7 +32,6 @@ import kafka.utils.CoreUtils.inLock
import kafka.utils.Implicits._
import kafka.utils._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.{Metrics, Sensor}
import org.apache.kafka.common.metrics.stats.{Avg, Max, Meter}
@@ -42,7 +41,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, MessageFormatter, TopicIdPartition, TopicPartition}
import org.apache.kafka.common.{MessageFormatter, TopicIdPartition, TopicPartition}
import org.apache.kafka.coordinator.group.OffsetConfig
import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData}
import org.apache.kafka.server.common.MetadataVersion
@@ -1286,83 +1285,6 @@ object GroupMetadataManager {
}
}
/**
* Exposed for printing records using [[kafka.tools.DumpLogSegments]]
*/
def formatRecordKeyAndValue(record: Record): (Option[String], Option[String]) = {
if (!record.hasKey) {
throw new KafkaException("Failed to decode message using offset topic decoder (message had a missing key)")
} else {
GroupMetadataManager.readMessageKey(record.key) match {
case offsetKey: OffsetKey => parseOffsets(offsetKey, record.value)
case groupMetadataKey: GroupMetadataKey => parseGroupMetadata(groupMetadataKey, record.value)
case unknownKey: UnknownKey => (Some(s"unknown::version=${unknownKey.version}"), None)
}
}
}
private def parseOffsets(offsetKey: OffsetKey, payload: ByteBuffer): (Option[String], Option[String]) = {
val groupId = offsetKey.key.group
val topicPartition = offsetKey.key.topicPartition
val keyString = s"offset_commit::group=$groupId,partition=$topicPartition"
val offset = GroupMetadataManager.readOffsetMessageValue(payload)
val valueString = if (offset == null) {
"<DELETE>"
} else {
if (offset.metadata.isEmpty)
s"offset=${offset.offset}"
else
s"offset=${offset.offset},metadata=${offset.metadata}"
}
(Some(keyString), Some(valueString))
}
private def parseGroupMetadata(groupMetadataKey: GroupMetadataKey, payload: ByteBuffer): (Option[String], Option[String]) = {
val groupId = groupMetadataKey.key
val keyString = s"group_metadata::group=$groupId"
val group = GroupMetadataManager.readGroupMessageValue(groupId, payload, Time.SYSTEM)
val valueString = if (group == null)
"<DELETE>"
else {
val protocolType = group.protocolType.getOrElse("")
val assignment = group.allMemberMetadata.map { member =>
if (protocolType == ConsumerProtocol.PROTOCOL_TYPE) {
val partitionAssignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
val userData = Option(partitionAssignment.userData)
.map(Utils.toArray)
.map(hex)
.getOrElse("")
if (userData.isEmpty)
s"${member.memberId}=${partitionAssignment.partitions}"
else
s"${member.memberId}=${partitionAssignment.partitions}:$userData"
} else {
s"${member.memberId}=${hex(member.assignment)}"
}
}.mkString("{", ",", "}")
Json.encodeAsString(Map(
"protocolType" -> protocolType,
"protocol" -> group.protocolName.orNull,
"generationId" -> group.generationId,
"assignment" -> assignment
).asJava)
}
(Some(keyString), Some(valueString))
}
private def hex(bytes: Array[Byte]): String = {
if (bytes.isEmpty)
""
else
"%X".format(BigInt(1, bytes))
}
def maybeConvertOffsetCommitError(error: Errors) : Errors = {
error match {
case Errors.NETWORK_EXCEPTION =>

View File

@@ -17,28 +17,38 @@
package kafka.tools
import com.fasterxml.jackson.databind.JsonNode
import java.io._
import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode}
import kafka.coordinator.group.GroupMetadataManager
import kafka.coordinator.transaction.TransactionLog
import kafka.log._
import kafka.serializer.Decoder
import kafka.utils._
import kafka.utils.Implicits._
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.message.ConsumerProtocolAssignment
import org.apache.kafka.common.message.ConsumerProtocolAssignmentJsonConverter
import org.apache.kafka.common.message.ConsumerProtocolSubscription
import org.apache.kafka.common.message.ConsumerProtocolSubscriptionJsonConverter
import org.apache.kafka.common.message.KRaftVersionRecordJsonConverter
import org.apache.kafka.common.message.SnapshotFooterRecordJsonConverter
import org.apache.kafka.common.message.SnapshotHeaderRecordJsonConverter
import org.apache.kafka.common.message.VotersRecordJsonConverter
import org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordType}
import org.apache.kafka.common.protocol.ByteBufferAccessor
import org.apache.kafka.common.protocol.{ByteBufferAccessor, Message}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.RecordSerde
import org.apache.kafka.coordinator.group.generated.{ConsumerGroupCurrentMemberAssignmentKey, ConsumerGroupCurrentMemberAssignmentKeyJsonConverter, ConsumerGroupCurrentMemberAssignmentValue, ConsumerGroupCurrentMemberAssignmentValueJsonConverter, ConsumerGroupMemberMetadataKey, ConsumerGroupMemberMetadataKeyJsonConverter, ConsumerGroupMemberMetadataValue, ConsumerGroupMemberMetadataValueJsonConverter, ConsumerGroupMetadataKey, ConsumerGroupMetadataKeyJsonConverter, ConsumerGroupMetadataValue, ConsumerGroupMetadataValueJsonConverter, ConsumerGroupPartitionMetadataKey, ConsumerGroupPartitionMetadataKeyJsonConverter, ConsumerGroupPartitionMetadataValue, ConsumerGroupPartitionMetadataValueJsonConverter, ConsumerGroupTargetAssignmentMemberKey, ConsumerGroupTargetAssignmentMemberKeyJsonConverter, ConsumerGroupTargetAssignmentMemberValue, ConsumerGroupTargetAssignmentMemberValueJsonConverter, ConsumerGroupTargetAssignmentMetadataKey, ConsumerGroupTargetAssignmentMetadataKeyJsonConverter, ConsumerGroupTargetAssignmentMetadataValue, ConsumerGroupTargetAssignmentMetadataValueJsonConverter, GroupMetadataKey, GroupMetadataKeyJsonConverter, GroupMetadataValue, GroupMetadataValueJsonConverter, OffsetCommitKey, OffsetCommitKeyJsonConverter, OffsetCommitValue, OffsetCommitValueJsonConverter}
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.UnknownRecordTypeException
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
import org.apache.kafka.snapshot.Snapshots
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.{CorruptSnapshotException, LogFileUtils, OffsetIndex, ProducerStateManager, TimeIndex, TransactionIndex}
import java.nio.ByteBuffer
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -407,9 +417,141 @@ object DumpLogSegments {
}
}
private class OffsetsMessageParser extends MessageParser[String, String] {
// Package private for testing.
class OffsetsMessageParser extends MessageParser[String, String] {
private val serde = new RecordSerde()
private def prepareKey(message: Message, version: Short): String = {
val messageAsJson = message match {
case m: OffsetCommitKey =>
OffsetCommitKeyJsonConverter.write(m, version)
case m: GroupMetadataKey =>
GroupMetadataKeyJsonConverter.write(m, version)
case m: ConsumerGroupMetadataKey =>
ConsumerGroupMetadataKeyJsonConverter.write(m, version)
case m: ConsumerGroupPartitionMetadataKey =>
ConsumerGroupPartitionMetadataKeyJsonConverter.write(m, version)
case m: ConsumerGroupMemberMetadataKey =>
ConsumerGroupMemberMetadataKeyJsonConverter.write(m, version)
case m: ConsumerGroupTargetAssignmentMetadataKey =>
ConsumerGroupTargetAssignmentMetadataKeyJsonConverter.write(m, version)
case m: ConsumerGroupTargetAssignmentMemberKey =>
ConsumerGroupTargetAssignmentMemberKeyJsonConverter.write(m, version)
case m: ConsumerGroupCurrentMemberAssignmentKey =>
ConsumerGroupCurrentMemberAssignmentKeyJsonConverter.write(m, version)
case _ => throw new UnknownRecordTypeException(version)
}
val json = new ObjectNode(JsonNodeFactory.instance)
json.set("type", new TextNode(version.toString))
json.set("data", messageAsJson)
json.toString
}
private def prepareGroupMetadataValue(message: GroupMetadataValue, version: Short): JsonNode = {
val json = GroupMetadataValueJsonConverter.write(message, version)
def replace[T](
node: JsonNode,
field: String,
reader: (org.apache.kafka.common.protocol.Readable, Short) => T,
writer: (T, Short) => JsonNode
): Unit = {
Option(node.get(field)).foreach { fieldNode =>
try {
val buffer = ByteBuffer.wrap(fieldNode.binaryValue())
val accessor = new ByteBufferAccessor(buffer)
val version = accessor.readShort
val data = reader(accessor, version)
node.asInstanceOf[ObjectNode].replace(field, writer(data, version))
} catch {
case _: RuntimeException => // Swallow and keep the original bytes.
}
}
}
Option(json.get("protocolType")).foreach { protocolTypeNode =>
// If the group uses the consumer embedded protocol, we deserialize
// the subscription and the assignment of each member.
if (protocolTypeNode.asText() == ConsumerProtocol.PROTOCOL_TYPE) {
Option(json.get("members")).foreach { membersNode =>
if (membersNode.isArray) {
membersNode.forEach { memberNode =>
// Replace the subscription field by its deserialized version.
replace(
memberNode,
"subscription",
(readable, version) => new ConsumerProtocolSubscription(readable, version),
ConsumerProtocolSubscriptionJsonConverter.write
)
// Replace the assignment field by its deserialized version.
replace(
memberNode,
"assignment",
(readable, version) => new ConsumerProtocolAssignment(readable, version),
ConsumerProtocolAssignmentJsonConverter.write
)
}
}
}
}
}
json
}
private def prepareValue(message: Message, version: Short): String = {
val messageAsJson = message match {
case m: OffsetCommitValue =>
OffsetCommitValueJsonConverter.write(m, version)
case m: GroupMetadataValue =>
prepareGroupMetadataValue(m, version)
case m: ConsumerGroupMetadataValue =>
ConsumerGroupMetadataValueJsonConverter.write(m, version)
case m: ConsumerGroupPartitionMetadataValue =>
ConsumerGroupPartitionMetadataValueJsonConverter.write(m, version)
case m: ConsumerGroupMemberMetadataValue =>
ConsumerGroupMemberMetadataValueJsonConverter.write(m, version)
case m: ConsumerGroupTargetAssignmentMetadataValue =>
ConsumerGroupTargetAssignmentMetadataValueJsonConverter.write(m, version)
case m: ConsumerGroupTargetAssignmentMemberValue =>
ConsumerGroupTargetAssignmentMemberValueJsonConverter.write(m, version)
case m: ConsumerGroupCurrentMemberAssignmentValue =>
ConsumerGroupCurrentMemberAssignmentValueJsonConverter.write(m, version)
case _ => throw new IllegalStateException(s"Message value ${message.getClass.getSimpleName} is not supported.")
}
val json = new ObjectNode(JsonNodeFactory.instance)
json.set("version", new TextNode(version.toString))
json.set("data", messageAsJson)
json.toString
}
override def parse(record: Record): (Option[String], Option[String]) = {
GroupMetadataManager.formatRecordKeyAndValue(record)
if (!record.hasKey)
throw new RuntimeException(s"Failed to decode message at offset ${record.offset} using offset " +
"topic decoder (message had a missing key)")
try {
val r = serde.deserialize(record.key, record.value)
(
Some(prepareKey(r.key.message, r.key.version)),
Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("<DELETE>"))
)
} catch {
case e: UnknownRecordTypeException =>
(
Some(s"Unknown record type ${e.unknownType} at offset ${record.offset}, skipping."),
None
)
case e: Throwable =>
(
Some(s"Error at offset ${record.offset}, skipping. ${e.getMessage}"),
None
)
}
}
}

View File

@@ -2766,62 +2766,6 @@ class GroupMetadataManagerTest {
}
}
@Test
def testCommittedOffsetParsing(): Unit = {
val groupId = "group"
val topicPartition = new TopicPartition("topic", 0)
val offsetCommitRecord = TestUtils.records(Seq(
new SimpleRecord(
GroupMetadataManager.offsetCommitKey(groupId, topicPartition),
GroupMetadataManager.offsetCommitValue(OffsetAndMetadata(35L, "", time.milliseconds()), MetadataVersion.latestTesting)
)
)).records.asScala.head
val (keyStringOpt, valueStringOpt) = GroupMetadataManager.formatRecordKeyAndValue(offsetCommitRecord)
assertEquals(Some(s"offset_commit::group=$groupId,partition=$topicPartition"), keyStringOpt)
assertEquals(Some("offset=35"), valueStringOpt)
}
@Test
def testCommittedOffsetTombstoneParsing(): Unit = {
val groupId = "group"
val topicPartition = new TopicPartition("topic", 0)
val offsetCommitRecord = TestUtils.records(Seq(
new SimpleRecord(GroupMetadataManager.offsetCommitKey(groupId, topicPartition), null)
)).records.asScala.head
val (keyStringOpt, valueStringOpt) = GroupMetadataManager.formatRecordKeyAndValue(offsetCommitRecord)
assertEquals(Some(s"offset_commit::group=$groupId,partition=$topicPartition"), keyStringOpt)
assertEquals(Some("<DELETE>"), valueStringOpt)
}
@Test
def testGroupMetadataParsingWithNullUserData(): Unit = {
val generation = 935
val protocolType = "consumer"
val protocol = "range"
val memberId = "98098230493"
val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(
new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
))
val groupMetadataRecord = TestUtils.records(Seq(
buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
)).records.asScala.head
val (keyStringOpt, valueStringOpt) = GroupMetadataManager.formatRecordKeyAndValue(groupMetadataRecord)
assertEquals(Some(s"group_metadata::group=$groupId"), keyStringOpt)
assertEquals(Some("{\"protocolType\":\"consumer\",\"protocol\":\"range\"," +
"\"generationId\":935,\"assignment\":\"{98098230493=[topic-0]}\"}"), valueStringOpt)
}
@Test
def testGroupMetadataTombstoneParsing(): Unit = {
val groupId = "group"
val groupMetadataRecord = TestUtils.records(Seq(
new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
)).records.asScala.head
val (keyStringOpt, valueStringOpt) = GroupMetadataManager.formatRecordKeyAndValue(groupMetadataRecord)
assertEquals(Some(s"group_metadata::group=$groupId"), keyStringOpt)
assertEquals(Some("<DELETE>"), valueStringOpt)
}
private def verifyAppendAndCaptureCallback(): ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = {
val capturedArgument: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
verify(replicaManager).appendRecords(anyLong(),

View File

@@ -20,20 +20,26 @@ package kafka.tools
import java.io.{ByteArrayOutputStream, File, PrintWriter}
import java.nio.ByteBuffer
import java.util
import java.util.Collections
import java.util.Optional
import java.util.Arrays
import java.util.Properties
import kafka.log.{LogTestUtils, UnifiedLog}
import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
import kafka.server.{BrokerTopicStats, KafkaRaftServer}
import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
import kafka.tools.DumpLogSegments.{OffsetsMessageParser, TimeIndexDumpErrors}
import kafka.utils.TestUtils
import org.apache.kafka.common.Uuid
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.{Assignment, Subscription}
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord}
import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache}
import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, MemoryRecords, RecordVersion, SimpleRecord}
import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, MemoryRecords, Record, RecordVersion, SimpleRecord}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group
import org.apache.kafka.coordinator.group.RecordSerde
import org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey, GroupMetadataValue}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch}
@@ -403,6 +409,194 @@ class DumpLogSegmentsTest {
assertEquals(partialBatches, partialBatchesCount)
}
@Test
def testOffsetsMessageParser(): Unit = {
val serde = new RecordSerde()
val parser = new OffsetsMessageParser()
def serializedRecord(key: ApiMessageAndVersion, value: ApiMessageAndVersion): Record = {
val record = new group.Record(key, value)
TestUtils.singletonRecords(
key = serde.serializeKey(record),
value = serde.serializeValue(record)
).records.iterator.next
}
// The key is mandatory.
assertEquals(
"Failed to decode message at offset 0 using offset topic decoder (message had a missing key)",
assertThrows(
classOf[RuntimeException],
() => parser.parse(TestUtils.singletonRecords(key = null, value = null).records.iterator.next)
).getMessage
)
// A valid key and value should work.
assertEquals(
(
Some("{\"type\":\"3\",\"data\":{\"groupId\":\"group\"}}"),
Some("{\"version\":\"0\",\"data\":{\"epoch\":10}}")
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(
new ConsumerGroupMetadataKey()
.setGroupId("group"),
3.toShort
),
new ApiMessageAndVersion(
new ConsumerGroupMetadataValue()
.setEpoch(10),
0.toShort
)
))
)
// Consumer embedded protocol is parsed if possible.
assertEquals(
(
Some("{\"type\":\"2\",\"data\":{\"group\":\"group\"}}"),
Some("{\"version\":\"4\",\"data\":{\"protocolType\":\"consumer\",\"generation\":10,\"protocol\":\"range\"," +
"\"leader\":\"member\",\"currentStateTimestamp\":-1,\"members\":[{\"memberId\":\"member\"," +
"\"groupInstanceId\":\"instance\",\"clientId\":\"client\",\"clientHost\":\"host\"," +
"\"rebalanceTimeout\":1000,\"sessionTimeout\":100,\"subscription\":{\"topics\":[\"foo\"]," +
"\"userData\":null,\"ownedPartitions\":[{\"topic\":\"foo\",\"partitions\":[0]}]," +
"\"generationId\":0,\"rackId\":\"rack\"},\"assignment\":{\"assignedPartitions\":" +
"[{\"topic\":\"foo\",\"partitions\":[0]}],\"userData\":null}}]}}")
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(
new GroupMetadataKey()
.setGroup("group"),
2.toShort
),
new ApiMessageAndVersion(
new GroupMetadataValue()
.setProtocolType("consumer")
.setProtocol("range")
.setLeader("member")
.setGeneration(10)
.setMembers(Collections.singletonList(
new GroupMetadataValue.MemberMetadata()
.setMemberId("member")
.setClientId("client")
.setClientHost("host")
.setGroupInstanceId("instance")
.setSessionTimeout(100)
.setRebalanceTimeout(1000)
.setSubscription(Utils.toArray(ConsumerProtocol.serializeSubscription(
new Subscription(
Collections.singletonList("foo"),
null,
Collections.singletonList(new TopicPartition("foo", 0)),
0,
Optional.of("rack")))))
.setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment(
new Assignment(Collections.singletonList(new TopicPartition("foo", 0))))))
)),
GroupMetadataValue.HIGHEST_SUPPORTED_VERSION
)
))
)
// Consumer embedded protocol is not parsed if malformed.
assertEquals(
(
Some("{\"type\":\"2\",\"data\":{\"group\":\"group\"}}"),
Some("{\"version\":\"4\",\"data\":{\"protocolType\":\"consumer\",\"generation\":10,\"protocol\":\"range\"," +
"\"leader\":\"member\",\"currentStateTimestamp\":-1,\"members\":[{\"memberId\":\"member\"," +
"\"groupInstanceId\":\"instance\",\"clientId\":\"client\",\"clientHost\":\"host\"," +
"\"rebalanceTimeout\":1000,\"sessionTimeout\":100,\"subscription\":\"U3Vic2NyaXB0aW9u\"," +
"\"assignment\":\"QXNzaWdubWVudA==\"}]}}")
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(
new GroupMetadataKey()
.setGroup("group"),
2.toShort
),
new ApiMessageAndVersion(
new GroupMetadataValue()
.setProtocolType("consumer")
.setProtocol("range")
.setLeader("member")
.setGeneration(10)
.setMembers(Collections.singletonList(
new GroupMetadataValue.MemberMetadata()
.setMemberId("member")
.setClientId("client")
.setClientHost("host")
.setGroupInstanceId("instance")
.setSessionTimeout(100)
.setRebalanceTimeout(1000)
.setSubscription("Subscription".getBytes)
.setAssignment("Assignment".getBytes)
)),
GroupMetadataValue.HIGHEST_SUPPORTED_VERSION
)
))
)
// A valid key with a tombstone should work.
assertEquals(
(
Some("{\"type\":\"3\",\"data\":{\"groupId\":\"group\"}}"),
Some("<DELETE>")
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(
new ConsumerGroupMetadataKey()
.setGroupId("group"),
3.toShort
),
null
))
)
// An unknown record type should be handled and reported as such.
assertEquals(
(
Some(
"Unknown record type 32767 at offset 0, skipping."
),
None
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(
new ConsumerGroupMetadataKey()
.setGroupId("group"),
Short.MaxValue // Invalid record id.
),
new ApiMessageAndVersion(
new ConsumerGroupMetadataValue()
.setEpoch(10),
0.toShort
)
))
)
// Any parsing error is swallowed and reported.
assertEquals(
(
Some(
"Error at offset 0, skipping. Could not read record with version 0 from value's buffer due to: " +
"Error reading byte array of 536870911 byte(s): only 1 byte(s) available."
),
None
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(
new ConsumerGroupMetadataKey()
.setGroupId("group"),
3.toShort
),
new ApiMessageAndVersion(
new ConsumerGroupMemberMetadataValue(), // The value does not correspond to the record id.
0.toShort
)
))
)
}
private def readBatchMetadata(lines: util.ListIterator[String]): Option[String] = {
while (lines.hasNext) {
val line = lines.next()
@@ -536,5 +730,4 @@
}
}
}
}