KAFKA-18882 Remove BaseKey, TxnKey, and UnknownKey (#19054)

Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi <kitingiao@gmail.com>, PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Xuan-Zhang Gong 2025-03-05 21:16:18 +08:00 committed by GitHub
parent 69ff5d1e70
commit 18eca0229d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 38 additions and 53 deletions

View File

@ -90,19 +90,15 @@ object TransactionLog {
/**
* Decodes the transaction log messages' key
*
* @return the key
* @return left with the version if the key is not a transaction log key, right with the transactional id otherwise
*/
def readTxnRecordKey(buffer: ByteBuffer): BaseKey = {
def readTxnRecordKey(buffer: ByteBuffer): Either[Short, String] = {
val version = buffer.getShort
if (version == CoordinatorRecordType.TRANSACTION_LOG.id) {
val value = new TransactionLogKey(new ByteBufferAccessor(buffer), 0.toShort)
TxnKey(
version = version,
transactionalId = value.transactionalId
)
} else {
UnknownKey(version)
}
Either.cond(
version == CoordinatorRecordType.TRANSACTION_LOG.id,
new TransactionLogKey(new ByteBufferAccessor(buffer), 0.toShort).transactionalId,
version
)
}
/**
@ -143,18 +139,3 @@ object TransactionLog {
}
}
}
sealed trait BaseKey{
def version: Short
def transactionalId: String
}
case class TxnKey(version: Short, transactionalId: String) extends BaseKey {
override def toString: String = transactionalId
}
case class UnknownKey(version: Short) extends BaseKey {
override def transactionalId: String = null
override def toString: String = transactionalId
}

View File

@ -465,25 +465,22 @@ class TransactionStateManager(brokerId: Int,
fileRecords.readInto(buffer, 0)
MemoryRecords.readableRecords(buffer)
}
memRecords.batches.forEach { batch =>
for (record <- batch.asScala) {
require(record.hasKey, "Transaction state log's key should not be null")
TransactionLog.readTxnRecordKey(record.key) match {
case txnKey: TxnKey =>
case Left(version) =>
warn(s"Unknown message key with version $version" +
s" while loading transaction state from $topicPartition. Ignoring it. " +
"It could be a left over from an aborted upgrade.")
case Right(transactionalId) =>
// load transaction metadata along with transaction state
val transactionalId = txnKey.transactionalId
TransactionLog.readTxnRecordValue(transactionalId, record.value) match {
case None =>
loadedTransactions.remove(transactionalId)
case Some(txnMetadata) =>
loadedTransactions.put(transactionalId, txnMetadata)
}
case unknownKey: UnknownKey =>
warn(s"Unknown message key with version ${unknownKey.version}" +
s" while loading transaction state from $topicPartition. Ignoring it. " +
"It could be a left over from an aborted upgrade.")
}
}
currOffset = batch.nextOffset

View File

@ -25,7 +25,7 @@ import org.apache.kafka.common.protocol.types.{CompactArrayOf, Field, Schema, St
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue}
import org.apache.kafka.server.common.TransactionVersion.{TV_0, TV_2}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
import org.junit.jupiter.api.Test
import java.nio.ByteBuffer
@ -89,21 +89,23 @@ class TransactionLogTest {
var count = 0
for (record <- records.records.asScala) {
val txnKey = TransactionLog.readTxnRecordKey(record.key)
val transactionalId = txnKey.transactionalId
val txnMetadata = TransactionLog.readTxnRecordValue(transactionalId, record.value).get
TransactionLog.readTxnRecordKey(record.key) match {
case Left(version) => fail(s"Unexpected record version: $version")
case Right(transactionalId) =>
val txnMetadata = TransactionLog.readTxnRecordValue(transactionalId, record.value).get
assertEquals(pidMappings(transactionalId), txnMetadata.producerId)
assertEquals(producerEpoch, txnMetadata.producerEpoch)
assertEquals(transactionTimeoutMs, txnMetadata.txnTimeoutMs)
assertEquals(transactionStates(txnMetadata.producerId), txnMetadata.state)
assertEquals(pidMappings(transactionalId), txnMetadata.producerId)
assertEquals(producerEpoch, txnMetadata.producerEpoch)
assertEquals(transactionTimeoutMs, txnMetadata.txnTimeoutMs)
assertEquals(transactionStates(txnMetadata.producerId), txnMetadata.state)
if (txnMetadata.state.equals(Empty))
assertEquals(Set.empty[TopicPartition], txnMetadata.topicPartitions)
else
assertEquals(topicPartitions, txnMetadata.topicPartitions)
if (txnMetadata.state.equals(Empty))
assertEquals(Set.empty[TopicPartition], txnMetadata.topicPartitions)
else
assertEquals(topicPartitions, txnMetadata.topicPartitions)
count = count + 1
count = count + 1
}
}
assertEquals(pidMappings.size, count)
@ -235,7 +237,9 @@ class TransactionLogTest {
def testReadTxnRecordKeyCanReadUnknownMessage(): Unit = {
val record = new TransactionLogKey()
val unknownRecord = MessageUtil.toVersionPrefixedBytes(Short.MaxValue, record)
val key = TransactionLog.readTxnRecordKey(ByteBuffer.wrap(unknownRecord))
assertEquals(UnknownKey(Short.MaxValue), key)
TransactionLog.readTxnRecordKey(ByteBuffer.wrap(unknownRecord)) match {
case Left(version) => assertEquals(Short.MaxValue, version)
case Right(_) => fail("Expected to read unknown message")
}
}
}

View File

@ -889,10 +889,13 @@ class TransactionStateManagerTest {
appendedRecords.values.foreach { batches =>
batches.foreach { records =>
records.records.forEach { record =>
val transactionalId = TransactionLog.readTxnRecordKey(record.key).transactionalId
assertNull(record.value)
expiredTransactionalIds += transactionalId
assertEquals(Right(None), transactionManager.getTransactionState(transactionalId))
TransactionLog.readTxnRecordKey(record.key) match {
case Right(transactionalId) =>
assertNull(record.value)
expiredTransactionalIds += transactionalId
assertEquals(Right(None), transactionManager.getTransactionState(transactionalId))
case Left(value) => fail(s"Failed to read transactional id from tombstone: $value")
}
}
}
}