KAFKA-18207: Serde for handling transaction records (#18136)

Reviewers: Andrew Schofield <aschofield@confluent.io>
Mickael Maison 2024-12-19 21:39:09 +01:00 committed by GitHub
parent b3b40bb77b
commit 3fafa096b1
10 changed files with 472 additions and 73 deletions

View File

@@ -1671,6 +1671,7 @@ project(':transaction-coordinator') {
implementation libs.jacksonDatabind
implementation project(':clients')
implementation project(':server-common')
implementation project(':coordinator-common')
implementation libs.slf4jApi
testImplementation libs.junitJupiter
@@ -2284,7 +2285,6 @@ project(':storage') {
implementation project(':storage:storage-api')
implementation project(':server-common')
implementation project(':clients')
implementation project(':transaction-coordinator')
implementation(libs.caffeine) {
exclude group: 'org.checkerframework', module: 'checker-qual'
}

View File

@@ -34,6 +34,7 @@
<subpackage name="transaction">
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.coordinator.common.runtime" />
<allow pkg="org.apache.kafka.coordinator.transaction" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.slf4j" />

View File

@@ -19,7 +19,7 @@ package kafka.coordinator.transaction
import java.nio.ByteBuffer
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil}
import org.apache.kafka.common.record.{Record, RecordBatch}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue}
import org.apache.kafka.server.common.TransactionVersion
@@ -143,33 +143,6 @@ object TransactionLog {
} else throw new IllegalStateException(s"Unknown version $version from the transaction log message value")
}
}
/**
* Exposed for printing records using [[kafka.tools.DumpLogSegments]]
*/
def formatRecordKeyAndValue(record: Record): (Option[String], Option[String]) = {
TransactionLog.readTxnRecordKey(record.key) match {
case txnKey: TxnKey =>
val keyString = s"transaction_metadata::transactionalId=${txnKey.transactionalId}"
val valueString = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value) match {
case None => "<DELETE>"
case Some(txnMetadata) => s"producerId:${txnMetadata.producerId}," +
s"producerEpoch:${txnMetadata.producerEpoch}," +
s"state=${txnMetadata.state}," +
s"partitions=${txnMetadata.topicPartitions.mkString("[", ",", "]")}," +
s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," +
s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}"
}
(Some(keyString), Some(valueString))
case unknownKey: UnknownKey =>
(Some(s"unknown::version=${unknownKey.version}"), None)
}
}
}
sealed trait BaseKey{
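Note: the record formatting removed from TransactionLog above is not lost; it moves into kafka.tools.DumpLogSegments, whose rewritten TransactionLogMessageParser (next file) renders transaction log records as JSON through the new TransactionCoordinatorRecordSerde.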

View File

@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.JsonNode
import java.io._
import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode}
import kafka.coordinator.transaction.TransactionLog
import kafka.log._
import kafka.utils.CoreUtils
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
@@ -42,6 +41,8 @@ import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownReco
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde
import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, ShareSnapshotKeyJsonConverter, ShareSnapshotValue, ShareSnapshotValueJsonConverter, ShareUpdateKey, ShareUpdateKeyJsonConverter, ShareUpdateValue, ShareUpdateValueJsonConverter}
import org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogKeyJsonConverter, TransactionLogValue, TransactionLogValueJsonConverter}
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
import org.apache.kafka.snapshot.Snapshots
@@ -588,9 +589,61 @@ object DumpLogSegments {
}
}
private class TransactionLogMessageParser extends MessageParser[String, String] {
// Package private for testing.
class TransactionLogMessageParser extends MessageParser[String, String] {
private val serde = new TransactionCoordinatorRecordSerde()
private def prepareKey(message: Message, version: Short): String = {
val messageAsJson = message match {
case m: TransactionLogKey =>
TransactionLogKeyJsonConverter.write(m, version)
case _ => throw new UnknownRecordTypeException(version)
}
val json = new ObjectNode(JsonNodeFactory.instance)
json.set("type", new TextNode(version.toString))
json.set("data", messageAsJson)
json.toString
}
private def prepareValue(message: Message, version: Short): String = {
val messageAsJson = message match {
case m: TransactionLogValue =>
TransactionLogValueJsonConverter.write(m, version)
case _ => throw new UnknownRecordTypeException(version)
}
val json = new ObjectNode(JsonNodeFactory.instance)
json.set("type", new TextNode(version.toString))
json.set("data", messageAsJson)
json.toString
}
override def parse(record: Record): (Option[String], Option[String]) = {
TransactionLog.formatRecordKeyAndValue(record)
if (!record.hasKey)
throw new RuntimeException(s"Failed to decode message at offset ${record.offset} using offset " +
"transaction-log decoder (message had a missing key)")
try {
val r = serde.deserialize(record.key, record.value)
(
Some(prepareKey(r.key.message, r.key.version)),
Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("<DELETE>"))
)
} catch {
case e: UnknownRecordTypeException =>
(
Some(s"Unknown record type ${e.unknownType} at offset ${record.offset}, skipping."),
None
)
case e: Throwable =>
e.printStackTrace()
(
Some(s"Error at offset ${record.offset}, skipping. ${e.getMessage}"),
None
)
}
}
}
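For context, the rewritten parser decodes records with the new TransactionCoordinatorRecordSerde and renders each key and value as a JSON envelope carrying a "type" (the schema version) and a "data" field. Below is a minimal sketch of driving the parser by hand; it is illustrative only (the object name is made up, and it assumes it is compiled somewhere the parser is visible, since the class is intended to be package private), with the expected output taken from the DumpLogSegmentsTest assertions added later in this commit.

```scala
import kafka.tools.DumpLogSegments.TransactionLogMessageParser
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.protocol.MessageUtil
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue}

object TransactionLogParserSketch extends App {
  // Transaction log keys and values are version-prefixed generated messages.
  val keyBytes = MessageUtil.toVersionPrefixedBytes(0.toShort,
    new TransactionLogKey().setTransactionalId("txnId"))
  val valueBytes = MessageUtil.toVersionPrefixedBytes(0.toShort,
    new TransactionLogValue().setProducerId(123L))

  // Wrap them in a single batch and pull the one record back out of it.
  val record = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(keyBytes, valueBytes))
    .records.iterator.next

  // Each side comes back as Some(JSON envelope); a null value would come back as Some("<DELETE>").
  val (keyJson, valueJson) = new TransactionLogMessageParser().parse(record)
  // keyJson:   Some({"type":"0","data":{"transactionalId":"txnId"}})
  // valueJson: Some({"type":"0","data":{"producerId":123,...}})
  println(s"$keyJson -> $valueJson")
}
```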

View File

@@ -17,7 +17,6 @@
package kafka.coordinator.transaction
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil}
@@ -110,40 +109,6 @@ class TransactionLogTest {
assertEquals(pidMappings.size, count)
}
@Test
def testTransactionMetadataParsing(): Unit = {
val transactionalId = "id"
val producerId = 1334L
val topicPartition = new TopicPartition("topic", 0)
val txnMetadata = new TransactionMetadata(transactionalId, producerId, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_ID, producerEpoch,
RecordBatch.NO_PRODUCER_EPOCH, transactionTimeoutMs, Ongoing, collection.mutable.Set.empty[TopicPartition], 0, 0, TV_0)
txnMetadata.addPartitions(Set(topicPartition))
val keyBytes = TransactionLog.keyToBytes(transactionalId)
val valueBytes = TransactionLog.valueToBytes(txnMetadata.prepareNoTransit(), TV_2)
val transactionMetadataRecord = TestUtils.records(Seq(
new SimpleRecord(keyBytes, valueBytes)
)).records.asScala.head
val (keyStringOpt, valueStringOpt) = TransactionLog.formatRecordKeyAndValue(transactionMetadataRecord)
assertEquals(Some(s"transaction_metadata::transactionalId=$transactionalId"), keyStringOpt)
assertEquals(Some(s"producerId:$producerId,producerEpoch:$producerEpoch,state=Ongoing," +
s"partitions=[$topicPartition],txnLastUpdateTimestamp=0,txnTimeoutMs=$transactionTimeoutMs"), valueStringOpt)
}
@Test
def testTransactionMetadataTombstoneParsing(): Unit = {
val transactionalId = "id"
val transactionMetadataRecord = TestUtils.records(Seq(
new SimpleRecord(TransactionLog.keyToBytes(transactionalId), null)
)).records.asScala.head
val (keyStringOpt, valueStringOpt) = TransactionLog.formatRecordKeyAndValue(transactionMetadataRecord)
assertEquals(Some(s"transaction_metadata::transactionalId=$transactionalId"), keyStringOpt)
assertEquals(Some("<DELETE>"), valueStringOpt)
}
@Test
def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500, TV_0)

View File

@@ -27,7 +27,7 @@ import java.util.stream.IntStream
import kafka.log.{LogTestUtils, UnifiedLog}
import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
import kafka.server.KafkaRaftServer
import kafka.tools.DumpLogSegments.{OffsetsMessageParser, ShareGroupStateMessageParser, TimeIndexDumpErrors}
import kafka.tools.DumpLogSegments.{OffsetsMessageParser, ShareGroupStateMessageParser, TimeIndexDumpErrors, TransactionLogMessageParser}
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.{Assignment, Subscription}
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
@@ -43,7 +43,8 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
import org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey, GroupMetadataValue}
import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, ShareSnapshotValue, ShareUpdateKey, ShareUpdateValue}
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue}
import org.apache.kafka.coordinator.transaction.{TransactionCoordinatorRecordSerde, TransactionLogConfig}
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch, VoterSetTest}
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion}
@@ -831,6 +832,126 @@ class DumpLogSegmentsTest {
)
}
@Test
def testTransactionLogMessageParser(): Unit = {
val serde = new TransactionCoordinatorRecordSerde()
val parser = new TransactionLogMessageParser()
def serializedRecord(key: ApiMessageAndVersion, value: ApiMessageAndVersion): Record = {
val record = new CoordinatorRecord(key, value)
TestUtils.singletonRecords(
key = serde.serializeKey(record),
value = serde.serializeValue(record)
).records.iterator.next
}
// The key is mandatory.
assertEquals(
"Failed to decode message at offset 0 using offset transaction-log decoder (message had a missing key)",
assertThrows(
classOf[RuntimeException],
() => parser.parse(TestUtils.singletonRecords(key = null, value = null).records.iterator.next)
).getMessage
)
// A valid key and value should work.
assertEquals(
(
Some("{\"type\":\"0\",\"data\":{\"transactionalId\":\"txnId\"}}"),
Some("{\"type\":\"0\",\"data\":{\"producerId\":123,\"producerEpoch\":0,\"transactionTimeoutMs\":0," +
"\"transactionStatus\":0,\"transactionPartitions\":[],\"transactionLastUpdateTimestampMs\":0," +
"\"transactionStartTimestampMs\":0}}")
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(
new TransactionLogKey()
.setTransactionalId("txnId"),
0.toShort
),
new ApiMessageAndVersion(
new TransactionLogValue()
.setProducerId(123L),
0.toShort
)
))
)
// A valid key with a tombstone should work.
assertEquals(
(
Some("{\"type\":\"0\",\"data\":{\"transactionalId\":\"txnId\"}}"),
Some("<DELETE>")
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(
new TransactionLogKey()
.setTransactionalId("txnId"),
0.toShort
),
null
))
)
// An unknown record type should be handled and reported as such.
assertEquals(
(
Some("Unknown record type 32767 at offset 0, skipping."),
None
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(
new TransactionLogKey()
.setTransactionalId("txnId"),
Short.MaxValue // Invalid record id.
),
new ApiMessageAndVersion(
new TransactionLogValue(),
0.toShort
)
))
)
// A valid key and value with all fields set should work.
assertEquals(
(
Some("{\"type\":\"0\",\"data\":{\"transactionalId\":\"txnId\"}}"),
Some("{\"type\":\"1\",\"data\":{\"producerId\":12,\"previousProducerId\":11,\"nextProducerId\":10," +
"\"producerEpoch\":2,\"transactionTimeoutMs\":14,\"transactionStatus\":0," +
"\"transactionPartitions\":[{\"topic\":\"topic1\",\"partitionIds\":[0,1,2]}," +
"{\"topic\":\"topic2\",\"partitionIds\":[3,4,5]}],\"transactionLastUpdateTimestampMs\":123," +
"\"transactionStartTimestampMs\":13}}")
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(
new TransactionLogKey()
.setTransactionalId("txnId"),
0.toShort
),
new ApiMessageAndVersion(
new TransactionLogValue()
.setClientTransactionVersion(0.toShort)
.setNextProducerId(10L)
.setPreviousProducerId(11L)
.setProducerEpoch(2.toShort)
.setProducerId(12L)
.setTransactionLastUpdateTimestampMs(123L)
.setTransactionPartitions(List(
new TransactionLogValue.PartitionsSchema()
.setTopic("topic1")
.setPartitionIds(List(0, 1, 2).map(Integer.valueOf).asJava),
new TransactionLogValue.PartitionsSchema()
.setTopic("topic2")
.setPartitionIds(List(3, 4, 5).map(Integer.valueOf).asJava)
).asJava)
.setTransactionStartTimestampMs(13L)
.setTransactionStatus(0)
.setTransactionTimeoutMs(14),
1.toShort
)
))
)
}
private def readBatchMetadata(lines: util.ListIterator[String]): Option[String] = {
while (lines.hasNext) {
val line = lines.next()

View File

@@ -32,7 +32,6 @@ import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.server.util.MockScheduler;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
@@ -785,7 +784,7 @@ public class LogSegmentTest {
topicPartition,
logDir,
(int) (Duration.ofMinutes(5).toMillis()),
new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
new ProducerStateManagerConfig(86400000, false),
new MockTime()
);
}
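For reference, 86400000 ms is one day, the same value as the PRODUCER_ID_EXPIRATION_MS_DEFAULT constant these storage tests previously imported from TransactionLogConfig; inlining the literal keeps the test behaviour identical while letting the ':storage' project drop its dependency on ':transaction-coordinator' (see the build.gradle hunk above).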

View File

@@ -59,7 +59,6 @@ import static java.util.Arrays.asList;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT;
import static org.apache.kafka.storage.internals.log.ProducerStateManager.LATE_TRANSACTION_BUFFER_MS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -89,7 +88,7 @@ public class ProducerStateManagerTest {
public ProducerStateManagerTest() throws IOException {
logDir = TestUtils.tempDirectory();
partition = new TopicPartition("test", 0);
producerStateManagerConfig = new ProducerStateManagerConfig(PRODUCER_ID_EXPIRATION_MS_DEFAULT, true);
producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true);
time = new MockTime();
stateManager = new ProducerStateManager(partition, logDir, maxTransactionTimeoutMs,
producerStateManagerConfig, time);

View File

@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.transaction;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
public class TransactionCoordinatorRecordSerde extends CoordinatorRecordSerde {
@Override
protected ApiMessage apiMessageKeyFor(short recordVersion) {
switch (recordVersion) {
case 0:
return new TransactionLogKey();
default:
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
}
}
@Override
protected ApiMessage apiMessageValueFor(short recordVersion) {
switch (recordVersion) {
case 0:
case 1:
return new TransactionLogValue();
default:
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
}
}
}
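The serde accepts key version 0 (TransactionLogKey) and value versions 0 and 1 (TransactionLogValue); any other version surfaces as CoordinatorLoader.UnknownRecordTypeException, and a null value buffer is treated as a tombstone. A minimal round-trip sketch follows, written in Scala to match the earlier example; the object name is illustrative and the snippet is not part of the commit.

```scala
import java.nio.ByteBuffer

import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord
import org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue}
import org.apache.kafka.server.common.ApiMessageAndVersion

object TransactionSerdeRoundTripSketch extends App {
  val serde = new TransactionCoordinatorRecordSerde()
  val record = new CoordinatorRecord(
    new ApiMessageAndVersion(new TransactionLogKey().setTransactionalId("txnId"), 0.toShort),
    new ApiMessageAndVersion(new TransactionLogValue().setProducerId(123L), 0.toShort)
  )

  // Keys and values are written as a two-byte schema version followed by the message body.
  val keyBytes = serde.serializeKey(record)
  val valueBytes = serde.serializeValue(record) // would be null for a tombstone record

  // deserialize() rebuilds the CoordinatorRecord from the version-prefixed buffers.
  val roundTripped = serde.deserialize(ByteBuffer.wrap(keyBytes), ByteBuffer.wrap(valueBytes))
  assert(roundTripped.key == record.key && roundTripped.value == record.value)
}
```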

View File

@@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.transaction;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TransactionCoordinatorRecordSerdeTest {
@Test
public void testSerializeKey() {
TransactionCoordinatorRecordSerde serializer = new TransactionCoordinatorRecordSerde();
CoordinatorRecord record = new CoordinatorRecord(
new ApiMessageAndVersion(
new TransactionLogKey().setTransactionalId("txnId"),
(short) 0
),
new ApiMessageAndVersion(
new TransactionLogValue(),
(short) 0
)
);
assertArrayEquals(
MessageUtil.toVersionPrefixedBytes(record.key().version(), record.key().message()),
serializer.serializeKey(record)
);
}
@Test
public void testSerializeValue() {
TransactionCoordinatorRecordSerde serializer = new TransactionCoordinatorRecordSerde();
CoordinatorRecord record = new CoordinatorRecord(
new ApiMessageAndVersion(
new TransactionLogKey().setTransactionalId("txnId"),
(short) 0
),
new ApiMessageAndVersion(
new TransactionLogValue(),
(short) 0
)
);
assertArrayEquals(
MessageUtil.toVersionPrefixedBytes(record.value().version(), record.value().message()),
serializer.serializeValue(record)
);
}
@Test
public void testSerializeNullValue() {
TransactionCoordinatorRecordSerde serializer = new TransactionCoordinatorRecordSerde();
CoordinatorRecord record = new CoordinatorRecord(
new ApiMessageAndVersion(
new TransactionLogKey().setTransactionalId("txnId"),
(short) 0
),
null
);
assertNull(serializer.serializeValue(record));
}
@Test
public void testDeserialize() {
TransactionCoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde();
ApiMessageAndVersion key = new ApiMessageAndVersion(
new TransactionLogKey().setTransactionalId("txnId"),
(short) 0
);
ByteBuffer keyBuffer = MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
ApiMessageAndVersion value = new ApiMessageAndVersion(
new TransactionLogValue(),
(short) 0
);
ByteBuffer valueBuffer = MessageUtil.toVersionPrefixedByteBuffer(value.version(), value.message());
CoordinatorRecord record = serde.deserialize(keyBuffer, valueBuffer);
assertEquals(key, record.key());
assertEquals(value, record.value());
}
@Test
public void testDeserializeWithTombstoneForValue() {
TransactionCoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde();
ApiMessageAndVersion key = new ApiMessageAndVersion(
new TransactionLogKey().setTransactionalId("txnId"),
(short) 0
);
ByteBuffer keyBuffer = MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
CoordinatorRecord record = serde.deserialize(keyBuffer, null);
assertEquals(key, record.key());
assertNull(record.value());
}
@Test
public void testDeserializeWithInvalidRecordType() {
TransactionCoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde();
ByteBuffer keyBuffer = ByteBuffer.allocate(64);
keyBuffer.putShort((short) 255);
keyBuffer.rewind();
ByteBuffer valueBuffer = ByteBuffer.allocate(64);
CoordinatorLoader.UnknownRecordTypeException ex =
assertThrows(CoordinatorLoader.UnknownRecordTypeException.class,
() -> serde.deserialize(keyBuffer, valueBuffer));
assertEquals((short) 255, ex.unknownType());
}
@Test
public void testDeserializeWithKeyEmptyBuffer() {
TransactionCoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde();
ByteBuffer keyBuffer = ByteBuffer.allocate(0);
ByteBuffer valueBuffer = ByteBuffer.allocate(64);
RuntimeException ex =
assertThrows(RuntimeException.class,
() -> serde.deserialize(keyBuffer, valueBuffer));
assertEquals("Could not read version from key's buffer.", ex.getMessage());
}
@Test
public void testDeserializeWithValueEmptyBuffer() {
TransactionCoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde();
ApiMessageAndVersion key = new ApiMessageAndVersion(
new TransactionLogKey().setTransactionalId("txnId"),
(short) 0
);
ByteBuffer keyBuffer = MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
ByteBuffer valueBuffer = ByteBuffer.allocate(0);
RuntimeException ex =
assertThrows(RuntimeException.class,
() -> serde.deserialize(keyBuffer, valueBuffer));
assertEquals("Could not read version from value's buffer.", ex.getMessage());
}
@Test
public void testDeserializeWithInvalidKeyBytes() {
TransactionCoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde();
ByteBuffer keyBuffer = ByteBuffer.allocate(2);
keyBuffer.putShort((short) 0);
keyBuffer.rewind();
ByteBuffer valueBuffer = ByteBuffer.allocate(2);
valueBuffer.putShort((short) 0);
valueBuffer.rewind();
RuntimeException ex =
assertThrows(RuntimeException.class,
() -> serde.deserialize(keyBuffer, valueBuffer));
assertTrue(ex.getMessage().startsWith("Could not read record with version 0 from key's buffer due to"),
ex.getMessage());
}
@Test
public void testDeserializeWithInvalidValueBytes() {
TransactionCoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde();
ApiMessageAndVersion key = new ApiMessageAndVersion(
new TransactionLogKey().setTransactionalId("txnId"),
(short) 0
);
ByteBuffer keyBuffer = MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
ByteBuffer valueBuffer = ByteBuffer.allocate(2);
valueBuffer.putShort((short) 0);
valueBuffer.rewind();
RuntimeException ex =
assertThrows(RuntimeException.class,
() -> serde.deserialize(keyBuffer, valueBuffer));
assertTrue(ex.getMessage().startsWith("Could not read record with version 0 from value's buffer due to"),
ex.getMessage());
}
@Test
public void testDeserializeAllRecordTypes() {
roundTrip((short) 0, new TransactionLogKey().setTransactionalId("id"), new TransactionLogValue());
}
private void roundTrip(
short recordType,
ApiMessage key,
ApiMessage val
) {
TransactionCoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde();
for (short version = val.lowestSupportedVersion(); version < val.highestSupportedVersion(); version++) {
ApiMessageAndVersion keyMessageAndVersion = new ApiMessageAndVersion(key, recordType);
ApiMessageAndVersion valMessageAndVersion = new ApiMessageAndVersion(val, version);
CoordinatorRecord record = serde.deserialize(
MessageUtil.toVersionPrefixedByteBuffer(recordType, key),
MessageUtil.toVersionPrefixedByteBuffer(version, val)
);
assertEquals(keyMessageAndVersion, record.key());
assertEquals(valMessageAndVersion, record.value());
}
}
}