MINOR: Fix MessageFormatters (#18266)

While looking at the message formatters in https://github.com/apache/kafka/pull/18261, I have noticed at few incorrect test cases.
* We should not log anything when the record type is unknown because the formatters have clear goals.
* We should not parse the value when the key is null or when the key cannot be parsed. While it works in the tests, in practice, this is wrong because we cannot assume that type of the value if the type of the key is not defined. The key drives the type of the entire record.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
David Jacot 2024-12-19 16:12:50 +01:00 committed by GitHub
parent 64279d2e82
commit b31aa65115
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 65 additions and 159 deletions

View File

@ -33,6 +33,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
public abstract class ApiMessageFormatter implements MessageFormatter { public abstract class ApiMessageFormatter implements MessageFormatter {
private static final String TYPE = "type";
private static final String VERSION = "version"; private static final String VERSION = "version";
private static final String DATA = "data"; private static final String DATA = "data";
private static final String KEY = "key"; private static final String KEY = "key";
@ -46,22 +47,22 @@ public abstract class ApiMessageFormatter implements MessageFormatter {
byte[] key = consumerRecord.key(); byte[] key = consumerRecord.key();
if (Objects.nonNull(key)) { if (Objects.nonNull(key)) {
short keyVersion = ByteBuffer.wrap(key).getShort(); short keyVersion = ByteBuffer.wrap(key).getShort();
JsonNode dataNode = readToKeyJson(ByteBuffer.wrap(key), keyVersion); JsonNode dataNode = readToKeyJson(ByteBuffer.wrap(key));
if (dataNode instanceof NullNode) { if (dataNode instanceof NullNode) {
return; return;
} }
json.putObject(KEY) json.putObject(KEY)
.put(VERSION, keyVersion) .put(TYPE, keyVersion)
.set(DATA, dataNode); .set(DATA, dataNode);
} else { } else {
json.set(KEY, NullNode.getInstance()); return;
} }
byte[] value = consumerRecord.value(); byte[] value = consumerRecord.value();
if (Objects.nonNull(value)) { if (Objects.nonNull(value)) {
short valueVersion = ByteBuffer.wrap(value).getShort(); short valueVersion = ByteBuffer.wrap(value).getShort();
JsonNode dataNode = readToValueJson(ByteBuffer.wrap(value), valueVersion); JsonNode dataNode = readToValueJson(ByteBuffer.wrap(value));
json.putObject(VALUE) json.putObject(VALUE)
.put(VERSION, valueVersion) .put(VERSION, valueVersion)
@ -77,6 +78,6 @@ public abstract class ApiMessageFormatter implements MessageFormatter {
} }
} }
protected abstract JsonNode readToKeyJson(ByteBuffer byteBuffer, short version); protected abstract JsonNode readToKeyJson(ByteBuffer byteBuffer);
protected abstract JsonNode readToValueJson(ByteBuffer byteBuffer, short version); protected abstract JsonNode readToValueJson(ByteBuffer byteBuffer);
} }

View File

@ -16,66 +16,34 @@
*/ */
package org.apache.kafka.tools.consumer; package org.apache.kafka.tools.consumer;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKeyJsonConverter; import org.apache.kafka.coordinator.group.generated.GroupMetadataKeyJsonConverter;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValueJsonConverter; import org.apache.kafka.coordinator.group.generated.GroupMetadataValueJsonConverter;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode; import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.TextNode; import com.fasterxml.jackson.databind.node.TextNode;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Optional;
public class GroupMetadataMessageFormatter extends ApiMessageFormatter { public class GroupMetadataMessageFormatter extends ApiMessageFormatter {
@Override @Override
protected JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) { protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
return readToGroupMetadataKey(byteBuffer) short version = byteBuffer.getShort();
.map(logKey -> transferKeyMessageToJsonNode(logKey, version)) if (version >= GroupMetadataKey.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataKey.HIGHEST_SUPPORTED_VERSION) {
.orElseGet(() -> new TextNode(UNKNOWN)); return GroupMetadataKeyJsonConverter.write(new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), version), version);
}
return NullNode.getInstance();
} }
@Override @Override
protected JsonNode readToValueJson(ByteBuffer byteBuffer, short version) { protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
return readToGroupMetadataValue(byteBuffer)
.map(logValue -> GroupMetadataValueJsonConverter.write(logValue, version))
.orElseGet(() -> new TextNode(UNKNOWN));
}
private Optional<ApiMessage> readToGroupMetadataKey(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort(); short version = byteBuffer.getShort();
if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION if (version >= GroupMetadataValue.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataValue.HIGHEST_SUPPORTED_VERSION) {
&& version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) { return GroupMetadataValueJsonConverter.write(new GroupMetadataValue(new ByteBufferAccessor(byteBuffer), version), version);
return Optional.of(new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), version));
} else if (version >= GroupMetadataKey.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataKey.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), version));
} else {
return Optional.empty();
}
}
private JsonNode transferKeyMessageToJsonNode(ApiMessage message, short version) {
if (message instanceof OffsetCommitKey) {
return NullNode.getInstance();
} else if (message instanceof GroupMetadataKey) {
return GroupMetadataKeyJsonConverter.write((GroupMetadataKey) message, version);
} else {
return new TextNode(UNKNOWN);
}
}
private Optional<GroupMetadataValue> readToGroupMetadataValue(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= GroupMetadataValue.LOWEST_SUPPORTED_VERSION
&& version <= GroupMetadataValue.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new GroupMetadataValue(new ByteBufferAccessor(byteBuffer), version));
} else {
return Optional.empty();
} }
return new TextNode(UNKNOWN);
} }
} }

View File

@ -16,9 +16,7 @@
*/ */
package org.apache.kafka.tools.consumer; package org.apache.kafka.tools.consumer;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKeyJsonConverter; import org.apache.kafka.coordinator.group.generated.OffsetCommitKeyJsonConverter;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
@ -29,56 +27,26 @@ import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.TextNode; import com.fasterxml.jackson.databind.node.TextNode;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Optional;
/** /**
* Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false. * Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
*/ */
public class OffsetsMessageFormatter extends ApiMessageFormatter { public class OffsetsMessageFormatter extends ApiMessageFormatter {
@Override @Override
protected JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) { protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
return readToGroupMetadataKey(byteBuffer) short version = byteBuffer.getShort();
.map(logKey -> transferKeyMessageToJsonNode(logKey, version)) if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION && version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) {
.orElseGet(() -> new TextNode(UNKNOWN)); return OffsetCommitKeyJsonConverter.write(new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), version), version);
}
return NullNode.getInstance();
} }
@Override @Override
protected JsonNode readToValueJson(ByteBuffer byteBuffer, short version) { protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
return readToOffsetMessageValue(byteBuffer)
.map(logValue -> OffsetCommitValueJsonConverter.write(logValue, version))
.orElseGet(() -> new TextNode(UNKNOWN));
}
private Optional<ApiMessage> readToGroupMetadataKey(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort(); short version = byteBuffer.getShort();
if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION if (version >= OffsetCommitValue.LOWEST_SUPPORTED_VERSION && version <= OffsetCommitValue.HIGHEST_SUPPORTED_VERSION) {
&& version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) { return OffsetCommitValueJsonConverter.write(new OffsetCommitValue(new ByteBufferAccessor(byteBuffer), version), version);
return Optional.of(new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), version));
} else if (version >= GroupMetadataKey.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataKey.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), version));
} else {
return Optional.empty();
}
}
private JsonNode transferKeyMessageToJsonNode(ApiMessage logKey, short keyVersion) {
if (logKey instanceof OffsetCommitKey) {
return OffsetCommitKeyJsonConverter.write((OffsetCommitKey) logKey, keyVersion);
} else if (logKey instanceof GroupMetadataKey) {
return NullNode.getInstance();
} else {
return new TextNode(UNKNOWN);
}
}
private Optional<OffsetCommitValue> readToOffsetMessageValue(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= OffsetCommitValue.LOWEST_SUPPORTED_VERSION
&& version <= OffsetCommitValue.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new OffsetCommitValue(new ByteBufferAccessor(byteBuffer), version));
} else {
return Optional.empty();
} }
return new TextNode(UNKNOWN);
} }
} }

View File

@ -23,44 +23,27 @@ import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter; import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.TextNode; import com.fasterxml.jackson.databind.node.TextNode;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Optional;
public class TransactionLogMessageFormatter extends ApiMessageFormatter { public class TransactionLogMessageFormatter extends ApiMessageFormatter {
@Override @Override
protected JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) { protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
return readToTransactionLogKey(byteBuffer) short version = byteBuffer.getShort();
.map(logKey -> TransactionLogKeyJsonConverter.write(logKey, version)) if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION && version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
.orElseGet(() -> new TextNode(UNKNOWN)); return TransactionLogKeyJsonConverter.write(new TransactionLogKey(new ByteBufferAccessor(byteBuffer), version), version);
}
return NullNode.getInstance();
} }
@Override @Override
protected JsonNode readToValueJson(ByteBuffer byteBuffer, short version) { protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
return readToTransactionLogValue(byteBuffer)
.map(logValue -> TransactionLogValueJsonConverter.write(logValue, version))
.orElseGet(() -> new TextNode(UNKNOWN));
}
private Optional<TransactionLogKey> readToTransactionLogKey(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort(); short version = byteBuffer.getShort();
if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
&& version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) { return TransactionLogValueJsonConverter.write(new TransactionLogValue(new ByteBufferAccessor(byteBuffer), version), version);
return Optional.of(new TransactionLogKey(new ByteBufferAccessor(byteBuffer), version));
} else {
return Optional.empty();
}
}
private Optional<TransactionLogValue> readToTransactionLogValue(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION
&& version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new TransactionLogValue(new ByteBufferAccessor(byteBuffer), version));
} else {
return Optional.empty();
} }
return new TextNode(UNKNOWN);
} }
} }

View File

@ -76,12 +76,12 @@ public class GroupMetadataMessageFormatterTest {
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_VALUE).array(),
"{\"key\":{\"version\":10,\"data\":\"unknown\"},\"value\":{\"version\":10,\"data\":\"unknown\"}}" ""
), ),
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 0, GROUP_METADATA_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 0, GROUP_METADATA_VALUE).array(),
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":0," + "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":0," +
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," + "\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
"\"leader\":\"leader\",\"members\":[{\"memberId\":\"member-1\",\"clientId\":\"client-1\"," + "\"leader\":\"leader\",\"members\":[{\"memberId\":\"member-1\",\"clientId\":\"client-1\"," +
"\"clientHost\":\"host-1\",\"sessionTimeout\":1500,\"subscription\":\"AAE=\"," + "\"clientHost\":\"host-1\",\"sessionTimeout\":1500,\"subscription\":\"AAE=\"," +
@ -90,7 +90,7 @@ public class GroupMetadataMessageFormatterTest {
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 1, GROUP_METADATA_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 1, GROUP_METADATA_VALUE).array(),
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":1," + "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":1," +
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," + "\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
"\"leader\":\"leader\",\"members\":[{\"memberId\":\"member-1\",\"clientId\":\"client-1\"," + "\"leader\":\"leader\",\"members\":[{\"memberId\":\"member-1\",\"clientId\":\"client-1\"," +
"\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000,\"sessionTimeout\":1500," + "\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000,\"sessionTimeout\":1500," +
@ -99,7 +99,7 @@ public class GroupMetadataMessageFormatterTest {
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_VALUE).array(),
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":2," + "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":2," +
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," + "\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
"\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," + "\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," +
"\"clientId\":\"client-1\",\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000," + "\"clientId\":\"client-1\",\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000," +
@ -108,7 +108,7 @@ public class GroupMetadataMessageFormatterTest {
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 3, GROUP_METADATA_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 3, GROUP_METADATA_VALUE).array(),
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":3," + "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":3," +
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," + "\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
"\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," + "\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," +
"\"groupInstanceId\":\"group-instance-1\",\"clientId\":\"client-1\",\"clientHost\":\"host-1\"," + "\"groupInstanceId\":\"group-instance-1\",\"clientId\":\"client-1\",\"clientHost\":\"host-1\"," +
@ -117,7 +117,7 @@ public class GroupMetadataMessageFormatterTest {
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 4, GROUP_METADATA_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 4, GROUP_METADATA_VALUE).array(),
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":4," + "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":4," +
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," + "\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
"\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," + "\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," +
"\"groupInstanceId\":\"group-instance-1\",\"clientId\":\"client-1\",\"clientHost\":\"host-1\"," + "\"groupInstanceId\":\"group-instance-1\",\"clientId\":\"client-1\",\"clientHost\":\"host-1\"," +
@ -126,16 +126,12 @@ public class GroupMetadataMessageFormatterTest {
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
null, null,
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":null}"), "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":null}"),
Arguments.of( Arguments.of(
null, null,
MessageUtil.toVersionPrefixedByteBuffer((short) 4, GROUP_METADATA_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 4, GROUP_METADATA_VALUE).array(),
"{\"key\":null,\"value\":{\"version\":4,\"data\":{\"protocolType\":\"consumer\",\"generation\":1," + ""),
"\"protocol\":\"range\",\"leader\":\"leader\",\"currentStateTimestamp\":1234," + Arguments.of(null, null, ""),
"\"members\":[{\"memberId\":\"member-1\",\"groupInstanceId\":\"group-instance-1\"," +
"\"clientId\":\"client-1\",\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000," +
"\"sessionTimeout\":1500,\"subscription\":\"AAE=\",\"assignment\":\"AQI=\"}]}}}"),
Arguments.of(null, null, "{\"key\":null,\"value\":null}"),
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(),

View File

@ -65,67 +65,64 @@ public class OffsetMessageFormatterTest {
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 10, OFFSET_COMMIT_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 10, OFFSET_COMMIT_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 10, OFFSET_COMMIT_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 10, OFFSET_COMMIT_VALUE).array(),
"{\"key\":{\"version\":10,\"data\":\"unknown\"},\"value\":{\"version\":10,\"data\":\"unknown\"}}" ""
), ),
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_VALUE).array(),
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," + "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
"\"value\":{\"version\":0,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," + "\"value\":{\"version\":0,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
"\"commitTimestamp\":1234}}}" "\"commitTimestamp\":1234}}}"
), ),
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_VALUE).array(),
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," + "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
"\"value\":{\"version\":1,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," + "\"value\":{\"version\":1,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
"\"commitTimestamp\":1234,\"expireTimestamp\":-1}}}" "\"commitTimestamp\":1234,\"expireTimestamp\":-1}}}"
), ),
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 2, OFFSET_COMMIT_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 2, OFFSET_COMMIT_VALUE).array(),
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," + "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
"\"value\":{\"version\":2,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," + "\"value\":{\"version\":2,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
"\"commitTimestamp\":1234}}}" "\"commitTimestamp\":1234}}}"
), ),
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 3, OFFSET_COMMIT_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 3, OFFSET_COMMIT_VALUE).array(),
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," + "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
"\"value\":{\"version\":3,\"data\":{\"offset\":100,\"leaderEpoch\":10," + "\"value\":{\"version\":3,\"data\":{\"offset\":100,\"leaderEpoch\":10," +
"\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}" "\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}"
), ),
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(),
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," + "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
"\"value\":{\"version\":4,\"data\":{\"offset\":100,\"leaderEpoch\":10," + "\"value\":{\"version\":4,\"data\":{\"offset\":100,\"leaderEpoch\":10," +
"\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}" "\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}"
), ),
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 5, OFFSET_COMMIT_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 5, OFFSET_COMMIT_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(),
"{\"key\":{\"version\":5,\"data\":\"unknown\"},\"value\":{\"version\":4," + ""
"\"data\":{\"offset\":100,\"leaderEpoch\":10,\"metadata\":\"metadata\"," +
"\"commitTimestamp\":1234}}}"
), ),
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 5, OFFSET_COMMIT_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 5, OFFSET_COMMIT_VALUE).array(),
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," + "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
"\"value\":{\"version\":5,\"data\":\"unknown\"}}" "\"value\":{\"version\":5,\"data\":\"unknown\"}}"
), ),
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
null, null,
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," + "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
"\"value\":null}"), "\"value\":null}"),
Arguments.of( Arguments.of(
null, null,
MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_VALUE).array(),
"{\"key\":null,\"value\":{\"version\":1,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," + ""),
"\"commitTimestamp\":1234,\"expireTimestamp\":-1}}}"), Arguments.of(null, null, ""),
Arguments.of(null, null, "{\"key\":null,\"value\":null}"),
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_VALUE).array(),

View File

@ -56,13 +56,12 @@ public class TransactionLogMessageFormatterTest {
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 10, TXN_LOG_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 10, TXN_LOG_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 10, TXN_LOG_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 10, TXN_LOG_VALUE).array(),
"{\"key\":{\"version\":10,\"data\":\"unknown\"}," + ""
"\"value\":{\"version\":10,\"data\":\"unknown\"}}"
), ),
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 0, TXN_LOG_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 0, TXN_LOG_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(),
"{\"key\":{\"version\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," + "{\"key\":{\"type\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," +
"\"value\":{\"version\":1,\"data\":{\"producerId\":100,\"producerEpoch\":50," + "\"value\":{\"version\":1,\"data\":{\"producerId\":100,\"producerEpoch\":50," +
"\"transactionTimeoutMs\":500,\"transactionStatus\":4,\"transactionPartitions\":[]," + "\"transactionTimeoutMs\":500,\"transactionStatus\":4,\"transactionPartitions\":[]," +
"\"transactionLastUpdateTimestampMs\":1000,\"transactionStartTimestampMs\":750}}}" "\"transactionLastUpdateTimestampMs\":1000,\"transactionStartTimestampMs\":750}}}"
@ -70,29 +69,23 @@ public class TransactionLogMessageFormatterTest {
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 0, TXN_LOG_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 0, TXN_LOG_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 5, TXN_LOG_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 5, TXN_LOG_VALUE).array(),
"{\"key\":{\"version\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," + "{\"key\":{\"type\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," +
"\"value\":{\"version\":5,\"data\":\"unknown\"}}" "\"value\":{\"version\":5,\"data\":\"unknown\"}}"
), ),
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(),
"{\"key\":{\"version\":1,\"data\":\"unknown\"}," + ""),
"\"value\":{\"version\":1,\"data\":{\"producerId\":100,\"producerEpoch\":50," +
"\"transactionTimeoutMs\":500,\"transactionStatus\":4,\"transactionPartitions\":[]," +
"\"transactionLastUpdateTimestampMs\":1000,\"transactionStartTimestampMs\":750}}}"),
Arguments.of( Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 0, TXN_LOG_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 0, TXN_LOG_KEY).array(),
null, null,
"{\"key\":{\"version\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," + "{\"key\":{\"type\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," +
"\"value\":null}"), "\"value\":null}"),
Arguments.of( Arguments.of(
null, null,
MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(),
"{\"key\":null," + ""),
"\"value\":{\"version\":1,\"data\":{\"producerId\":100,\"producerEpoch\":50," + Arguments.of(null, null, "")
"\"transactionTimeoutMs\":500,\"transactionStatus\":4,\"transactionPartitions\":[]," +
"\"transactionLastUpdateTimestampMs\":1000,\"transactionStartTimestampMs\":750}}}"),
Arguments.of(null, null, "{\"key\":null,\"value\":null}")
); );
} }