From 3fafa096b113ee3bc9c9472e6aa5dfd2e46b7352 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Thu, 19 Dec 2024 21:39:09 +0100 Subject: [PATCH] KAFKA-18207: Serde for handling transaction records (#18136) Reviewers: Andrew Schofield --- build.gradle | 2 +- ...import-control-transaction-coordinator.xml | 1 + .../transaction/TransactionLog.scala | 29 +-- .../scala/kafka/tools/DumpLogSegments.scala | 59 ++++- .../transaction/TransactionLogTest.scala | 35 --- .../kafka/tools/DumpLogSegmentsTest.scala | 125 ++++++++- .../storage/internals/log/LogSegmentTest.java | 3 +- .../log/ProducerStateManagerTest.java | 3 +- .../TransactionCoordinatorRecordSerde.java | 47 ++++ ...TransactionCoordinatorRecordSerdeTest.java | 241 ++++++++++++++++++ 10 files changed, 472 insertions(+), 73 deletions(-) create mode 100644 transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerde.java create mode 100644 transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerdeTest.java diff --git a/build.gradle b/build.gradle index 6d47cd22ed6..7f28a508db0 100644 --- a/build.gradle +++ b/build.gradle @@ -1671,6 +1671,7 @@ project(':transaction-coordinator') { implementation libs.jacksonDatabind implementation project(':clients') implementation project(':server-common') + implementation project(':coordinator-common') implementation libs.slf4jApi testImplementation libs.junitJupiter @@ -2284,7 +2285,6 @@ project(':storage') { implementation project(':storage:storage-api') implementation project(':server-common') implementation project(':clients') - implementation project(':transaction-coordinator') implementation(libs.caffeine) { exclude group: 'org.checkerframework', module: 'checker-qual' } diff --git a/checkstyle/import-control-transaction-coordinator.xml b/checkstyle/import-control-transaction-coordinator.xml index 4a61407cfa9..f4045907257 100644 --- a/checkstyle/import-control-transaction-coordinator.xml +++ b/checkstyle/import-control-transaction-coordinator.xml @@ -34,6 +34,7 @@ + diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala index 2d8a8c5a840..15d7bcf7fab 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala @@ -19,7 +19,7 @@ package kafka.coordinator.transaction import java.nio.ByteBuffer import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil} -import org.apache.kafka.common.record.{Record, RecordBatch} +import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.TopicPartition import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue} import org.apache.kafka.server.common.TransactionVersion @@ -143,33 +143,6 @@ object TransactionLog { } else throw new IllegalStateException(s"Unknown version $version from the transaction log message value") } } - - /** - * Exposed for printing records using [[kafka.tools.DumpLogSegments]] - */ - def formatRecordKeyAndValue(record: Record): (Option[String], Option[String]) = { - TransactionLog.readTxnRecordKey(record.key) match { - case txnKey: TxnKey => - val keyString = s"transaction_metadata::transactionalId=${txnKey.transactionalId}" - - val valueString = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value) match { - case None => "" - - case Some(txnMetadata) => s"producerId:${txnMetadata.producerId}," + - s"producerEpoch:${txnMetadata.producerEpoch}," + - s"state=${txnMetadata.state}," + - s"partitions=${txnMetadata.topicPartitions.mkString("[", ",", "]")}," + - s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," + - s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}" - } - - (Some(keyString), Some(valueString)) - - case unknownKey: UnknownKey => - (Some(s"unknown::version=${unknownKey.version}"), None) - } - } - } sealed trait BaseKey{ diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index da5565b87d4..f41e14c1360 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.JsonNode import java.io._ import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode} -import kafka.coordinator.transaction.TransactionLog import kafka.log._ import kafka.utils.CoreUtils import org.apache.kafka.clients.consumer.internals.ConsumerProtocol @@ -42,6 +41,8 @@ import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownReco import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, ShareSnapshotKeyJsonConverter, ShareSnapshotValue, ShareSnapshotValueJsonConverter, ShareUpdateKey, ShareUpdateKeyJsonConverter, ShareUpdateValue, ShareUpdateValueJsonConverter} +import org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde +import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogKeyJsonConverter, TransactionLogValue, TransactionLogValueJsonConverter} import org.apache.kafka.metadata.MetadataRecordSerde import org.apache.kafka.metadata.bootstrap.BootstrapDirectory import org.apache.kafka.snapshot.Snapshots @@ -588,9 +589,61 @@ object DumpLogSegments { } } - private class TransactionLogMessageParser extends MessageParser[String, String] { + // Package private for testing. + class TransactionLogMessageParser extends MessageParser[String, String] { + private val serde = new TransactionCoordinatorRecordSerde() + + private def prepareKey(message: Message, version: Short): String = { + val messageAsJson = message match { + case m: TransactionLogKey => + TransactionLogKeyJsonConverter.write(m, version) + case _ => throw new UnknownRecordTypeException(version) + } + + val json = new ObjectNode(JsonNodeFactory.instance) + json.set("type", new TextNode(version.toString)) + json.set("data", messageAsJson) + json.toString + } + + private def prepareValue(message: Message, version: Short): String = { + val messageAsJson = message match { + case m: TransactionLogValue => + TransactionLogValueJsonConverter.write(m, version) + case _ => throw new UnknownRecordTypeException(version) + } + + val json = new ObjectNode(JsonNodeFactory.instance) + json.set("type", new TextNode(version.toString)) + json.set("data", messageAsJson) + json.toString + } + override def parse(record: Record): (Option[String], Option[String]) = { - TransactionLog.formatRecordKeyAndValue(record) + if (!record.hasKey) + throw new RuntimeException(s"Failed to decode message at offset ${record.offset} using offset " + + "transaction-log decoder (message had a missing key)") + + try { + val r = serde.deserialize(record.key, record.value) + ( + Some(prepareKey(r.key.message, r.key.version)), + Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("")) + ) + } catch { + case e: UnknownRecordTypeException => + ( + Some(s"Unknown record type ${e.unknownType} at offset ${record.offset}, skipping."), + None + ) + + case e: Throwable => + e.printStackTrace() + ( + Some(s"Error at offset ${record.offset}, skipping. ${e.getMessage}"), + None + ) + } } } diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala index fd5f1e37a65..87601feb5a6 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala @@ -17,7 +17,6 @@ package kafka.coordinator.transaction -import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil} @@ -110,40 +109,6 @@ class TransactionLogTest { assertEquals(pidMappings.size, count) } - @Test - def testTransactionMetadataParsing(): Unit = { - val transactionalId = "id" - val producerId = 1334L - val topicPartition = new TopicPartition("topic", 0) - - val txnMetadata = new TransactionMetadata(transactionalId, producerId, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_ID, producerEpoch, - RecordBatch.NO_PRODUCER_EPOCH, transactionTimeoutMs, Ongoing, collection.mutable.Set.empty[TopicPartition], 0, 0, TV_0) - txnMetadata.addPartitions(Set(topicPartition)) - - val keyBytes = TransactionLog.keyToBytes(transactionalId) - val valueBytes = TransactionLog.valueToBytes(txnMetadata.prepareNoTransit(), TV_2) - val transactionMetadataRecord = TestUtils.records(Seq( - new SimpleRecord(keyBytes, valueBytes) - )).records.asScala.head - - val (keyStringOpt, valueStringOpt) = TransactionLog.formatRecordKeyAndValue(transactionMetadataRecord) - assertEquals(Some(s"transaction_metadata::transactionalId=$transactionalId"), keyStringOpt) - assertEquals(Some(s"producerId:$producerId,producerEpoch:$producerEpoch,state=Ongoing," + - s"partitions=[$topicPartition],txnLastUpdateTimestamp=0,txnTimeoutMs=$transactionTimeoutMs"), valueStringOpt) - } - - @Test - def testTransactionMetadataTombstoneParsing(): Unit = { - val transactionalId = "id" - val transactionMetadataRecord = TestUtils.records(Seq( - new SimpleRecord(TransactionLog.keyToBytes(transactionalId), null) - )).records.asScala.head - - val (keyStringOpt, valueStringOpt) = TransactionLog.formatRecordKeyAndValue(transactionMetadataRecord) - assertEquals(Some(s"transaction_metadata::transactionalId=$transactionalId"), keyStringOpt) - assertEquals(Some(""), valueStringOpt) - } - @Test def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = { val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500, TV_0) diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index 9bbfa7242c3..8aaeb892466 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -27,7 +27,7 @@ import java.util.stream.IntStream import kafka.log.{LogTestUtils, UnifiedLog} import kafka.raft.{KafkaMetadataLog, MetadataLogConfig} import kafka.server.KafkaRaftServer -import kafka.tools.DumpLogSegments.{OffsetsMessageParser, ShareGroupStateMessageParser, TimeIndexDumpErrors} +import kafka.tools.DumpLogSegments.{OffsetsMessageParser, ShareGroupStateMessageParser, TimeIndexDumpErrors, TransactionLogMessageParser} import kafka.utils.TestUtils import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.{Assignment, Subscription} import org.apache.kafka.clients.consumer.internals.ConsumerProtocol @@ -43,7 +43,8 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde import org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey, GroupMetadataValue} import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, ShareSnapshotValue, ShareUpdateKey, ShareUpdateValue} import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde} -import org.apache.kafka.coordinator.transaction.TransactionLogConfig +import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue} +import org.apache.kafka.coordinator.transaction.{TransactionCoordinatorRecordSerde, TransactionLogConfig} import org.apache.kafka.metadata.MetadataRecordSerde import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch, VoterSetTest} import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion} @@ -831,6 +832,126 @@ class DumpLogSegmentsTest { ) } + @Test + def testTransactionLogMessageParser(): Unit = { + val serde = new TransactionCoordinatorRecordSerde() + val parser = new TransactionLogMessageParser() + + def serializedRecord(key: ApiMessageAndVersion, value: ApiMessageAndVersion): Record = { + val record = new CoordinatorRecord(key, value) + TestUtils.singletonRecords( + key = serde.serializeKey(record), + value = serde.serializeValue(record) + ).records.iterator.next + } + + // The key is mandatory. + assertEquals( + "Failed to decode message at offset 0 using offset transaction-log decoder (message had a missing key)", + assertThrows( + classOf[RuntimeException], + () => parser.parse(TestUtils.singletonRecords(key = null, value = null).records.iterator.next) + ).getMessage + ) + + // A valid key and value should work. + assertEquals( + ( + Some("{\"type\":\"0\",\"data\":{\"transactionalId\":\"txnId\"}}"), + Some("{\"type\":\"0\",\"data\":{\"producerId\":123,\"producerEpoch\":0,\"transactionTimeoutMs\":0," + + "\"transactionStatus\":0,\"transactionPartitions\":[],\"transactionLastUpdateTimestampMs\":0," + + "\"transactionStartTimestampMs\":0}}") + ), + parser.parse(serializedRecord( + new ApiMessageAndVersion( + new TransactionLogKey() + .setTransactionalId("txnId"), + 0.toShort + ), + new ApiMessageAndVersion( + new TransactionLogValue() + .setProducerId(123L), + 0.toShort + ) + )) + ) + + // A valid key with a tombstone should work. + assertEquals( + ( + Some("{\"type\":\"0\",\"data\":{\"transactionalId\":\"txnId\"}}"), + Some("") + ), + parser.parse(serializedRecord( + new ApiMessageAndVersion( + new TransactionLogKey() + .setTransactionalId("txnId"), + 0.toShort + ), + null + )) + ) + + // An unknown record type should be handled and reported as such. + assertEquals( + ( + Some("Unknown record type 32767 at offset 0, skipping."), + None + ), + parser.parse(serializedRecord( + new ApiMessageAndVersion( + new TransactionLogKey() + .setTransactionalId("txnId"), + Short.MaxValue // Invalid record id. + ), + new ApiMessageAndVersion( + new TransactionLogValue(), + 0.toShort + ) + )) + ) + + // A valid key and value with all fields set should work. + assertEquals( + ( + Some("{\"type\":\"0\",\"data\":{\"transactionalId\":\"txnId\"}}"), + Some("{\"type\":\"1\",\"data\":{\"producerId\":12,\"previousProducerId\":11,\"nextProducerId\":10," + + "\"producerEpoch\":2,\"transactionTimeoutMs\":14,\"transactionStatus\":0," + + "\"transactionPartitions\":[{\"topic\":\"topic1\",\"partitionIds\":[0,1,2]}," + + "{\"topic\":\"topic2\",\"partitionIds\":[3,4,5]}],\"transactionLastUpdateTimestampMs\":123," + + "\"transactionStartTimestampMs\":13}}") + ), + parser.parse(serializedRecord( + new ApiMessageAndVersion( + new TransactionLogKey() + .setTransactionalId("txnId"), + 0.toShort + ), + new ApiMessageAndVersion( + new TransactionLogValue() + .setClientTransactionVersion(0.toShort) + .setNextProducerId(10L) + .setPreviousProducerId(11L) + .setProducerEpoch(2.toShort) + .setProducerId(12L) + .setTransactionLastUpdateTimestampMs(123L) + .setTransactionPartitions(List( + new TransactionLogValue.PartitionsSchema() + .setTopic("topic1") + .setPartitionIds(List(0, 1, 2).map(Integer.valueOf).asJava), + new TransactionLogValue.PartitionsSchema() + .setTopic("topic2") + .setPartitionIds(List(3, 4, 5).map(Integer.valueOf).asJava) + ).asJava) + .setTransactionStartTimestampMs(13L) + .setTransactionStatus(0) + .setTransactionTimeoutMs(14), + 1.toShort + ) + )) + ) + } + private def readBatchMetadata(lines: util.ListIterator[String]): Option[String] = { while (lines.hasNext) { val line = lines.next() diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java index 616671a6549..5e06c073dc5 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java @@ -32,7 +32,6 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.coordinator.transaction.TransactionLogConfig; import org.apache.kafka.server.util.MockScheduler; import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; @@ -785,7 +784,7 @@ public class LogSegmentTest { topicPartition, logDir, (int) (Duration.ofMinutes(5).toMillis()), - new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), + new ProducerStateManagerConfig(86400000, false), new MockTime() ); } diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java index acff9bc6f29..f3c4ab5e8fa 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java @@ -59,7 +59,6 @@ import static java.util.Arrays.asList; import static java.util.Collections.emptySet; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; -import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT; import static org.apache.kafka.storage.internals.log.ProducerStateManager.LATE_TRANSACTION_BUFFER_MS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -89,7 +88,7 @@ public class ProducerStateManagerTest { public ProducerStateManagerTest() throws IOException { logDir = TestUtils.tempDirectory(); partition = new TopicPartition("test", 0); - producerStateManagerConfig = new ProducerStateManagerConfig(PRODUCER_ID_EXPIRATION_MS_DEFAULT, true); + producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true); time = new MockTime(); stateManager = new ProducerStateManager(partition, logDir, maxTransactionTimeoutMs, producerStateManagerConfig, time); diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerde.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerde.java new file mode 100644 index 00000000000..f8d1c72f4ef --- /dev/null +++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerde.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.transaction; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue; + +public class TransactionCoordinatorRecordSerde extends CoordinatorRecordSerde { + + @Override + protected ApiMessage apiMessageKeyFor(short recordVersion) { + switch (recordVersion) { + case 0: + return new TransactionLogKey(); + default: + throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion); + } + } + + @Override + protected ApiMessage apiMessageValueFor(short recordVersion) { + switch (recordVersion) { + case 0: + case 1: + return new TransactionLogValue(); + default: + throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion); + } + } +} diff --git a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerdeTest.java b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerdeTest.java new file mode 100644 index 00000000000..055cba0a9eb --- /dev/null +++ b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerdeTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.transaction; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TransactionCoordinatorRecordSerdeTest { + + @Test + public void testSerializeKey() { + TransactionCoordinatorRecordSerde serializer = new TransactionCoordinatorRecordSerde(); + CoordinatorRecord record = new CoordinatorRecord( + new ApiMessageAndVersion( + new TransactionLogKey().setTransactionalId("txnId"), + (short) 0 + ), + new ApiMessageAndVersion( + new TransactionLogValue(), + (short) 0 + ) + ); + + assertArrayEquals( + MessageUtil.toVersionPrefixedBytes(record.key().version(), record.key().message()), + serializer.serializeKey(record) + ); + } + + @Test + public void testSerializeValue() { + TransactionCoordinatorRecordSerde serializer = new TransactionCoordinatorRecordSerde(); + CoordinatorRecord record = new CoordinatorRecord( + new ApiMessageAndVersion( + new TransactionLogKey().setTransactionalId("txnId"), + (short) 0 + ), + new ApiMessageAndVersion( + new TransactionLogValue(), + (short) 0 + ) + ); + + assertArrayEquals( + MessageUtil.toVersionPrefixedBytes(record.value().version(), record.value().message()), + serializer.serializeValue(record) + ); + } + + @Test + public void testSerializeNullValue() { + TransactionCoordinatorRecordSerde serializer = new TransactionCoordinatorRecordSerde(); + CoordinatorRecord record = new CoordinatorRecord( + new ApiMessageAndVersion( + new TransactionLogKey().setTransactionalId("txnId"), + (short) 0 + ), + null + ); + + assertNull(serializer.serializeValue(record)); + } + + @Test + public void testDeserialize() { + TransactionCoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde(); + + ApiMessageAndVersion key = new ApiMessageAndVersion( + new TransactionLogKey().setTransactionalId("txnId"), + (short) 0 + ); + ByteBuffer keyBuffer = MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message()); + + ApiMessageAndVersion value = new ApiMessageAndVersion( + new TransactionLogValue(), + (short) 0 + ); + ByteBuffer valueBuffer = MessageUtil.toVersionPrefixedByteBuffer(value.version(), value.message()); + + CoordinatorRecord record = serde.deserialize(keyBuffer, valueBuffer); + assertEquals(key, record.key()); + assertEquals(value, record.value()); + } + + @Test + public void testDeserializeWithTombstoneForValue() { + TransactionCoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde(); + + ApiMessageAndVersion key = new ApiMessageAndVersion( + new TransactionLogKey().setTransactionalId("txnId"), + (short) 0 + ); + ByteBuffer keyBuffer = MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message()); + + CoordinatorRecord record = serde.deserialize(keyBuffer, null); + assertEquals(key, record.key()); + assertNull(record.value()); + } + + @Test + public void testDeserializeWithInvalidRecordType() { + TransactionCoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde(); + + ByteBuffer keyBuffer = ByteBuffer.allocate(64); + keyBuffer.putShort((short) 255); + keyBuffer.rewind(); + + ByteBuffer valueBuffer = ByteBuffer.allocate(64); + + CoordinatorLoader.UnknownRecordTypeException ex = + assertThrows(CoordinatorLoader.UnknownRecordTypeException.class, + () -> serde.deserialize(keyBuffer, valueBuffer)); + assertEquals((short) 255, ex.unknownType()); + } + + @Test + public void testDeserializeWithKeyEmptyBuffer() { + TransactionCoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde(); + + ByteBuffer keyBuffer = ByteBuffer.allocate(0); + ByteBuffer valueBuffer = ByteBuffer.allocate(64); + + RuntimeException ex = + assertThrows(RuntimeException.class, + () -> serde.deserialize(keyBuffer, valueBuffer)); + assertEquals("Could not read version from key's buffer.", ex.getMessage()); + } + + @Test + public void testDeserializeWithValueEmptyBuffer() { + TransactionCoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde(); + + ApiMessageAndVersion key = new ApiMessageAndVersion( + new TransactionLogKey().setTransactionalId("txnId"), + (short) 0 + ); + ByteBuffer keyBuffer = MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message()); + + ByteBuffer valueBuffer = ByteBuffer.allocate(0); + + RuntimeException ex = + assertThrows(RuntimeException.class, + () -> serde.deserialize(keyBuffer, valueBuffer)); + assertEquals("Could not read version from value's buffer.", ex.getMessage()); + } + + @Test + public void testDeserializeWithInvalidKeyBytes() { + TransactionCoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde(); + + ByteBuffer keyBuffer = ByteBuffer.allocate(2); + keyBuffer.putShort((short) 0); + keyBuffer.rewind(); + + ByteBuffer valueBuffer = ByteBuffer.allocate(2); + valueBuffer.putShort((short) 0); + valueBuffer.rewind(); + + RuntimeException ex = + assertThrows(RuntimeException.class, + () -> serde.deserialize(keyBuffer, valueBuffer)); + assertTrue(ex.getMessage().startsWith("Could not read record with version 0 from key's buffer due to"), + ex.getMessage()); + } + + @Test + public void testDeserializeWithInvalidValueBytes() { + TransactionCoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde(); + + ApiMessageAndVersion key = new ApiMessageAndVersion( + new TransactionLogKey().setTransactionalId("txnId"), + (short) 0 + ); + ByteBuffer keyBuffer = MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message()); + + ByteBuffer valueBuffer = ByteBuffer.allocate(2); + valueBuffer.putShort((short) 0); + valueBuffer.rewind(); + + RuntimeException ex = + assertThrows(RuntimeException.class, + () -> serde.deserialize(keyBuffer, valueBuffer)); + assertTrue(ex.getMessage().startsWith("Could not read record with version 0 from value's buffer due to"), + ex.getMessage()); + } + + @Test + public void testDeserializeAllRecordTypes() { + roundTrip((short) 0, new TransactionLogKey().setTransactionalId("id"), new TransactionLogValue()); + } + + private void roundTrip( + short recordType, + ApiMessage key, + ApiMessage val + ) { + TransactionCoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde(); + + for (short version = val.lowestSupportedVersion(); version < val.highestSupportedVersion(); version++) { + ApiMessageAndVersion keyMessageAndVersion = new ApiMessageAndVersion(key, recordType); + ApiMessageAndVersion valMessageAndVersion = new ApiMessageAndVersion(val, version); + + CoordinatorRecord record = serde.deserialize( + MessageUtil.toVersionPrefixedByteBuffer(recordType, key), + MessageUtil.toVersionPrefixedByteBuffer(version, val) + ); + + assertEquals(keyMessageAndVersion, record.key()); + assertEquals(valMessageAndVersion, record.value()); + } + } +}