From b6adec48c5501d7c6ceebabecf1321dd6c7b8835 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 26 Mar 2025 11:12:38 +0100 Subject: [PATCH] KAFKA-18616; Refactor Tools's ApiMessageFormatter (#18695) This patch refactors the `ApiMessageFormatter` to follow what we have done in https://github.com/apache/kafka/pull/18688. Reviewers: Chia-Ping Tsai --- checkstyle/import-control.xml | 1 + ...=> CoordinatorRecordMessageFormatter.java} | 64 +++-- .../GroupMetadataMessageFormatter.java | 53 ++-- .../consumer/OffsetsMessageFormatter.java | 56 ++-- .../TransactionLogMessageFormatter.java | 52 ++-- .../ShareGroupStateMessageFormatter.java | 3 +- ...CoordinatorRecordMessageFormatterTest.java | 68 +++++ .../GroupMetadataMessageFormatterTest.java | 271 ++++++++++-------- .../consumer/OffsetMessageFormatterTest.java | 224 ++++++++------- .../TransactionLogMessageFormatterTest.java | 148 +++++----- 10 files changed, 512 insertions(+), 428 deletions(-) rename tools/src/main/java/org/apache/kafka/tools/consumer/{ApiMessageFormatter.java => CoordinatorRecordMessageFormatter.java} (52%) create mode 100644 tools/src/test/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatterTest.java diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 318c1721928..15ed859b817 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -338,6 +338,7 @@ + diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ApiMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatter.java similarity index 52% rename from tools/src/main/java/org/apache/kafka/tools/consumer/ApiMessageFormatter.java rename to tools/src/main/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatter.java index c3c529265e1..3838bf01aba 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/ApiMessageFormatter.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatter.java @@ -18,6 +18,9 @@ package org.apache.kafka.tools.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; @@ -31,44 +34,50 @@ import java.util.Objects; import static java.nio.charset.StandardCharsets.UTF_8; -public abstract class ApiMessageFormatter implements MessageFormatter { - +public abstract class CoordinatorRecordMessageFormatter implements MessageFormatter { private static final String TYPE = "type"; private static final String VERSION = "version"; private static final String DATA = "data"; private static final String KEY = "key"; private static final String VALUE = "value"; - static final String UNKNOWN = "unknown"; + + private final CoordinatorRecordSerde serde; + + public CoordinatorRecordMessageFormatter(CoordinatorRecordSerde serde) { + this.serde = serde; + } @Override public void writeTo(ConsumerRecord consumerRecord, PrintStream output) { + if (Objects.isNull(consumerRecord.key())) return; + ObjectNode json = new ObjectNode(JsonNodeFactory.instance); + try { + CoordinatorRecord record = serde.deserialize( + ByteBuffer.wrap(consumerRecord.key()), + consumerRecord.value() != null ? ByteBuffer.wrap(consumerRecord.value()) : null + ); - byte[] key = consumerRecord.key(); - if (Objects.nonNull(key)) { - short keyVersion = ByteBuffer.wrap(key).getShort(); - JsonNode dataNode = readToKeyJson(ByteBuffer.wrap(key)); + if (!isRecordTypeAllowed(record.key().apiKey())) return; - if (dataNode instanceof NullNode) { - return; + json + .putObject(KEY) + .put(TYPE, record.key().apiKey()) + .set(DATA, keyAsJson(record.key())); + + if (Objects.nonNull(record.value())) { + json + .putObject(VALUE) + .put(VERSION, record.value().version()) + .set(DATA, valueAsJson(record.value().message(), record.value().version())); + } else { + json.set(VALUE, NullNode.getInstance()); } - json.putObject(KEY) - .put(TYPE, keyVersion) - .set(DATA, dataNode); - } else { + } catch (CoordinatorRecordSerde.UnknownRecordTypeException ex) { return; - } - - byte[] value = consumerRecord.value(); - if (Objects.nonNull(value)) { - short valueVersion = ByteBuffer.wrap(value).getShort(); - JsonNode dataNode = readToValueJson(ByteBuffer.wrap(value)); - - json.putObject(VALUE) - .put(VERSION, valueVersion) - .set(DATA, dataNode); - } else { - json.set(VALUE, NullNode.getInstance()); + } catch (RuntimeException ex) { + throw new RuntimeException("Could not read record at offset " + consumerRecord.offset() + + " due to: " + ex.getMessage(), ex); } try { @@ -78,6 +87,7 @@ public abstract class ApiMessageFormatter implements MessageFormatter { } } - protected abstract JsonNode readToKeyJson(ByteBuffer byteBuffer); - protected abstract JsonNode readToValueJson(ByteBuffer byteBuffer); + protected abstract boolean isRecordTypeAllowed(short recordType); + protected abstract JsonNode keyAsJson(ApiMessage message); + protected abstract JsonNode valueAsJson(ApiMessage message, short version); } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java index e26f4d103d5..460486d6232 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java @@ -16,45 +16,36 @@ */ package org.apache.kafka.tools.consumer; -import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde; +import org.apache.kafka.coordinator.group.generated.CoordinatorRecordJsonConverters; import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; -import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; -import org.apache.kafka.coordinator.group.generated.GroupMetadataKeyJsonConverter; -import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.GroupMetadataValueJsonConverter; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.NullNode; -import com.fasterxml.jackson.databind.node.TextNode; -import java.nio.ByteBuffer; +import java.util.Set; -public class GroupMetadataMessageFormatter extends ApiMessageFormatter { - @Override - protected JsonNode readToKeyJson(ByteBuffer byteBuffer) { - try { - switch (CoordinatorRecordType.fromId(byteBuffer.getShort())) { - case GROUP_METADATA: - return GroupMetadataKeyJsonConverter.write( - new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), (short) 0), - (short) 0 - ); +public class GroupMetadataMessageFormatter extends CoordinatorRecordMessageFormatter { + private static final Set ALLOWED_RECORDS = Set.of( + CoordinatorRecordType.GROUP_METADATA.id() + ); - default: - return NullNode.getInstance(); - } - } catch (UnsupportedVersionException ex) { - return NullNode.getInstance(); - } + public GroupMetadataMessageFormatter() { + super(new GroupCoordinatorRecordSerde()); } @Override - protected JsonNode readToValueJson(ByteBuffer byteBuffer) { - short version = byteBuffer.getShort(); - if (version >= GroupMetadataValue.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataValue.HIGHEST_SUPPORTED_VERSION) { - return GroupMetadataValueJsonConverter.write(new GroupMetadataValue(new ByteBufferAccessor(byteBuffer), version), version); - } - return new TextNode(UNKNOWN); + protected boolean isRecordTypeAllowed(short recordType) { + return ALLOWED_RECORDS.contains(recordType); + } + + @Override + protected JsonNode keyAsJson(ApiMessage message) { + return CoordinatorRecordJsonConverters.writeRecordKeyAsJson(message); + } + + @Override + protected JsonNode valueAsJson(ApiMessage message, short version) { + return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version); } } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java index 3cc715b9348..f21612fb921 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java @@ -16,50 +16,40 @@ */ package org.apache.kafka.tools.consumer; -import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde; +import org.apache.kafka.coordinator.group.generated.CoordinatorRecordJsonConverters; import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; -import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; -import org.apache.kafka.coordinator.group.generated.OffsetCommitKeyJsonConverter; -import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; -import org.apache.kafka.coordinator.group.generated.OffsetCommitValueJsonConverter; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.NullNode; -import com.fasterxml.jackson.databind.node.TextNode; -import java.nio.ByteBuffer; +import java.util.Set; /** * Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false. */ -public class OffsetsMessageFormatter extends ApiMessageFormatter { - @Override - protected JsonNode readToKeyJson(ByteBuffer byteBuffer) { - try { - switch (CoordinatorRecordType.fromId(byteBuffer.getShort())) { - // We can read both record types with the offset commit one. - case LEGACY_OFFSET_COMMIT: - case OFFSET_COMMIT: - return OffsetCommitKeyJsonConverter.write( - new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), (short) 0), - (short) 0 - ); +public class OffsetsMessageFormatter extends CoordinatorRecordMessageFormatter { + private static final Set ALLOWED_RECORDS = Set.of( + CoordinatorRecordType.LEGACY_OFFSET_COMMIT.id(), + CoordinatorRecordType.OFFSET_COMMIT.id() + ); - default: - return NullNode.getInstance(); - } - } catch (UnsupportedVersionException ex) { - return NullNode.getInstance(); - } + public OffsetsMessageFormatter() { + super(new GroupCoordinatorRecordSerde()); } @Override - protected JsonNode readToValueJson(ByteBuffer byteBuffer) { - short version = byteBuffer.getShort(); - if (version >= OffsetCommitValue.LOWEST_SUPPORTED_VERSION && version <= OffsetCommitValue.HIGHEST_SUPPORTED_VERSION) { - return OffsetCommitValueJsonConverter.write(new OffsetCommitValue(new ByteBufferAccessor(byteBuffer), version), version); - } - return new TextNode(UNKNOWN); + protected boolean isRecordTypeAllowed(short recordType) { + return ALLOWED_RECORDS.contains(recordType); + } + + @Override + protected JsonNode keyAsJson(ApiMessage message) { + return CoordinatorRecordJsonConverters.writeRecordKeyAsJson(message); + } + + @Override + protected JsonNode valueAsJson(ApiMessage message, short version) { + return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version); } } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java index 574ce0ce63e..cf9c540da3e 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java @@ -16,45 +16,29 @@ */ package org.apache.kafka.tools.consumer; -import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.apache.kafka.common.protocol.ByteBufferAccessor; -import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordType; -import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey; -import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter; -import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue; -import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde; +import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordJsonConverters; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.NullNode; -import com.fasterxml.jackson.databind.node.TextNode; -import java.nio.ByteBuffer; - -public class TransactionLogMessageFormatter extends ApiMessageFormatter { - @Override - protected JsonNode readToKeyJson(ByteBuffer byteBuffer) { - try { - switch (CoordinatorRecordType.fromId(byteBuffer.getShort())) { - case TRANSACTION_LOG: - return TransactionLogKeyJsonConverter.write( - new TransactionLogKey(new ByteBufferAccessor(byteBuffer), (short) 0), - (short) 0 - ); - - default: - return NullNode.getInstance(); - } - } catch (UnsupportedVersionException ex) { - return NullNode.getInstance(); - } +public class TransactionLogMessageFormatter extends CoordinatorRecordMessageFormatter { + public TransactionLogMessageFormatter() { + super(new TransactionCoordinatorRecordSerde()); } @Override - protected JsonNode readToValueJson(ByteBuffer byteBuffer) { - short version = byteBuffer.getShort(); - if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) { - return TransactionLogValueJsonConverter.write(new TransactionLogValue(new ByteBufferAccessor(byteBuffer), version), version); - } - return new TextNode(UNKNOWN); + protected boolean isRecordTypeAllowed(short recordType) { + return true; + } + + @Override + protected JsonNode keyAsJson(ApiMessage message) { + return CoordinatorRecordJsonConverters.writeRecordKeyAsJson(message); + } + + @Override + protected JsonNode valueAsJson(ApiMessage message, short version) { + return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version); } } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/share/ShareGroupStateMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/share/ShareGroupStateMessageFormatter.java index c5358b62d1b..d695e17d044 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/share/ShareGroupStateMessageFormatter.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/share/ShareGroupStateMessageFormatter.java @@ -30,6 +30,7 @@ import org.apache.kafka.coordinator.share.generated.ShareUpdateKey; import org.apache.kafka.coordinator.share.generated.ShareUpdateKeyJsonConverter; import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; import org.apache.kafka.coordinator.share.generated.ShareUpdateValueJsonConverter; +import org.apache.kafka.tools.consumer.CoordinatorRecordMessageFormatter; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; @@ -132,7 +133,7 @@ public class ShareGroupStateMessageFormatter implements MessageFormatter { * as per RPC spec. * To differentiate, we need to use the corresponding key versions. This is acceptable as * the records will always appear in pairs (key, value). However, this means that we cannot - * extend {@link org.apache.kafka.tools.consumer.ApiMessageFormatter} as it requires overriding + * extend {@link CoordinatorRecordMessageFormatter} as it requires overriding * readToValueJson whose signature does not allow for passing keyversion. * * @param byteBuffer - Represents the raw data read from the topic diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatterTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatterTest.java new file mode 100644 index 00000000000..58cb3f7055e --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatterTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; + +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.Optional; +import java.util.stream.Stream; + +import static java.util.Collections.emptyMap; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class CoordinatorRecordMessageFormatterTest { + private static final String TOPIC = "TOPIC"; + + protected abstract CoordinatorRecordMessageFormatter formatter(); + protected abstract Stream parameters(); + + @ParameterizedTest + @MethodSource("parameters") + public void testMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) { + ConsumerRecord record = new ConsumerRecord<>( + TOPIC, + 0, + 0, + 0L, + TimestampType.CREATE_TIME, + 0, + 0, + keyBuffer, + valueBuffer, + new RecordHeaders(), + Optional.empty() + ); + + try (MessageFormatter formatter = formatter()) { + formatter.configure(emptyMap()); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + formatter.writeTo(record, new PrintStream(out)); + assertEquals(expectedOutput.replaceAll("\\s+", ""), out.toString()); + } + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatterTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatterTest.java index 62ae43bd09c..0e158bc016b 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatterTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatterTest.java @@ -16,144 +16,179 @@ */ package org.apache.kafka.tools.consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.MessageFormatter; -import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.protocol.MessageUtil; -import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; -import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.util.Optional; import java.util.stream.Stream; -import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; -import static org.junit.jupiter.api.Assertions.assertEquals; -public class GroupMetadataMessageFormatterTest { +public class GroupMetadataMessageFormatterTest extends CoordinatorRecordMessageFormatterTest { private static final OffsetCommitKey OFFSET_COMMIT_KEY = new OffsetCommitKey() - .setGroup("group-id") - .setTopic("foo") - .setPartition(1); + .setGroup("group-id") + .setTopic("foo") + .setPartition(1); private static final OffsetCommitValue OFFSET_COMMIT_VALUE = new OffsetCommitValue() - .setOffset(100L) - .setLeaderEpoch(10) - .setMetadata("metadata") - .setCommitTimestamp(1234L) - .setExpireTimestamp(-1L); + .setOffset(100L) + .setLeaderEpoch(10) + .setMetadata("metadata") + .setCommitTimestamp(1234L) + .setExpireTimestamp(-1L); private static final GroupMetadataValue.MemberMetadata MEMBER_METADATA = new GroupMetadataValue.MemberMetadata() - .setMemberId("member-1") - .setClientId("client-1") - .setClientHost("host-1") - .setRebalanceTimeout(1000) - .setSessionTimeout(1500) - .setGroupInstanceId("group-instance-1") - .setSubscription(new byte[]{0, 1}) - .setAssignment(new byte[]{1, 2}); + .setMemberId("member-1") + .setClientId("client-1") + .setClientHost("host-1") + .setRebalanceTimeout(1000) + .setSessionTimeout(1500) + .setGroupInstanceId("group-instance-1") + .setSubscription(new byte[]{0, 1}) + .setAssignment(new byte[]{1, 2}); private static final GroupMetadataKey GROUP_METADATA_KEY = new GroupMetadataKey() - .setGroup("group-id"); + .setGroup("group-id"); private static final GroupMetadataValue GROUP_METADATA_VALUE = new GroupMetadataValue() - .setProtocolType("consumer") - .setGeneration(1) - .setProtocol("range") - .setLeader("leader") - .setMembers(singletonList(MEMBER_METADATA)) - .setCurrentStateTimestamp(1234L); - private static final String TOPIC = "TOPIC"; + .setProtocolType("consumer") + .setGeneration(1) + .setProtocol("range") + .setLeader("leader") + .setMembers(singletonList(MEMBER_METADATA)) + .setCurrentStateTimestamp(1234L); - private static Stream parameters() { + @Override + protected CoordinatorRecordMessageFormatter formatter() { + return new GroupMetadataMessageFormatter(); + } + + @Override + protected Stream parameters() { return Stream.of( - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_VALUE).array(), - "" - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 0, GROUP_METADATA_VALUE).array(), - "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":0," + - "\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," + - "\"leader\":\"leader\",\"members\":[{\"memberId\":\"member-1\",\"clientId\":\"client-1\"," + - "\"clientHost\":\"host-1\",\"sessionTimeout\":1500,\"subscription\":\"AAE=\"," + - "\"assignment\":\"AQI=\"}]}}}" - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 1, GROUP_METADATA_VALUE).array(), - "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":1," + - "\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," + - "\"leader\":\"leader\",\"members\":[{\"memberId\":\"member-1\",\"clientId\":\"client-1\"," + - "\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000,\"sessionTimeout\":1500," + - "\"subscription\":\"AAE=\",\"assignment\":\"AQI=\"}]}}}" - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_VALUE).array(), - "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":2," + - "\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," + - "\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," + - "\"clientId\":\"client-1\",\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000," + - "\"sessionTimeout\":1500,\"subscription\":\"AAE=\",\"assignment\":\"AQI=\"}]}}}" - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 3, GROUP_METADATA_VALUE).array(), - "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":3," + - "\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," + - "\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," + - "\"groupInstanceId\":\"group-instance-1\",\"clientId\":\"client-1\",\"clientHost\":\"host-1\"," + - "\"rebalanceTimeout\":1000,\"sessionTimeout\":1500,\"subscription\":\"AAE=\",\"assignment\":\"AQI=\"}]}}}" - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 4, GROUP_METADATA_VALUE).array(), - "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":4," + - "\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," + - "\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," + - "\"groupInstanceId\":\"group-instance-1\",\"clientId\":\"client-1\",\"clientHost\":\"host-1\"," + - "\"rebalanceTimeout\":1000,\"sessionTimeout\":1500,\"subscription\":\"AAE=\",\"assignment\":\"AQI=\"}]}}}" - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), - null, - "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":null}"), - Arguments.of( - null, - MessageUtil.toVersionPrefixedByteBuffer((short) 4, GROUP_METADATA_VALUE).array(), - ""), - Arguments.of(null, null, ""), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(), - "" - ) + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 0, GROUP_METADATA_VALUE).array(), + """ + {"key":{"type":2,"data":{"group":"group-id"}}, + "value":{"version":0, + "data":{"protocolType":"consumer", + "generation":1, + "protocol":"range", + "leader":"leader", + "members":[{"memberId":"member-1", + "clientId":"client-1", + "clientHost":"host-1", + "sessionTimeout":1500, + "subscription":"AAE=", + "assignment":"AQI="}]}}} + """ + ), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 1, GROUP_METADATA_VALUE).array(), + """ + {"key":{"type":2,"data":{"group":"group-id"}}, + "value":{"version":1, + "data":{"protocolType":"consumer", + "generation":1, + "protocol":"range", + "leader":"leader", + "members":[{"memberId":"member-1", + "clientId":"client-1", + "clientHost":"host-1", + "rebalanceTimeout":1000, + "sessionTimeout":1500, + "subscription":"AAE=", + "assignment":"AQI="}]}}} + """ + ), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_VALUE).array(), + """ + {"key":{"type":2,"data":{"group":"group-id"}}, + "value":{"version":2, + "data":{"protocolType":"consumer", + "generation":1, + "protocol":"range", + "leader":"leader", + "currentStateTimestamp":1234, + "members":[{"memberId":"member-1", + "clientId":"client-1", + "clientHost":"host-1", + "rebalanceTimeout":1000, + "sessionTimeout":1500, + "subscription":"AAE=", + "assignment":"AQI="}]}}} + """ + ), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 3, GROUP_METADATA_VALUE).array(), + """ + {"key":{"type":2,"data":{"group":"group-id"}}, + "value":{"version":3, + "data":{"protocolType":"consumer", + "generation":1, + "protocol":"range", + "leader":"leader", + "currentStateTimestamp":1234, + "members":[{"memberId":"member-1", + "groupInstanceId":"group-instance-1", + "clientId":"client-1", + "clientHost":"host-1", + "rebalanceTimeout":1000, + "sessionTimeout":1500, + "subscription":"AAE=", + "assignment":"AQI="}]}}} + """ + ), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 4, GROUP_METADATA_VALUE).array(), + """ + {"key":{"type":2,"data":{"group":"group-id"}}, + "value":{"version":4, + "data":{"protocolType":"consumer", + "generation":1, + "protocol":"range", + "leader":"leader", + "currentStateTimestamp":1234, + "members":[{"memberId":"member-1", + "groupInstanceId":"group-instance-1", + "clientId":"client-1", + "clientHost":"host-1", + "rebalanceTimeout":1000, + "sessionTimeout":1500, + "subscription":"AAE=", + "assignment":"AQI="}]}}} + """ + ), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), + null, + """ + {"key":{"type":2,"data":{"group":"group-id"}},"value":null} + """ + ), + Arguments.of( + null, + MessageUtil.toVersionPrefixedByteBuffer((short) 4, GROUP_METADATA_VALUE).array(), + "" + ), + Arguments.of(null, null, ""), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_VALUE).array(), + "" + ), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer(Short.MAX_VALUE, OFFSET_COMMIT_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_VALUE).array(), + "" + ) ); } - - @ParameterizedTest - @MethodSource("parameters") - public void testTransactionLogMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) { - ConsumerRecord record = new ConsumerRecord<>( - TOPIC, 0, 0, - 0L, TimestampType.CREATE_TIME, 0, - 0, keyBuffer, valueBuffer, - new RecordHeaders(), Optional.empty()); - - try (MessageFormatter formatter = new GroupMetadataMessageFormatter()) { - formatter.configure(emptyMap()); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - formatter.writeTo(record, new PrintStream(out)); - assertEquals(expectedOutput, out.toString()); - } - } } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java index 684c681a175..331b252dcc4 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java @@ -16,135 +16,137 @@ */ package org.apache.kafka.tools.consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.MessageFormatter; -import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.protocol.MessageUtil; -import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; -import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; import java.util.Collections; -import java.util.Optional; import java.util.stream.Stream; -import static java.util.Collections.emptyMap; -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class OffsetMessageFormatterTest { +public class OffsetMessageFormatterTest extends CoordinatorRecordMessageFormatterTest { private static final OffsetCommitKey OFFSET_COMMIT_KEY = new OffsetCommitKey() - .setGroup("group-id") - .setTopic("foo") - .setPartition(1); + .setGroup("group-id") + .setTopic("foo") + .setPartition(1); private static final OffsetCommitValue OFFSET_COMMIT_VALUE = new OffsetCommitValue() - .setOffset(100L) - .setLeaderEpoch(10) - .setMetadata("metadata") - .setCommitTimestamp(1234L) - .setExpireTimestamp(-1L); + .setOffset(100L) + .setLeaderEpoch(10) + .setMetadata("metadata") + .setCommitTimestamp(1234L) + .setExpireTimestamp(5678L); private static final GroupMetadataKey GROUP_METADATA_KEY = new GroupMetadataKey().setGroup("group-id"); private static final GroupMetadataValue GROUP_METADATA_VALUE = new GroupMetadataValue() - .setProtocolType("consumer") - .setGeneration(1) - .setProtocol("range") - .setLeader("leader") - .setMembers(Collections.emptyList()); - private static final String TOPIC = "TOPIC"; + .setProtocolType("consumer") + .setGeneration(1) + .setProtocol("range") + .setLeader("leader") + .setMembers(Collections.emptyList()); - private static Stream parameters() { + @Override + protected CoordinatorRecordMessageFormatter formatter() { + return new OffsetsMessageFormatter(); + } + + @Override + protected Stream parameters() { return Stream.of( - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 10, OFFSET_COMMIT_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 10, OFFSET_COMMIT_VALUE).array(), - "" - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_VALUE).array(), - "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," + - "\"value\":{\"version\":0,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," + - "\"commitTimestamp\":1234}}}" - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_VALUE).array(), - "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," + - "\"value\":{\"version\":1,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," + - "\"commitTimestamp\":1234,\"expireTimestamp\":-1}}}" - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 2, OFFSET_COMMIT_VALUE).array(), - "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," + - "\"value\":{\"version\":2,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," + - "\"commitTimestamp\":1234}}}" - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 3, OFFSET_COMMIT_VALUE).array(), - "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," + - "\"value\":{\"version\":3,\"data\":{\"offset\":100,\"leaderEpoch\":10," + - "\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}" - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(), - "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," + - "\"value\":{\"version\":4,\"data\":{\"offset\":100,\"leaderEpoch\":10," + - "\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}" - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 5, OFFSET_COMMIT_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(), - "" - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 5, OFFSET_COMMIT_VALUE).array(), - "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," + - "\"value\":{\"version\":5,\"data\":\"unknown\"}}" - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), - null, - "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," + - "\"value\":null}"), - Arguments.of( - null, - MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_VALUE).array(), - ""), - Arguments.of(null, null, ""), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_VALUE).array(), - "" - ) + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_VALUE).array(), + """ + {"key":{"type":0,"data":{"group":"group-id","topic":"foo","partition":1}}, + "value":{"version":0, + "data":{"offset":100, + "metadata":"metadata", + "commitTimestamp":1234}}} + """ + ), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_VALUE).array(), + """ + {"key":{"type":1,"data":{"group":"group-id","topic":"foo","partition":1}}, + "value":{"version":0, + "data":{"offset":100, + "metadata":"metadata", + "commitTimestamp":1234}}} + """ + ), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_VALUE).array(), + """ + {"key":{"type":1,"data":{"group":"group-id","topic":"foo","partition":1}}, + "value":{"version":1, + "data":{"offset":100, + "metadata":"metadata", + "commitTimestamp":1234, + "expireTimestamp":5678}}} + """ + ), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 2, OFFSET_COMMIT_VALUE).array(), + """ + {"key":{"type":1,"data":{"group":"group-id","topic":"foo","partition":1}}, + "value":{"version":2, + "data":{"offset":100, + "metadata":"metadata", + "commitTimestamp":1234}}} + """ + ), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 3, OFFSET_COMMIT_VALUE).array(), + """ + {"key":{"type":1,"data":{"group":"group-id","topic":"foo","partition":1}}, + "value":{"version":3, + "data":{"offset":100, + "leaderEpoch":10, + "metadata":"metadata", + "commitTimestamp":1234}}} + """ + ), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(), + """ + {"key":{"type":1,"data":{"group":"group-id","topic":"foo","partition":1}}, + "value":{"version":4, + "data":{"offset":100, + "leaderEpoch":10, + "metadata":"metadata", + "commitTimestamp":1234}}} + """ + ), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(), + null, + """ + {"key":{"type":1,"data":{"group":"group-id","topic":"foo","partition":1}},"value":null} + """ + ), + Arguments.of( + null, + MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_VALUE).array(), + "" + ), + Arguments.of(null, null, ""), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_VALUE).array(), + "" + ), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer(Short.MAX_VALUE, GROUP_METADATA_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_VALUE).array(), + "" + ) ); } - - @ParameterizedTest - @MethodSource("parameters") - public void testTransactionLogMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) { - ConsumerRecord record = new ConsumerRecord<>( - TOPIC, 0, 0, - 0L, TimestampType.CREATE_TIME, 0, - 0, keyBuffer, valueBuffer, - new RecordHeaders(), Optional.empty()); - - try (MessageFormatter formatter = new OffsetsMessageFormatter()) { - formatter.configure(emptyMap()); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - formatter.writeTo(record, new PrintStream(out)); - assertEquals(expectedOutput, out.toString()); - } - } } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java index dc7946d5fa0..e2b4ed8aead 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java @@ -16,93 +16,95 @@ */ package org.apache.kafka.tools.consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.MessageFormatter; -import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.protocol.MessageUtil; -import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey; import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue; -import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.util.Optional; import java.util.stream.Stream; import static java.util.Collections.emptyList; -import static java.util.Collections.emptyMap; -import static org.junit.jupiter.api.Assertions.assertEquals; -public class TransactionLogMessageFormatterTest { +public class TransactionLogMessageFormatterTest extends CoordinatorRecordMessageFormatterTest { private static final TransactionLogKey TXN_LOG_KEY = new TransactionLogKey() - .setTransactionalId("TXNID"); + .setTransactionalId("TXNID"); private static final TransactionLogValue TXN_LOG_VALUE = new TransactionLogValue() - .setProducerId(100) - .setProducerEpoch((short) 50) - .setTransactionStatus((byte) 4) - .setTransactionStartTimestampMs(750L) - .setTransactionLastUpdateTimestampMs(1000L) - .setTransactionTimeoutMs(500) - .setTransactionPartitions(emptyList()); - private static final String TOPIC = "TOPIC"; - - private static Stream parameters() { + .setProducerId(100) + .setProducerEpoch((short) 50) + .setTransactionStatus((byte) 4) + .setTransactionStartTimestampMs(750L) + .setTransactionLastUpdateTimestampMs(1000L) + .setTransactionTimeoutMs(500) + .setTransactionPartitions(emptyList()); + + @Override + protected CoordinatorRecordMessageFormatter formatter() { + return new TransactionLogMessageFormatter(); + } + + @Override + protected Stream parameters() { return Stream.of( - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 10, TXN_LOG_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 10, TXN_LOG_VALUE).array(), - "" - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 0, TXN_LOG_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(), - "{\"key\":{\"type\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," + - "\"value\":{\"version\":1,\"data\":{\"producerId\":100,\"producerEpoch\":50," + - "\"transactionTimeoutMs\":500,\"transactionStatus\":4,\"transactionPartitions\":[]," + - "\"transactionLastUpdateTimestampMs\":1000,\"transactionStartTimestampMs\":750}}}" - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 0, TXN_LOG_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 5, TXN_LOG_VALUE).array(), - "{\"key\":{\"type\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," + - "\"value\":{\"version\":5,\"data\":\"unknown\"}}" - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(), - ""), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 0, TXN_LOG_KEY).array(), - null, - "{\"key\":{\"type\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," + - "\"value\":null}"), - Arguments.of( - null, - MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(), - ""), - Arguments.of(null, null, "") + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 10, TXN_LOG_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 10, TXN_LOG_VALUE).array(), + "" + ), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 0, TXN_LOG_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(), + """ + {"key":{"type":0,"data":{"transactionalId":"TXNID"}}, + "value":{"version":1, + "data":{"producerId":100, + "producerEpoch":50, + "transactionTimeoutMs":500, + "transactionStatus":4, + "transactionPartitions":[], + "transactionLastUpdateTimestampMs":1000, + "transactionStartTimestampMs":750}}} + """ + ), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 0, TXN_LOG_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(), + """ + {"key":{"type":0,"data":{"transactionalId":"TXNID"}}, + "value":{"version":1, + "data":{"producerId":100, + "producerEpoch":50, + "transactionTimeoutMs":500, + "transactionStatus":4, + "transactionPartitions":[], + "transactionLastUpdateTimestampMs":1000, + "transactionStartTimestampMs":750}}} + """ + ), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(), + "" + ), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer((short) 0, TXN_LOG_KEY).array(), + null, + """ + {"key":{"type":0,"data":{"transactionalId":"TXNID"}},"value":null} + """ + ), + Arguments.of( + null, + MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(), + "" + ), + Arguments.of(null, null, ""), + Arguments.of( + MessageUtil.toVersionPrefixedByteBuffer(Short.MAX_VALUE, TXN_LOG_KEY).array(), + MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(), + "" + ) ); } - - @ParameterizedTest - @MethodSource("parameters") - public void testTransactionLogMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) { - ConsumerRecord record = new ConsumerRecord<>( - TOPIC, 0, 0, - 0L, TimestampType.CREATE_TIME, 0, - 0, keyBuffer, valueBuffer, - new RecordHeaders(), Optional.empty()); - - try (MessageFormatter formatter = new TransactionLogMessageFormatter()) { - formatter.configure(emptyMap()); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - formatter.writeTo(record, new PrintStream(out)); - assertEquals(expectedOutput, out.toString()); - } - } }