KAFKA-17633: Add share group record formatter and parser. (#17467)

As part of KIP-932, a new internal topic __share_group_state was introduced. Two types of records are currently written to this topic: ShareSnapshotKey/Value and ShareUpdateKey/Value.
To support debugging and introspection, existing tooling such as kafka-console-consumer and kafka-dump-log must be made aware of these records.
This PR introduces ShareGroupStateMessageFormatter for use with kafka-console-consumer and adds an internal class ShareGroupStateMessageParser to DumpLogSegments.scala.
Unit tests have been added in DumpLogSegmentsTest.scala and ShareGroupStateMessageFormatterTest.java.
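For illustration, the two tools can then be pointed at the new topic roughly as follows; the broker address, log directory and segment file name below are placeholders, not part of this change:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic __share_group_state --from-beginning \
    --formatter org.apache.kafka.tools.consumer.group.share.ShareGroupStateMessageFormatter

bin/kafka-dump-log.sh --share-group-state-decoder \
    --files /tmp/kafka-logs/__share_group_state-0/00000000000000000000.log

Both render each ShareSnapshot/ShareUpdate record as JSON (key and value separately), with tombstones shown as <DELETE> by the dump-log parser, as exercised in the tests below.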


Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
Author: Sushant Mahajan
Date: 2024-10-15 11:44:15 +05:30 (committed by GitHub)
Parent: aa6460dbed
Commit: 5545d717c3
8 changed files with 613 additions and 7 deletions

File: build.gradle

@@ -2442,6 +2442,8 @@ project(':tools') {
implementation project(':transaction-coordinator')
implementation project(':group-coordinator')
implementation project(':coordinator-common')
implementation project(':share-coordinator')
implementation project(':share')
implementation libs.argparse4j
implementation libs.jacksonDatabind
implementation libs.jacksonDataformatCsv

File: import-control.xml

@@ -327,6 +327,10 @@
<allow pkg="org.apache.kafka.tools"/>
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="scala"/>
<subpackage name="share">
<allow pkg="org.apache.kafka.server.share" />
<allow pkg="org.apache.kafka.coordinator.share" />
</subpackage>
</subpackage>
</subpackage>

File: DumpLogSegments.scala

@@ -40,6 +40,8 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.generated.{ConsumerGroupCurrentMemberAssignmentKey, ConsumerGroupCurrentMemberAssignmentKeyJsonConverter, ConsumerGroupCurrentMemberAssignmentValue, ConsumerGroupCurrentMemberAssignmentValueJsonConverter, ConsumerGroupMemberMetadataKey, ConsumerGroupMemberMetadataKeyJsonConverter, ConsumerGroupMemberMetadataValue, ConsumerGroupMemberMetadataValueJsonConverter, ConsumerGroupMetadataKey, ConsumerGroupMetadataKeyJsonConverter, ConsumerGroupMetadataValue, ConsumerGroupMetadataValueJsonConverter, ConsumerGroupPartitionMetadataKey, ConsumerGroupPartitionMetadataKeyJsonConverter, ConsumerGroupPartitionMetadataValue, ConsumerGroupPartitionMetadataValueJsonConverter, ConsumerGroupTargetAssignmentMemberKey, ConsumerGroupTargetAssignmentMemberKeyJsonConverter, ConsumerGroupTargetAssignmentMemberValue, ConsumerGroupTargetAssignmentMemberValueJsonConverter, ConsumerGroupTargetAssignmentMetadataKey, ConsumerGroupTargetAssignmentMetadataKeyJsonConverter, ConsumerGroupTargetAssignmentMetadataValue, ConsumerGroupTargetAssignmentMetadataValueJsonConverter, GroupMetadataKey, GroupMetadataKeyJsonConverter, GroupMetadataValue, GroupMetadataValueJsonConverter, OffsetCommitKey, OffsetCommitKeyJsonConverter, OffsetCommitValue, OffsetCommitValueJsonConverter, ShareGroupCurrentMemberAssignmentKey, ShareGroupCurrentMemberAssignmentKeyJsonConverter, ShareGroupCurrentMemberAssignmentValue, ShareGroupCurrentMemberAssignmentValueJsonConverter, ShareGroupMemberMetadataKey, ShareGroupMemberMetadataKeyJsonConverter, ShareGroupMemberMetadataValue, ShareGroupMemberMetadataValueJsonConverter, ShareGroupMetadataKey, ShareGroupMetadataKeyJsonConverter, ShareGroupMetadataValue, ShareGroupMetadataValueJsonConverter, ShareGroupPartitionMetadataKey, ShareGroupPartitionMetadataKeyJsonConverter, ShareGroupPartitionMetadataValue, ShareGroupPartitionMetadataValueJsonConverter, ShareGroupStatePartitionMetadataKey, ShareGroupStatePartitionMetadataKeyJsonConverter, ShareGroupStatePartitionMetadataValue, ShareGroupStatePartitionMetadataValueJsonConverter, ShareGroupTargetAssignmentMemberKey, ShareGroupTargetAssignmentMemberKeyJsonConverter, ShareGroupTargetAssignmentMemberValue, ShareGroupTargetAssignmentMemberValueJsonConverter, ShareGroupTargetAssignmentMetadataKey, ShareGroupTargetAssignmentMetadataKeyJsonConverter, ShareGroupTargetAssignmentMetadataValue, ShareGroupTargetAssignmentMetadataValueJsonConverter}
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde
import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, ShareSnapshotKeyJsonConverter, ShareSnapshotValue, ShareSnapshotValueJsonConverter, ShareUpdateKey, ShareUpdateKeyJsonConverter, ShareUpdateValue, ShareUpdateValueJsonConverter}
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
import org.apache.kafka.snapshot.Snapshots
@@ -628,6 +630,68 @@ object DumpLogSegments {
}
}
// for test visibility
class ShareGroupStateMessageParser extends MessageParser[String, String] {
private val serde = new ShareCoordinatorRecordSerde()
private def prepareKey(message: Message, version: Short): String = {
val messageAsJson = message match {
case m: ShareSnapshotKey =>
ShareSnapshotKeyJsonConverter.write(m, version)
case m: ShareUpdateKey =>
ShareUpdateKeyJsonConverter.write(m, version)
case _ => throw new UnknownRecordTypeException(version)
}
jsonString(messageAsJson, version)
}
private def prepareValue(message: Message, version: Short): String = {
val messageAsJson = message match {
case m: ShareSnapshotValue =>
ShareSnapshotValueJsonConverter.write(m, version)
case m: ShareUpdateValue =>
ShareUpdateValueJsonConverter.write(m, version)
case _ => throw new IllegalStateException(s"Message value ${message.getClass.getSimpleName} is not supported.")
}
jsonString(messageAsJson, version)
}
private def jsonString(jsonNode: JsonNode, version: Short): String = {
val json = new ObjectNode(JsonNodeFactory.instance)
json.set("type", new TextNode(version.toString))
json.set("data", jsonNode)
json.toString
}
override def parse(record: Record): (Option[String], Option[String]) = {
if (!record.hasKey)
throw new RuntimeException(s"Failed to decode message at offset ${record.offset} using share group state " +
"topic decoder (message had a missing key)")
try {
val r = serde.deserialize(record.key, record.value)
(
Some(prepareKey(r.key.message, r.key.version)),
Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("<DELETE>"))
)
} catch {
case e: UnknownRecordTypeException =>
(
Some(s"Unknown record type ${e.unknownType} at offset ${record.offset}, skipping."),
None
)
case e: Throwable =>
(
Some(s"Error at offset ${record.offset}, skipping. ${e.getMessage}"),
None
)
}
}
}
private class DumpLogSegmentsOptions(args: Array[String]) extends CommandDefaultOptions(args) {
private val printOpt = parser.accepts("print-data-log", "If set, printing the messages content when dumping data logs. Automatically set if any decoder option is specified.")
private val verifyOpt = parser.accepts("verify-index-only", "If set, just verify the index log without printing its content.")
@@ -663,6 +727,8 @@
private val clusterMetadataOpt = parser.accepts("cluster-metadata-decoder", "If set, log data will be parsed as cluster metadata records.")
private val remoteMetadataOpt = parser.accepts("remote-log-metadata-decoder", "If set, log data will be parsed as TopicBasedRemoteLogMetadataManager (RLMM) metadata records." +
" Instead, the value-decoder-class option can be used if a custom RLMM implementation is configured.")
private val shareStateOpt = parser.accepts("share-group-state-decoder", "If set, log data will be parsed as share group state data from the " +
"__share_group_state topic.")
private val skipRecordMetadataOpt = parser.accepts("skip-record-metadata", "Whether to skip printing metadata for each record.")
options = parser.parse(args : _*)
@@ -674,7 +740,9 @@
} else if (options.has(clusterMetadataOpt)) {
new ClusterMetadataLogMessageParser
} else if (options.has(remoteMetadataOpt)) {
new RemoteMetadataLogMessageParser
} else if (options.has(shareStateOpt)) {
new ShareGroupStateMessageParser
} else {
val valueDecoder = CoreUtils.createObject[org.apache.kafka.tools.api.Decoder[_]](options.valueOf(valueDecoderOpt))
val keyDecoder = CoreUtils.createObject[org.apache.kafka.tools.api.Decoder[_]](options.valueOf(keyDecoderOpt))
@@ -687,7 +755,8 @@
options.has(clusterMetadataOpt) ||
options.has(remoteMetadataOpt) ||
options.has(valueDecoderOpt) ||
- options.has(keyDecoderOpt)
+ options.has(keyDecoderOpt) ||
+ options.has(shareStateOpt)
lazy val skipRecordMetadata: Boolean = options.has(skipRecordMetadataOpt)
lazy val isDeepIteration: Boolean = options.has(deepIterationOpt) || shouldPrintDataLog

File: DumpLogSegmentsTest.scala

@@ -27,7 +27,7 @@ import java.util.stream.IntStream
import kafka.log.{LogTestUtils, UnifiedLog}
import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
import kafka.server.KafkaRaftServer
- import kafka.tools.DumpLogSegments.{OffsetsMessageParser, TimeIndexDumpErrors}
+ import kafka.tools.DumpLogSegments.{OffsetsMessageParser, ShareGroupStateMessageParser, TimeIndexDumpErrors}
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.{Assignment, Subscription}
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
@@ -41,6 +41,8 @@ import org.apache.kafka.common.utils.{Exit, Utils}
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
import org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey, GroupMetadataValue}
import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, ShareSnapshotValue, ShareUpdateKey, ShareUpdateValue}
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch, VoterSetTest}
@@ -962,4 +964,148 @@ class DumpLogSegmentsTest {
}
}
}
@Test
def testShareGroupStateMessageParser(): Unit = {
val serde = new ShareCoordinatorRecordSerde()
val parser = new ShareGroupStateMessageParser()
def serializedRecord(key: ApiMessageAndVersion, value: ApiMessageAndVersion): Record = {
val record = new CoordinatorRecord(key, value)
TestUtils.singletonRecords(
key = serde.serializeKey(record),
value = serde.serializeValue(record)
).records.iterator.next
}
// The key is mandatory.
assertEquals(
"Failed to decode message at offset 0 using share group state topic decoder (message had a missing key)",
assertThrows(
classOf[RuntimeException],
() => parser.parse(TestUtils.singletonRecords(key = null, value = null).records.iterator.next)
).getMessage
)
// A valid key and value should work (ShareSnapshot).
assertEquals(
(
Some("{\"type\":\"0\",\"data\":{\"groupId\":\"gs1\",\"topicId\":\"Uj5wn_FqTXirEASvVZRY1w\",\"partition\":0}}"),
Some("{\"type\":\"0\",\"data\":{\"snapshotEpoch\":0,\"stateEpoch\":0,\"leaderEpoch\":0,\"startOffset\":0,\"stateBatches\":[{\"firstOffset\":0,\"lastOffset\":4,\"deliveryState\":2,\"deliveryCount\":1}]}}")
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(new ShareSnapshotKey()
.setGroupId("gs1")
.setTopicId(Uuid.fromString("Uj5wn_FqTXirEASvVZRY1w"))
.setPartition(0),
ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION),
new ApiMessageAndVersion(new ShareSnapshotValue()
.setSnapshotEpoch(0)
.setStateEpoch(0)
.setLeaderEpoch(0)
.setStartOffset(0)
.setStateBatches(List[ShareSnapshotValue.StateBatch](
new ShareSnapshotValue.StateBatch()
.setFirstOffset(0)
.setLastOffset(4)
.setDeliveryState(2)
.setDeliveryCount(1)
).asJava),
ShareCoordinator.SHARE_SNAPSHOT_RECORD_VALUE_VERSION)
))
)
// A valid key and value should work (ShareUpdate).
assertEquals(
(
Some("{\"type\":\"1\",\"data\":{\"groupId\":\"gs1\",\"topicId\":\"Uj5wn_FqTXirEASvVZRY1w\",\"partition\":0}}"),
Some("{\"type\":\"0\",\"data\":{\"snapshotEpoch\":0,\"leaderEpoch\":0,\"startOffset\":0,\"stateBatches\":[{\"firstOffset\":0,\"lastOffset\":4,\"deliveryState\":2,\"deliveryCount\":1}]}}")
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(new ShareUpdateKey()
.setGroupId("gs1")
.setTopicId(Uuid.fromString("Uj5wn_FqTXirEASvVZRY1w"))
.setPartition(0),
ShareCoordinator.SHARE_UPDATE_RECORD_KEY_VERSION),
new ApiMessageAndVersion(new ShareUpdateValue()
.setSnapshotEpoch(0)
.setLeaderEpoch(0)
.setStartOffset(0)
.setStateBatches(List[ShareUpdateValue.StateBatch](
new ShareUpdateValue.StateBatch()
.setFirstOffset(0)
.setLastOffset(4)
.setDeliveryState(2)
.setDeliveryCount(1)
).asJava),
0.toShort)
))
)
// A valid key with a tombstone should work.
assertEquals(
(
Some("{\"type\":\"0\",\"data\":{\"groupId\":\"gs1\",\"topicId\":\"Uj5wn_FqTXirEASvVZRY1w\",\"partition\":0}}"),
Some("<DELETE>")
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(
new ShareSnapshotKey()
.setGroupId("gs1")
.setTopicId(Uuid.fromString("Uj5wn_FqTXirEASvVZRY1w"))
.setPartition(0),
0.toShort
),
null
))
)
// An unknown record type should be handled and reported as such.
assertEquals(
(
Some(
"Unknown record type 32767 at offset 0, skipping."
),
None
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(
new ShareSnapshotKey()
.setGroupId("group")
.setTopicId(Uuid.fromString("Uj5wn_FqTXirEASvVZRY1w"))
.setPartition(0),
Short.MaxValue // Invalid record id.
),
new ApiMessageAndVersion(
new ShareSnapshotValue()
.setSnapshotEpoch(0),
0.toShort
)
))
)
// Any parsing error is swallowed and reported.
assertEquals(
(
Some(
"Error at offset 0, skipping. Could not read record with version 0 from value's buffer due to: " +
"non-nullable field stateBatches was serialized as null."
),
None
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(
new ShareUpdateKey()
.setGroupId("group")
.setTopicId(Uuid.fromString("Uj5wn_FqTXirEASvVZRY1w"))
.setPartition(0),
1.toShort
),
new ApiMessageAndVersion(
new ShareSnapshotValue(), // incorrect class to deserialize the snapshot update value
0.toShort
)
))
)
}
}

File: ShareCoordinatorRecordHelpers.java

@@ -46,7 +46,7 @@ public class ShareCoordinatorRecordHelpers {
.setDeliveryCount(batch.deliveryCount())
.setDeliveryState(batch.deliveryState()))
.collect(Collectors.toList())),
- (short) 0)
+ ShareCoordinator.SHARE_SNAPSHOT_RECORD_VALUE_VERSION)
);
}
@@ -68,7 +68,7 @@ public class ShareCoordinatorRecordHelpers {
.setDeliveryCount(batch.deliveryCount())
.setDeliveryState(batch.deliveryState()))
.collect(Collectors.toList())),
- (short) 0)
+ ShareCoordinator.SHARE_UPDATE_RECORD_VALUE_VERSION)
);
}
}

File: ShareCoordinatorRecordHelpersTest.java

@@ -70,7 +70,7 @@ public class ShareCoordinatorRecordHelpersTest {
.setLastOffset(10L)
.setDeliveryState((byte) 0)
.setDeliveryCount((short) 1))),
- (short) 0));
+ ShareCoordinator.SHARE_SNAPSHOT_RECORD_VALUE_VERSION));
assertEquals(expectedRecord, record);
}
@@ -112,7 +112,7 @@ public class ShareCoordinatorRecordHelpersTest {
.setLastOffset(10L)
.setDeliveryState((byte) 0)
.setDeliveryCount((short) 1))),
- (short) 0));
+ ShareCoordinator.SHARE_UPDATE_RECORD_VALUE_VERSION));
assertEquals(expectedRecord, record);
}

File: ShareGroupStateMessageFormatter.java

@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer.group.share;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKeyJsonConverter;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValueJsonConverter;
import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
import org.apache.kafka.coordinator.share.generated.ShareUpdateKeyJsonConverter;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValueJsonConverter;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Optional;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
 * Formatter for records in the __share_group_state topic.
*/
public class ShareGroupStateMessageFormatter implements MessageFormatter {
private static final String VERSION = "version";
private static final String DATA = "data";
private static final String KEY = "key";
private static final String VALUE = "value";
private static final String UNKNOWN = "unknown";
@Override
public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
byte[] key = consumerRecord.key();
short keyVersion = -1;
if (Objects.nonNull(key)) {
keyVersion = ByteBuffer.wrap(key).getShort();
JsonNode dataNode = readToKeyJson(ByteBuffer.wrap(key), keyVersion);
if (dataNode instanceof NullNode) {
return;
}
json.putObject(KEY)
.put(VERSION, keyVersion)
.set(DATA, dataNode);
} else {
json.set(KEY, NullNode.getInstance());
}
byte[] value = consumerRecord.value();
if (Objects.nonNull(value)) {
short valueVersion = ByteBuffer.wrap(value).getShort();
JsonNode dataNode = readToValueJson(ByteBuffer.wrap(value), keyVersion, valueVersion);
json.putObject(VALUE)
.put(VERSION, valueVersion)
.set(DATA, dataNode);
} else {
json.set(VALUE, NullNode.getInstance());
}
try {
output.write(json.toString().getBytes(UTF_8));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) {
return readToSnapshotMessageKey(byteBuffer)
.map(logKey -> transferKeyMessageToJsonNode(logKey, version))
.orElseGet(() -> new TextNode(UNKNOWN));
}
private Optional<ApiMessage> readToSnapshotMessageKey(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= ShareSnapshotKey.LOWEST_SUPPORTED_VERSION
&& version <= ShareSnapshotKey.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new ShareSnapshotKey(new ByteBufferAccessor(byteBuffer), version));
} else if (version >= ShareUpdateKey.LOWEST_SUPPORTED_VERSION
&& version <= ShareUpdateKey.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new ShareUpdateKey(new ByteBufferAccessor(byteBuffer), version));
} else {
return Optional.empty();
}
}
private JsonNode transferKeyMessageToJsonNode(ApiMessage logKey, short keyVersion) {
if (logKey instanceof ShareSnapshotKey) {
return ShareSnapshotKeyJsonConverter.write((ShareSnapshotKey) logKey, keyVersion);
} else if (logKey instanceof ShareUpdateKey) {
return ShareUpdateKeyJsonConverter.write((ShareUpdateKey) logKey, keyVersion);
}
return new TextNode(UNKNOWN);
}
/**
 * Here the valueVersion is not enough to identify the deserializer for the ByteBuffer.
* This is because both {@link ShareSnapshotValue} and {@link ShareUpdateValue} have version 0
* as per RPC spec.
* To differentiate, we need to use the corresponding key versions. This is acceptable as
* the records will always appear in pairs (key, value). However, this means that we cannot
* extend {@link org.apache.kafka.tools.consumer.ApiMessageFormatter} as it requires overriding
 * readToValueJson whose signature does not allow for passing keyVersion.
*
* @param byteBuffer - Represents the raw data read from the topic
* @param keyVersion - Version of the actual key component of the data read from topic
* @param valueVersion - Version of the actual value component of the data read from topic
* @return JsonNode corresponding to the raw data value component
*/
protected JsonNode readToValueJson(ByteBuffer byteBuffer, short keyVersion, short valueVersion) {
return readToSnapshotMessageValue(byteBuffer, keyVersion)
.map(logValue -> transferValueMessageToJsonNode(logValue, valueVersion))
.orElseGet(() -> new TextNode(UNKNOWN));
}
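    // Illustrative note, not part of this change: ShareSnapshotValue and ShareUpdateValue both use
    // value version 0, so the value bytes alone cannot identify the record type. The key version read
    // beforehand breaks the tie; e.g. in ShareGroupStateMessageFormatterTest a key serialized with
    // version 0 (ShareSnapshotKey) selects ShareSnapshotValue, while version 1 (ShareUpdateKey) selects
    // ShareUpdateValue. readToSnapshotMessageValue below applies exactly this rule.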
private JsonNode transferValueMessageToJsonNode(ApiMessage logValue, short version) {
if (logValue instanceof ShareSnapshotValue) {
return ShareSnapshotValueJsonConverter.write((ShareSnapshotValue) logValue, version);
} else if (logValue instanceof ShareUpdateValue) {
return ShareUpdateValueJsonConverter.write((ShareUpdateValue) logValue, version);
}
return new TextNode(UNKNOWN);
}
private Optional<ApiMessage> readToSnapshotMessageValue(ByteBuffer byteBuffer, short keyVersion) {
short version = byteBuffer.getShort();
// Check the key version here as that will determine which type
// of value record to fetch. Both share update and share snapshot
// value records can have the same version.
if (keyVersion >= ShareSnapshotKey.LOWEST_SUPPORTED_VERSION
&& keyVersion <= ShareSnapshotKey.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new ShareSnapshotValue(new ByteBufferAccessor(byteBuffer), version));
} else if (keyVersion >= ShareUpdateKey.LOWEST_SUPPORTED_VERSION
&& keyVersion <= ShareUpdateKey.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new ShareUpdateValue(new ByteBufferAccessor(byteBuffer), version));
}
return Optional.empty();
}
}

File: ShareGroupStateMessageFormatterTest.java

@@ -0,0 +1,218 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer.group.share;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.coordinator.share.ShareGroupOffset;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.persister.PersisterStateBatch;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ShareGroupStateMessageFormatterTest {
private static final SharePartitionKey KEY_1 = SharePartitionKey.getInstance("gs1", Uuid.fromString("gtb2stGYRk-vWZ2zAozmoA"), 0);
private static final ShareGroupOffset SHARE_GROUP_OFFSET_1 = new ShareGroupOffset.Builder()
.setSnapshotEpoch(0)
.setStateEpoch(1)
.setLeaderEpoch(20)
.setStartOffset(50)
.setStateBatches(
Arrays.asList(
new PersisterStateBatch(
100,
200,
(byte) 1,
(short) 10
),
new PersisterStateBatch(
201,
210,
(byte) 2,
(short) 10
)
)
).build();
private static final SharePartitionKey KEY_2 = SharePartitionKey.getInstance("gs2", Uuid.fromString("r9Nq4xGAQf28jvu36t7gQQ"), 0);
private static final ShareGroupOffset SHARE_GROUP_OFFSET_2 = new ShareGroupOffset.Builder()
.setSnapshotEpoch(1)
.setStateEpoch(3)
.setLeaderEpoch(25)
.setStartOffset(55)
.setStateBatches(
Arrays.asList(
new PersisterStateBatch(
100,
150,
(byte) 1,
(short) 12
),
new PersisterStateBatch(
151,
200,
(byte) 2,
(short) 15
)
)
).build();
private static final ShareSnapshotKey SHARE_SNAPSHOT_KEY = new ShareSnapshotKey()
.setGroupId(KEY_1.groupId())
.setTopicId(KEY_1.topicId())
.setPartition(KEY_1.partition());
private static final ShareSnapshotValue SHARE_SNAPSHOT_VALUE = new ShareSnapshotValue()
.setSnapshotEpoch(SHARE_GROUP_OFFSET_1.snapshotEpoch())
.setStateEpoch(SHARE_GROUP_OFFSET_1.stateEpoch())
.setLeaderEpoch(SHARE_GROUP_OFFSET_1.leaderEpoch())
.setStartOffset(SHARE_GROUP_OFFSET_1.startOffset())
.setStateBatches(
SHARE_GROUP_OFFSET_1.stateBatches().stream()
.map(batch -> new ShareSnapshotValue.StateBatch()
.setFirstOffset(batch.firstOffset())
.setLastOffset(batch.lastOffset())
.setDeliveryState(batch.deliveryState())
.setDeliveryCount(batch.deliveryCount()))
.collect(Collectors.toList())
);
private static final ShareUpdateKey SHARE_UPDATE_KEY = new ShareUpdateKey()
.setGroupId(KEY_2.groupId())
.setTopicId(KEY_2.topicId())
.setPartition(KEY_2.partition());
private static final ShareUpdateValue SHARE_UPDATE_VALUE = new ShareUpdateValue()
.setSnapshotEpoch(SHARE_GROUP_OFFSET_2.snapshotEpoch())
.setLeaderEpoch(SHARE_GROUP_OFFSET_2.leaderEpoch())
.setStartOffset(SHARE_GROUP_OFFSET_2.startOffset())
.setStateBatches(
SHARE_GROUP_OFFSET_2.stateBatches().stream()
.map(batch -> new ShareUpdateValue.StateBatch()
.setFirstOffset(batch.firstOffset())
.setLastOffset(batch.lastOffset())
.setDeliveryState(batch.deliveryState())
.setDeliveryCount(batch.deliveryCount()))
.collect(Collectors.toList())
);
private static Stream<Arguments> parameters() {
return Stream.of(
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_SNAPSHOT_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_SNAPSHOT_VALUE).array(),
"{\"key\":{\"version\":0,\"data\":{\"groupId\":\"gs1\",\"topicId\":\"gtb2stGYRk-vWZ2zAozmoA\",\"partition\":0}},\"value\":{\"version\":0,\"data\":{\"snapshotEpoch\":0,\"stateEpoch\":1,\"leaderEpoch\":20,\"startOffset\":50,\"stateBatches\":[{\"firstOffset\":100,\"lastOffset\":200,\"deliveryState\":1,\"deliveryCount\":10},{\"firstOffset\":201,\"lastOffset\":210,\"deliveryState\":2,\"deliveryCount\":10}]}}}"
),
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 1, SHARE_UPDATE_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_UPDATE_VALUE).array(),
"{\"key\":{\"version\":1,\"data\":{\"groupId\":\"gs2\",\"topicId\":\"r9Nq4xGAQf28jvu36t7gQQ\",\"partition\":0}},\"value\":{\"version\":0,\"data\":{\"snapshotEpoch\":1,\"leaderEpoch\":25,\"startOffset\":55,\"stateBatches\":[{\"firstOffset\":100,\"lastOffset\":150,\"deliveryState\":1,\"deliveryCount\":12},{\"firstOffset\":151,\"lastOffset\":200,\"deliveryState\":2,\"deliveryCount\":15}]}}}"
),
// wrong versions
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 10, SHARE_SNAPSHOT_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_SNAPSHOT_VALUE).array(),
"{\"key\":{\"version\":10,\"data\":\"unknown\"},\"value\":{\"version\":0,\"data\":\"unknown\"}}"
),
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 15, SHARE_UPDATE_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_UPDATE_VALUE).array(),
"{\"key\":{\"version\":15,\"data\":\"unknown\"},\"value\":{\"version\":0,\"data\":\"unknown\"}}"
)
);
}
private static Stream<Arguments> exceptions() {
return Stream.of(
// wrong types
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_SNAPSHOT_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_UPDATE_VALUE).array(),
new RuntimeException("non-nullable field stateBatches was serialized as null")
),
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 1, SHARE_UPDATE_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_SNAPSHOT_VALUE).array(),
new RuntimeException("non-nullable field stateBatches was serialized as null")
)
);
}
@ParameterizedTest
@MethodSource("parameters")
public void testShareGroupStateMessageFormatter(
byte[] keyBuffer,
byte[] valueBuffer,
String expectedOutput
) {
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, 0,
0L, TimestampType.CREATE_TIME, 0,
0, keyBuffer, valueBuffer,
new RecordHeaders(), Optional.empty());
try (MessageFormatter formatter = new ShareGroupStateMessageFormatter()) {
formatter.configure(emptyMap());
ByteArrayOutputStream out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals(expectedOutput, out.toString());
}
}
@ParameterizedTest
@MethodSource("exceptions")
public void testShareGroupStateMessageFormatterException(
byte[] keyBuffer,
byte[] valueBuffer,
RuntimeException expectedOutput
) {
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, 0,
0L, TimestampType.CREATE_TIME, 0,
0, keyBuffer, valueBuffer,
new RecordHeaders(), Optional.empty());
try (MessageFormatter formatter = new ShareGroupStateMessageFormatter()) {
formatter.configure(emptyMap());
ByteArrayOutputStream out = new ByteArrayOutputStream();
RuntimeException re = assertThrows(RuntimeException.class, () -> formatter.writeTo(record, new PrintStream(out)));
assertEquals(expectedOutput.getMessage(), re.getMessage());
}
}
}