KAFKA-18616; Refactor Tools's ApiMessageFormatter (#18695)

This patch refactors the `ApiMessageFormatter` in the tools module along the lines of
https://github.com/apache/kafka/pull/18688: the class is renamed to
`CoordinatorRecordMessageFormatter`, deserialization is delegated to a
`CoordinatorRecordSerde`, and concrete formatters now only supply a serde, the set of
allowed record types, and the key/value JSON converters.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
David Jacot 2025-03-26 11:12:38 +01:00 committed by GitHub
parent 91758cc99d
commit b6adec48c5
10 changed files with 512 additions and 428 deletions

checkstyle/import-control.xml

@@ -338,6 +338,7 @@
     <subpackage name="consumer">
       <allow pkg="org.apache.kafka.tools"/>
+      <allow pkg="org.apache.kafka.coordinator.common.runtime" />
       <subpackage name="group">
         <allow pkg="org.apache.kafka.coordinator.group"/>
         <allow pkg="kafka.api"/>

tools/src/main/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatter.java
(renamed from ApiMessageFormatter.java)

@@ -18,6 +18,9 @@ package org.apache.kafka.tools.consumer;

 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;

 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;

@@ -31,45 +34,51 @@ import java.util.Objects;

 import static java.nio.charset.StandardCharsets.UTF_8;

-public abstract class ApiMessageFormatter implements MessageFormatter {
+public abstract class CoordinatorRecordMessageFormatter implements MessageFormatter {
     private static final String TYPE = "type";
     private static final String VERSION = "version";
     private static final String DATA = "data";
     private static final String KEY = "key";
     private static final String VALUE = "value";
-    static final String UNKNOWN = "unknown";
+
+    private final CoordinatorRecordSerde serde;
+
+    public CoordinatorRecordMessageFormatter(CoordinatorRecordSerde serde) {
+        this.serde = serde;
+    }

     @Override
     public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+        if (Objects.isNull(consumerRecord.key())) return;
+
         ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
-
-        byte[] key = consumerRecord.key();
-        if (Objects.nonNull(key)) {
-            short keyVersion = ByteBuffer.wrap(key).getShort();
-            JsonNode dataNode = readToKeyJson(ByteBuffer.wrap(key));
-
-            if (dataNode instanceof NullNode) {
-                return;
-            }
-            json.putObject(KEY)
-                .put(TYPE, keyVersion)
-                .set(DATA, dataNode);
-        } else {
-            return;
-        }
-
-        byte[] value = consumerRecord.value();
-        if (Objects.nonNull(value)) {
-            short valueVersion = ByteBuffer.wrap(value).getShort();
-            JsonNode dataNode = readToValueJson(ByteBuffer.wrap(value));
-            json.putObject(VALUE)
-                .put(VERSION, valueVersion)
-                .set(DATA, dataNode);
-        } else {
-            json.set(VALUE, NullNode.getInstance());
+        try {
+            CoordinatorRecord record = serde.deserialize(
+                ByteBuffer.wrap(consumerRecord.key()),
+                consumerRecord.value() != null ? ByteBuffer.wrap(consumerRecord.value()) : null
+            );
+
+            if (!isRecordTypeAllowed(record.key().apiKey())) return;
+
+            json
+                .putObject(KEY)
+                .put(TYPE, record.key().apiKey())
+                .set(DATA, keyAsJson(record.key()));
+
+            if (Objects.nonNull(record.value())) {
+                json
+                    .putObject(VALUE)
+                    .put(VERSION, record.value().version())
+                    .set(DATA, valueAsJson(record.value().message(), record.value().version()));
+            } else {
+                json.set(VALUE, NullNode.getInstance());
+            }
+        } catch (CoordinatorRecordSerde.UnknownRecordTypeException ex) {
+            return;
+        } catch (RuntimeException ex) {
+            throw new RuntimeException("Could not read record at offset " + consumerRecord.offset() +
+                " due to: " + ex.getMessage(), ex);
         }

         try {
             output.write(json.toString().getBytes(UTF_8));

@@ -78,6 +87,7 @@ public abstract class ApiMessageFormatter implements MessageFormatter {
         }
     }

-    protected abstract JsonNode readToKeyJson(ByteBuffer byteBuffer);
-
-    protected abstract JsonNode readToValueJson(ByteBuffer byteBuffer);
+    protected abstract boolean isRecordTypeAllowed(short recordType);
+
+    protected abstract JsonNode keyAsJson(ApiMessage message);
+
+    protected abstract JsonNode valueAsJson(ApiMessage message, short version);
 }
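
Taken together, the hunks above reduce a concrete formatter to a constructor plus three hooks. For orientation, here is a sketch of what a hypothetical new formatter would look like against this API; MyCoordinatorRecordSerde and MyCoordinatorRecordJsonConverters are invented names for illustration, not part of this patch:

import org.apache.kafka.common.protocol.ApiMessage;

import com.fasterxml.jackson.databind.JsonNode;

// Hypothetical subclass, for illustration only.
public class MyCoordinatorMessageFormatter extends CoordinatorRecordMessageFormatter {

    public MyCoordinatorMessageFormatter() {
        // The base class owns deserialization; a subclass only chooses the serde.
        super(new MyCoordinatorRecordSerde()); // invented serde
    }

    @Override
    protected boolean isRecordTypeAllowed(short recordType) {
        // Filter to the record types this formatter should print;
        // everything else is silently skipped by the base class.
        return recordType == 0; // invented record type id
    }

    @Override
    protected JsonNode keyAsJson(ApiMessage message) {
        // Delegate to generated JSON converters, as the real subclasses below do.
        return MyCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message); // invented

    }

    @Override
    protected JsonNode valueAsJson(ApiMessage message, short version) {
        return MyCoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version); // invented
    }
}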

tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java

@@ -16,45 +16,36 @@
  */
 package org.apache.kafka.tools.consumer;

-import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
+import org.apache.kafka.coordinator.group.generated.CoordinatorRecordJsonConverters;
 import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
-import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
-import org.apache.kafka.coordinator.group.generated.GroupMetadataKeyJsonConverter;
-import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
-import org.apache.kafka.coordinator.group.generated.GroupMetadataValueJsonConverter;

 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.NullNode;
-import com.fasterxml.jackson.databind.node.TextNode;

-import java.nio.ByteBuffer;
+import java.util.Set;

-public class GroupMetadataMessageFormatter extends ApiMessageFormatter {
-    @Override
-    protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
-        try {
-            switch (CoordinatorRecordType.fromId(byteBuffer.getShort())) {
-                case GROUP_METADATA:
-                    return GroupMetadataKeyJsonConverter.write(
-                        new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), (short) 0),
-                        (short) 0
-                    );
-                default:
-                    return NullNode.getInstance();
-            }
-        } catch (UnsupportedVersionException ex) {
-            return NullNode.getInstance();
-        }
-    }
+public class GroupMetadataMessageFormatter extends CoordinatorRecordMessageFormatter {
+    private static final Set<Short> ALLOWED_RECORDS = Set.of(
+        CoordinatorRecordType.GROUP_METADATA.id()
+    );
+
+    public GroupMetadataMessageFormatter() {
+        super(new GroupCoordinatorRecordSerde());
+    }

     @Override
-    protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
-        short version = byteBuffer.getShort();
-        if (version >= GroupMetadataValue.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataValue.HIGHEST_SUPPORTED_VERSION) {
-            return GroupMetadataValueJsonConverter.write(new GroupMetadataValue(new ByteBufferAccessor(byteBuffer), version), version);
-        }
-        return new TextNode(UNKNOWN);
+    protected boolean isRecordTypeAllowed(short recordType) {
+        return ALLOWED_RECORDS.contains(recordType);
     }
+
+    @Override
+    protected JsonNode keyAsJson(ApiMessage message) {
+        return CoordinatorRecordJsonConverters.writeRecordKeyAsJson(message);
+    }
+
+    @Override
+    protected JsonNode valueAsJson(ApiMessage message, short version) {
+        return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version);
+    }
 }

tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java

@@ -16,50 +16,40 @@
  */
 package org.apache.kafka.tools.consumer;

-import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
+import org.apache.kafka.coordinator.group.generated.CoordinatorRecordJsonConverters;
 import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
-import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
-import org.apache.kafka.coordinator.group.generated.OffsetCommitKeyJsonConverter;
-import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
-import org.apache.kafka.coordinator.group.generated.OffsetCommitValueJsonConverter;

 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.NullNode;
-import com.fasterxml.jackson.databind.node.TextNode;

-import java.nio.ByteBuffer;
+import java.util.Set;

 /**
  * Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
  */
-public class OffsetsMessageFormatter extends ApiMessageFormatter {
-    @Override
-    protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
-        try {
-            switch (CoordinatorRecordType.fromId(byteBuffer.getShort())) {
-                // We can read both record types with the offset commit one.
-                case LEGACY_OFFSET_COMMIT:
-                case OFFSET_COMMIT:
-                    return OffsetCommitKeyJsonConverter.write(
-                        new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), (short) 0),
-                        (short) 0
-                    );
-                default:
-                    return NullNode.getInstance();
-            }
-        } catch (UnsupportedVersionException ex) {
-            return NullNode.getInstance();
-        }
-    }
+public class OffsetsMessageFormatter extends CoordinatorRecordMessageFormatter {
+    private static final Set<Short> ALLOWED_RECORDS = Set.of(
+        CoordinatorRecordType.LEGACY_OFFSET_COMMIT.id(),
+        CoordinatorRecordType.OFFSET_COMMIT.id()
+    );
+
+    public OffsetsMessageFormatter() {
+        super(new GroupCoordinatorRecordSerde());
+    }

     @Override
-    protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
-        short version = byteBuffer.getShort();
-        if (version >= OffsetCommitValue.LOWEST_SUPPORTED_VERSION && version <= OffsetCommitValue.HIGHEST_SUPPORTED_VERSION) {
-            return OffsetCommitValueJsonConverter.write(new OffsetCommitValue(new ByteBufferAccessor(byteBuffer), version), version);
-        }
-        return new TextNode(UNKNOWN);
+    protected boolean isRecordTypeAllowed(short recordType) {
+        return ALLOWED_RECORDS.contains(recordType);
     }
+
+    @Override
+    protected JsonNode keyAsJson(ApiMessage message) {
+        return CoordinatorRecordJsonConverters.writeRecordKeyAsJson(message);
+    }
+
+    @Override
+    protected JsonNode valueAsJson(ApiMessage message, short version) {
+        return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version);
+    }
 }
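
As the javadoc above notes, the internal __consumer_offsets topic is only visible to consumers that disable exclude.internal.topics. A minimal sketch of driving the formatter programmatically (the bootstrap address, group id, and poll timeout are placeholders; the console consumer's --formatter option wires this up for you):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.tools.consumer.OffsetsMessageFormatter;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class OffsetsTopicDump {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offsets-inspector");       // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // Required, per the javadoc: internal topics are filtered out unless this is disabled.
        props.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
             MessageFormatter formatter = new OffsetsMessageFormatter()) {
            formatter.configure(Collections.emptyMap());
            consumer.subscribe(List.of("__consumer_offsets"));
            // One poll for brevity; a real tool would loop.
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(5))) {
                formatter.writeTo(record, System.out);
            }
        }
    }
}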

tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java

@@ -16,45 +16,29 @@
  */
 package org.apache.kafka.tools.consumer;

-import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.protocol.ByteBufferAccessor;
-import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordType;
-import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
-import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
-import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
-import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde;
+import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordJsonConverters;

 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.NullNode;
-import com.fasterxml.jackson.databind.node.TextNode;
-
-import java.nio.ByteBuffer;

-public class TransactionLogMessageFormatter extends ApiMessageFormatter {
-    @Override
-    protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
-        try {
-            switch (CoordinatorRecordType.fromId(byteBuffer.getShort())) {
-                case TRANSACTION_LOG:
-                    return TransactionLogKeyJsonConverter.write(
-                        new TransactionLogKey(new ByteBufferAccessor(byteBuffer), (short) 0),
-                        (short) 0
-                    );
-                default:
-                    return NullNode.getInstance();
-            }
-        } catch (UnsupportedVersionException ex) {
-            return NullNode.getInstance();
-        }
-    }
+public class TransactionLogMessageFormatter extends CoordinatorRecordMessageFormatter {
+    public TransactionLogMessageFormatter() {
+        super(new TransactionCoordinatorRecordSerde());
+    }

     @Override
-    protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
-        short version = byteBuffer.getShort();
-        if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
-            return TransactionLogValueJsonConverter.write(new TransactionLogValue(new ByteBufferAccessor(byteBuffer), version), version);
-        }
-        return new TextNode(UNKNOWN);
+    protected boolean isRecordTypeAllowed(short recordType) {
+        return true;
     }
+
+    @Override
+    protected JsonNode keyAsJson(ApiMessage message) {
+        return CoordinatorRecordJsonConverters.writeRecordKeyAsJson(message);
+    }
+
+    @Override
+    protected JsonNode valueAsJson(ApiMessage message, short version) {
+        return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version);
+    }
 }

tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupStateMessageFormatter.java

@@ -30,6 +30,7 @@ import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
 import org.apache.kafka.coordinator.share.generated.ShareUpdateKeyJsonConverter;
 import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
 import org.apache.kafka.coordinator.share.generated.ShareUpdateValueJsonConverter;
+import org.apache.kafka.tools.consumer.CoordinatorRecordMessageFormatter;

 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;

@@ -132,7 +133,7 @@ public class ShareGroupStateMessageFormatter implements MessageFormatter {
      * as per RPC spec.
      * To differentiate, we need to use the corresponding key versions. This is acceptable as
      * the records will always appear in pairs (key, value). However, this means that we cannot
-     * extend {@link org.apache.kafka.tools.consumer.ApiMessageFormatter} as it requires overriding
+     * extend {@link CoordinatorRecordMessageFormatter} as it requires overriding
      * readToValueJson whose signature does not allow for passing keyversion.
      *
      * @param byteBuffer - Represents the raw data read from the topic

tools/src/test/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatterTest.java (new file)

@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class CoordinatorRecordMessageFormatterTest {
+    private static final String TOPIC = "TOPIC";
+
+    protected abstract CoordinatorRecordMessageFormatter formatter();
+
+    protected abstract Stream<Arguments> parameters();
+
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void testMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) {
+        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
+            TOPIC,
+            0,
+            0,
+            0L,
+            TimestampType.CREATE_TIME,
+            0,
+            0,
+            keyBuffer,
+            valueBuffer,
+            new RecordHeaders(),
+            Optional.empty()
+        );
+
+        try (MessageFormatter formatter = formatter()) {
+            formatter.configure(emptyMap());
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            formatter.writeTo(record, new PrintStream(out));
+            assertEquals(expectedOutput.replaceAll("\\s+", ""), out.toString());
+        }
+    }
+}

tools/src/test/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatterTest.java

@@ -16,30 +16,19 @@
  */
 package org.apache.kafka.tools.consumer;

-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.MessageFormatter;
-import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.protocol.MessageUtil;
-import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;

-import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;

-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.util.Optional;
 import java.util.stream.Stream;

-import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
-import static org.junit.jupiter.api.Assertions.assertEquals;

-public class GroupMetadataMessageFormatterTest {
+public class GroupMetadataMessageFormatterTest extends CoordinatorRecordMessageFormatterTest {

     private static final OffsetCommitKey OFFSET_COMMIT_KEY = new OffsetCommitKey()
         .setGroup("group-id")

@@ -69,91 +58,137 @@ public class GroupMetadataMessageFormatterTest {
         .setLeader("leader")
         .setMembers(singletonList(MEMBER_METADATA))
         .setCurrentStateTimestamp(1234L);
-    private static final String TOPIC = "TOPIC";

-    private static Stream<Arguments> parameters() {
+    @Override
+    protected CoordinatorRecordMessageFormatter formatter() {
+        return new GroupMetadataMessageFormatter();
+    }
+
+    @Override
+    protected Stream<Arguments> parameters() {
         return Stream.of(
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_VALUE).array(),
                 ""
             ),
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 0, GROUP_METADATA_VALUE).array(),
-                "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":0," +
-                    "\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
-                    "\"leader\":\"leader\",\"members\":[{\"memberId\":\"member-1\",\"clientId\":\"client-1\"," +
-                    "\"clientHost\":\"host-1\",\"sessionTimeout\":1500,\"subscription\":\"AAE=\"," +
-                    "\"assignment\":\"AQI=\"}]}}}"
+                """
+                    {"key":{"type":2,"data":{"group":"group-id"}},
+                     "value":{"version":0,
+                              "data":{"protocolType":"consumer",
+                                      "generation":1,
+                                      "protocol":"range",
+                                      "leader":"leader",
+                                      "members":[{"memberId":"member-1",
+                                                  "clientId":"client-1",
+                                                  "clientHost":"host-1",
+                                                  "sessionTimeout":1500,
+                                                  "subscription":"AAE=",
+                                                  "assignment":"AQI="}]}}}
+                """
             ),
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 1, GROUP_METADATA_VALUE).array(),
-                "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":1," +
-                    "\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
-                    "\"leader\":\"leader\",\"members\":[{\"memberId\":\"member-1\",\"clientId\":\"client-1\"," +
-                    "\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000,\"sessionTimeout\":1500," +
-                    "\"subscription\":\"AAE=\",\"assignment\":\"AQI=\"}]}}}"
+                """
+                    {"key":{"type":2,"data":{"group":"group-id"}},
+                     "value":{"version":1,
+                              "data":{"protocolType":"consumer",
+                                      "generation":1,
+                                      "protocol":"range",
+                                      "leader":"leader",
+                                      "members":[{"memberId":"member-1",
+                                                  "clientId":"client-1",
+                                                  "clientHost":"host-1",
+                                                  "rebalanceTimeout":1000,
+                                                  "sessionTimeout":1500,
+                                                  "subscription":"AAE=",
+                                                  "assignment":"AQI="}]}}}
+                """
             ),
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_VALUE).array(),
-                "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":2," +
-                    "\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
-                    "\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," +
-                    "\"clientId\":\"client-1\",\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000," +
-                    "\"sessionTimeout\":1500,\"subscription\":\"AAE=\",\"assignment\":\"AQI=\"}]}}}"
+                """
+                    {"key":{"type":2,"data":{"group":"group-id"}},
+                     "value":{"version":2,
+                              "data":{"protocolType":"consumer",
+                                      "generation":1,
+                                      "protocol":"range",
+                                      "leader":"leader",
+                                      "currentStateTimestamp":1234,
+                                      "members":[{"memberId":"member-1",
+                                                  "clientId":"client-1",
+                                                  "clientHost":"host-1",
+                                                  "rebalanceTimeout":1000,
+                                                  "sessionTimeout":1500,
+                                                  "subscription":"AAE=",
+                                                  "assignment":"AQI="}]}}}
+                """
            ),
            Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 3, GROUP_METADATA_VALUE).array(),
-                "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":3," +
-                    "\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
-                    "\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," +
-                    "\"groupInstanceId\":\"group-instance-1\",\"clientId\":\"client-1\",\"clientHost\":\"host-1\"," +
-                    "\"rebalanceTimeout\":1000,\"sessionTimeout\":1500,\"subscription\":\"AAE=\",\"assignment\":\"AQI=\"}]}}}"
+                """
+                    {"key":{"type":2,"data":{"group":"group-id"}},
+                     "value":{"version":3,
+                              "data":{"protocolType":"consumer",
+                                      "generation":1,
+                                      "protocol":"range",
+                                      "leader":"leader",
+                                      "currentStateTimestamp":1234,
+                                      "members":[{"memberId":"member-1",
+                                                  "groupInstanceId":"group-instance-1",
+                                                  "clientId":"client-1",
+                                                  "clientHost":"host-1",
+                                                  "rebalanceTimeout":1000,
+                                                  "sessionTimeout":1500,
+                                                  "subscription":"AAE=",
+                                                  "assignment":"AQI="}]}}}
+                """
             ),
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 4, GROUP_METADATA_VALUE).array(),
-                "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":4," +
-                    "\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
-                    "\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," +
-                    "\"groupInstanceId\":\"group-instance-1\",\"clientId\":\"client-1\",\"clientHost\":\"host-1\"," +
-                    "\"rebalanceTimeout\":1000,\"sessionTimeout\":1500,\"subscription\":\"AAE=\",\"assignment\":\"AQI=\"}]}}}"
+                """
+                    {"key":{"type":2,"data":{"group":"group-id"}},
+                     "value":{"version":4,
+                              "data":{"protocolType":"consumer",
+                                      "generation":1,
+                                      "protocol":"range",
+                                      "leader":"leader",
+                                      "currentStateTimestamp":1234,
+                                      "members":[{"memberId":"member-1",
+                                                  "groupInstanceId":"group-instance-1",
+                                                  "clientId":"client-1",
+                                                  "clientHost":"host-1",
+                                                  "rebalanceTimeout":1000,
+                                                  "sessionTimeout":1500,
+                                                  "subscription":"AAE=",
+                                                  "assignment":"AQI="}]}}}
+                """
             ),
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
                 null,
-                "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":null}"),
+                """
+                    {"key":{"type":2,"data":{"group":"group-id"}},"value":null}
+                """
+            ),
             Arguments.of(
                 null,
                 MessageUtil.toVersionPrefixedByteBuffer((short) 4, GROUP_METADATA_VALUE).array(),
-                ""),
+                ""
+            ),
             Arguments.of(null, null, ""),
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
-                MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_VALUE).array(),
                 ""
-            )
+            ),
+            Arguments.of(
+                MessageUtil.toVersionPrefixedByteBuffer(Short.MAX_VALUE, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_VALUE).array(),
+                ""
+            )
         );
     }
-
-    @ParameterizedTest
-    @MethodSource("parameters")
-    public void testTransactionLogMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) {
-        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
-            TOPIC, 0, 0,
-            0L, TimestampType.CREATE_TIME, 0,
-            0, keyBuffer, valueBuffer,
-            new RecordHeaders(), Optional.empty());
-
-        try (MessageFormatter formatter = new GroupMetadataMessageFormatter()) {
-            formatter.configure(emptyMap());
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
-            formatter.writeTo(record, new PrintStream(out));
-            assertEquals(expectedOutput, out.toString());
-        }
-    }
 }

tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java

@@ -16,30 +16,18 @@
  */
 package org.apache.kafka.tools.consumer;

-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.MessageFormatter;
-import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.protocol.MessageUtil;
-import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;

-import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;

-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
 import java.util.Collections;
-import java.util.Optional;
 import java.util.stream.Stream;

-import static java.util.Collections.emptyMap;
-import static org.junit.jupiter.api.Assertions.assertEquals;

-public class OffsetMessageFormatterTest {
+public class OffsetMessageFormatterTest extends CoordinatorRecordMessageFormatterTest {

     private static final OffsetCommitKey OFFSET_COMMIT_KEY = new OffsetCommitKey()
         .setGroup("group-id")

@@ -50,7 +38,7 @@ public class OffsetMessageFormatterTest {
         .setLeaderEpoch(10)
         .setMetadata("metadata")
         .setCommitTimestamp(1234L)
-        .setExpireTimestamp(-1L);
+        .setExpireTimestamp(5678L);

     private static final GroupMetadataKey GROUP_METADATA_KEY = new GroupMetadataKey().setGroup("group-id");
     private static final GroupMetadataValue GROUP_METADATA_VALUE = new GroupMetadataValue()
         .setProtocolType("consumer")

@@ -58,93 +46,107 @@ public class OffsetMessageFormatterTest {
         .setProtocol("range")
         .setLeader("leader")
         .setMembers(Collections.emptyList());
-    private static final String TOPIC = "TOPIC";

-    private static Stream<Arguments> parameters() {
+    @Override
+    protected CoordinatorRecordMessageFormatter formatter() {
+        return new OffsetsMessageFormatter();
+    }
+
+    @Override
+    protected Stream<Arguments> parameters() {
         return Stream.of(
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 10, OFFSET_COMMIT_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 10, OFFSET_COMMIT_VALUE).array(),
                 ""
             ),
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_VALUE).array(),
-                "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
-                    "\"value\":{\"version\":0,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
-                    "\"commitTimestamp\":1234}}}"
+                """
+                    {"key":{"type":0,"data":{"group":"group-id","topic":"foo","partition":1}},
+                     "value":{"version":0,
+                              "data":{"offset":100,
+                                      "metadata":"metadata",
+                                      "commitTimestamp":1234}}}
+                """
             ),
+            Arguments.of(
+                MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_VALUE).array(),
+                """
+                    {"key":{"type":1,"data":{"group":"group-id","topic":"foo","partition":1}},
+                     "value":{"version":0,
+                              "data":{"offset":100,
+                                      "metadata":"metadata",
+                                      "commitTimestamp":1234}}}
+                """
+            ),
             Arguments.of(
-                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_VALUE).array(),
-                "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
-                    "\"value\":{\"version\":1,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
-                    "\"commitTimestamp\":1234,\"expireTimestamp\":-1}}}"
+                """
+                    {"key":{"type":1,"data":{"group":"group-id","topic":"foo","partition":1}},
+                     "value":{"version":1,
+                              "data":{"offset":100,
+                                      "metadata":"metadata",
+                                      "commitTimestamp":1234,
+                                      "expireTimestamp":5678}}}
+                """
             ),
             Arguments.of(
-                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 2, OFFSET_COMMIT_VALUE).array(),
-                "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
-                    "\"value\":{\"version\":2,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
-                    "\"commitTimestamp\":1234}}}"
+                """
+                    {"key":{"type":1,"data":{"group":"group-id","topic":"foo","partition":1}},
+                     "value":{"version":2,
+                              "data":{"offset":100,
+                                      "metadata":"metadata",
+                                      "commitTimestamp":1234}}}
+                """
             ),
             Arguments.of(
-                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 3, OFFSET_COMMIT_VALUE).array(),
-                "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
-                    "\"value\":{\"version\":3,\"data\":{\"offset\":100,\"leaderEpoch\":10," +
-                    "\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}"
+                """
+                    {"key":{"type":1,"data":{"group":"group-id","topic":"foo","partition":1}},
+                     "value":{"version":3,
+                              "data":{"offset":100,
+                                      "leaderEpoch":10,
+                                      "metadata":"metadata",
+                                      "commitTimestamp":1234}}}
+                """
            ),
            Arguments.of(
-                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(),
-                "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
-                    "\"value\":{\"version\":4,\"data\":{\"offset\":100,\"leaderEpoch\":10," +
-                    "\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}"
+                """
+                    {"key":{"type":1,"data":{"group":"group-id","topic":"foo","partition":1}},
+                     "value":{"version":4,
+                              "data":{"offset":100,
+                                      "leaderEpoch":10,
+                                      "metadata":"metadata",
+                                      "commitTimestamp":1234}}}
+                """
             ),
             Arguments.of(
-                MessageUtil.toVersionPrefixedByteBuffer((short) 5, OFFSET_COMMIT_KEY).array(),
-                MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(),
-                ""
-            ),
-            Arguments.of(
-                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
-                MessageUtil.toVersionPrefixedByteBuffer((short) 5, OFFSET_COMMIT_VALUE).array(),
-                "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
-                    "\"value\":{\"version\":5,\"data\":\"unknown\"}}"
-            ),
-            Arguments.of(
-                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(),
                 null,
-                "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
-                    "\"value\":null}"),
+                """
+                    {"key":{"type":1,"data":{"group":"group-id","topic":"foo","partition":1}},"value":null}
+                """
+            ),
             Arguments.of(
                 null,
                 MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_VALUE).array(),
-                ""),
+                ""
+            ),
             Arguments.of(null, null, ""),
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_VALUE).array(),
                 ""
-            )
+            ),
+            Arguments.of(
+                MessageUtil.toVersionPrefixedByteBuffer(Short.MAX_VALUE, GROUP_METADATA_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_VALUE).array(),
+                ""
+            )
         );
     }
-
-    @ParameterizedTest
-    @MethodSource("parameters")
-    public void testTransactionLogMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) {
-        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
-            TOPIC, 0, 0,
-            0L, TimestampType.CREATE_TIME, 0,
-            0, keyBuffer, valueBuffer,
-            new RecordHeaders(), Optional.empty());
-
-        try (MessageFormatter formatter = new OffsetsMessageFormatter()) {
-            formatter.configure(emptyMap());
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
-            formatter.writeTo(record, new PrintStream(out));
-            assertEquals(expectedOutput, out.toString());
-        }
-    }
 }

tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java

@@ -16,28 +16,17 @@
  */
 package org.apache.kafka.tools.consumer;

-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.MessageFormatter;
-import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.protocol.MessageUtil;
-import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
 import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;

-import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;

-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.util.Optional;
 import java.util.stream.Stream;

 import static java.util.Collections.emptyList;
-import static java.util.Collections.emptyMap;
-import static org.junit.jupiter.api.Assertions.assertEquals;

-public class TransactionLogMessageFormatterTest {
+public class TransactionLogMessageFormatterTest extends CoordinatorRecordMessageFormatterTest {

     private static final TransactionLogKey TXN_LOG_KEY = new TransactionLogKey()
         .setTransactionalId("TXNID");

@@ -49,9 +38,14 @@ public class TransactionLogMessageFormatterTest {
         .setTransactionLastUpdateTimestampMs(1000L)
         .setTransactionTimeoutMs(500)
         .setTransactionPartitions(emptyList());
-    private static final String TOPIC = "TOPIC";

-    private static Stream<Arguments> parameters() {
+    @Override
+    protected CoordinatorRecordMessageFormatter formatter() {
+        return new TransactionLogMessageFormatter();
+    }
+
+    @Override
+    protected Stream<Arguments> parameters() {
         return Stream.of(
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 10, TXN_LOG_KEY).array(),

@@ -61,48 +55,56 @@ public class TransactionLogMessageFormatterTest {
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 0, TXN_LOG_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(),
-                "{\"key\":{\"type\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," +
-                    "\"value\":{\"version\":1,\"data\":{\"producerId\":100,\"producerEpoch\":50," +
-                    "\"transactionTimeoutMs\":500,\"transactionStatus\":4,\"transactionPartitions\":[]," +
-                    "\"transactionLastUpdateTimestampMs\":1000,\"transactionStartTimestampMs\":750}}}"
+                """
+                    {"key":{"type":0,"data":{"transactionalId":"TXNID"}},
+                     "value":{"version":1,
+                              "data":{"producerId":100,
+                                      "producerEpoch":50,
+                                      "transactionTimeoutMs":500,
+                                      "transactionStatus":4,
+                                      "transactionPartitions":[],
+                                      "transactionLastUpdateTimestampMs":1000,
+                                      "transactionStartTimestampMs":750}}}
+                """
             ),
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 0, TXN_LOG_KEY).array(),
-                MessageUtil.toVersionPrefixedByteBuffer((short) 5, TXN_LOG_VALUE).array(),
-                "{\"key\":{\"type\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," +
-                    "\"value\":{\"version\":5,\"data\":\"unknown\"}}"
+                MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(),
+                """
+                    {"key":{"type":0,"data":{"transactionalId":"TXNID"}},
+                     "value":{"version":1,
+                              "data":{"producerId":100,
+                                      "producerEpoch":50,
+                                      "transactionTimeoutMs":500,
+                                      "transactionStatus":4,
+                                      "transactionPartitions":[],
+                                      "transactionLastUpdateTimestampMs":1000,
+                                      "transactionStartTimestampMs":750}}}
+                """
             ),
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(),
-                ""),
+                ""
+            ),
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 0, TXN_LOG_KEY).array(),
                 null,
-                "{\"key\":{\"type\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," +
-                    "\"value\":null}"),
+                """
+                    {"key":{"type":0,"data":{"transactionalId":"TXNID"}},"value":null}
+                """
+            ),
             Arguments.of(
                 null,
                 MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(),
-                ""),
-            Arguments.of(null, null, "")
+                ""
+            ),
+            Arguments.of(null, null, ""),
+            Arguments.of(
+                MessageUtil.toVersionPrefixedByteBuffer(Short.MAX_VALUE, TXN_LOG_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(),
+                ""
+            )
         );
     }
-
-    @ParameterizedTest
-    @MethodSource("parameters")
-    public void testTransactionLogMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) {
-        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
-            TOPIC, 0, 0,
-            0L, TimestampType.CREATE_TIME, 0,
-            0, keyBuffer, valueBuffer,
-            new RecordHeaders(), Optional.empty());
-
-        try (MessageFormatter formatter = new TransactionLogMessageFormatter()) {
-            formatter.configure(emptyMap());
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
-            formatter.writeTo(record, new PrintStream(out));
-            assertEquals(expectedOutput, out.toString());
-        }
-    }
 }