KAFKA-14472: Move TransactionIndex and related to storage module (#12996)

For broader context on this change, please check:

* KAFKA-14470: Move log layer to storage module

Reviewers: Jun Rao <junrao@gmail.com>, Satish Duggana <satishd@apache.org>
Authored by Ismael Juma on 2022-12-19 11:31:37 -08:00; committed by GitHub
parent 802fb11d4e
commit e2678d57d0
19 changed files with 556 additions and 344 deletions

View File

@@ -30,7 +30,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeEx
 import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.server.log.internals.OffsetPosition
+import org.apache.kafka.server.log.internals.{AbortedTxn, OffsetPosition}
 import scala.jdk.CollectionConverters._
 import scala.collection.{Seq, immutable}
@@ -448,7 +448,7 @@ class LocalLog(@volatile private var _dir: File,
 }
 val abortedTransactions = ListBuffer.empty[FetchResponseData.AbortedTransaction]
-def accumulator(abortedTxns: List[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)
+def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)
 collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator)
 FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata,
@@ -459,13 +459,13 @@ class LocalLog(@volatile private var _dir: File,
 private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long,
 startingSegment: LogSegment,
-accumulator: List[AbortedTxn] => Unit): Unit = {
+accumulator: Seq[AbortedTxn] => Unit): Unit = {
 val higherSegments = segments.higherSegments(startingSegment.baseOffset).iterator
 var segmentEntryOpt = Option(startingSegment)
 while (segmentEntryOpt.isDefined) {
 val segment = segmentEntryOpt.get
 val searchResult = segment.collectAbortedTxns(startOffset, upperBoundOffset)
-accumulator(searchResult.abortedTransactions)
+accumulator(searchResult.abortedTransactions.asScala)
 if (searchResult.isComplete)
 return
 segmentEntryOpt = nextOption(higherSegments)
@@ -475,7 +475,7 @@ class LocalLog(@volatile private var _dir: File,
 private[log] def collectAbortedTransactions(logStartOffset: Long, baseOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {
 val segmentEntry = segments.floorSegment(baseOffset)
 val allAbortedTxns = ListBuffer.empty[AbortedTxn]
-def accumulator(abortedTxns: List[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns
+def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns
 segmentEntry.foreach(segment => collectAbortedTransactions(logStartOffset, upperBoundOffset, segment, accumulator))
 allAbortedTxns.toList
 }
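Note: collectAbortedTxns now returns a java.util.List from the storage module, hence the added asScala conversions above. The walk itself is unchanged: query each segment's transaction index in turn and stop as soon as a search result reports isComplete. Below is a minimal Java sketch of that pattern, assuming a hypothetical iterator of per-segment indexes (AbortedTxnWalk and segmentIndexes are illustrative names, not code from this commit):

import org.apache.kafka.server.log.internals.AbortedTxn;
import org.apache.kafka.server.log.internals.TransactionIndex;
import org.apache.kafka.server.log.internals.TxnIndexSearchResult;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class AbortedTxnWalk {
    // Query each segment's transaction index in turn; stop once a result is complete,
    // i.e. some entry's last stable offset already reaches the fetch upper bound.
    static List<AbortedTxn> collect(long fetchOffset, long upperBoundOffset,
                                    Iterator<TransactionIndex> segmentIndexes) {
        List<AbortedTxn> collected = new ArrayList<>();
        while (segmentIndexes.hasNext()) {
            TxnIndexSearchResult result = segmentIndexes.next().collectAbortedTxns(fetchOffset, upperBoundOffset);
            collected.addAll(result.abortedTransactions);
            if (result.isComplete)
                break;
        }
        return collected;
    }
}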

View File

@@ -32,6 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
+import org.apache.kafka.server.log.internals.{AbortedTxn, TransactionIndex}
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
@@ -1123,7 +1124,7 @@ private[log] class CleanedTransactionMetadata {
 private val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata]
 // Minheap of aborted transactions sorted by the transaction first offset
 private val abortedTransactions = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] {
-override def compare(x: AbortedTxn, y: AbortedTxn): Int = x.firstOffset compare y.firstOffset
+override def compare(x: AbortedTxn, y: AbortedTxn): Int = java.lang.Long.compare(x.firstOffset, y.firstOffset)
 }.reverse)
 // Output cleaned index to write retained aborted transactions

View File

@@ -32,7 +32,7 @@ import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
-import org.apache.kafka.server.log.internals.{OffsetPosition, TimestampOffset}
+import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn, OffsetPosition, TimestampOffset, TransactionIndex, TxnIndexSearchResult}
 import scala.jdk.CollectionConverters._
 import scala.math._

View File

@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
 import org.apache.kafka.common.utils.{ByteUtils, Crc32C, Time, Utils}
+import org.apache.kafka.server.log.internals.CompletedTxn
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
@@ -318,7 +319,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
 // without any associated data will not have any impact on the last stable offset
 // and would not need to be reflected in the transaction index.
 val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset =>
-CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
+new CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
 }
 updatedEntry.maybeUpdateProducerEpoch(producerEpoch)

View File

@@ -1,264 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.io.{Closeable, File, IOException}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Files, StandardOpenOption}
import kafka.utils.{Logging, nonthreadsafe}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.log.internals.CorruptIndexException
import scala.collection.mutable.ListBuffer
private[log] case class TxnIndexSearchResult(abortedTransactions: List[AbortedTxn], isComplete: Boolean)
/**
* The transaction index maintains metadata about the aborted transactions for each segment. This includes
* the start and end offsets for the aborted transactions and the last stable offset (LSO) at the time of
* the abort. This index is used to find the aborted transactions in the range of a given fetch request at
* the READ_COMMITTED isolation level.
*
* There is at most one transaction index for each log segment. The entries correspond to the transactions
* whose commit markers were written in the corresponding log segment. Note, however, that individual transactions
* may span multiple segments. Recovering the index therefore requires scanning the earlier segments in
* order to find the start of the transactions.
*/
@nonthreadsafe
class TransactionIndex(val startOffset: Long, @volatile private var _file: File) extends Closeable with Logging {
// note that the file is not created until we need it
@volatile private var maybeChannel: Option[FileChannel] = None
private var lastOffset: Option[Long] = None
if (_file.exists)
openChannel()
def append(abortedTxn: AbortedTxn): Unit = {
lastOffset.foreach { offset =>
if (offset >= abortedTxn.lastOffset)
throw new IllegalArgumentException(s"The last offset of appended transactions must increase sequentially, but " +
s"${abortedTxn.lastOffset} is not greater than current last offset $offset of index ${file.getAbsolutePath}")
}
lastOffset = Some(abortedTxn.lastOffset)
Utils.writeFully(channel(), abortedTxn.buffer.duplicate())
}
def flush(): Unit = maybeChannel.foreach(_.force(true))
def file: File = _file
def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName)
/**
* Delete this index.
*
* @throws IOException if deletion fails due to an I/O error
* @return `true` if the file was deleted by this method; `false` if the file could not be deleted because it did
* not exist
*/
def deleteIfExists(): Boolean = {
close()
Files.deleteIfExists(file.toPath)
}
private def channel(): FileChannel = {
maybeChannel match {
case Some(channel) => channel
case None => openChannel()
}
}
private def openChannel(): FileChannel = {
val channel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.READ,
StandardOpenOption.WRITE)
maybeChannel = Some(channel)
channel.position(channel.size)
channel
}
/**
* Remove all the entries from the index. Unlike `AbstractIndex`, this index is not resized ahead of time.
*/
def reset(): Unit = {
maybeChannel.foreach(_.truncate(0))
lastOffset = None
}
def close(): Unit = {
maybeChannel.foreach(_.close())
maybeChannel = None
}
def renameTo(f: File): Unit = {
try {
if (file.exists)
Utils.atomicMoveWithFallback(file.toPath, f.toPath, false)
} finally _file = f
}
def truncateTo(offset: Long): Unit = {
val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize)
var newLastOffset: Option[Long] = None
for ((abortedTxn, position) <- iterator(() => buffer)) {
if (abortedTxn.lastOffset >= offset) {
channel().truncate(position)
lastOffset = newLastOffset
return
}
newLastOffset = Some(abortedTxn.lastOffset)
}
}
private def iterator(allocate: () => ByteBuffer = () => ByteBuffer.allocate(AbortedTxn.TotalSize)): Iterator[(AbortedTxn, Int)] = {
maybeChannel match {
case None => Iterator.empty
case Some(channel) =>
var position = 0
new Iterator[(AbortedTxn, Int)] {
override def hasNext: Boolean = channel.position - position >= AbortedTxn.TotalSize
override def next(): (AbortedTxn, Int) = {
try {
val buffer = allocate()
Utils.readFully(channel, buffer, position)
buffer.flip()
val abortedTxn = new AbortedTxn(buffer)
if (abortedTxn.version > AbortedTxn.CurrentVersion)
throw new KafkaException(s"Unexpected aborted transaction version ${abortedTxn.version} " +
s"in transaction index ${file.getAbsolutePath}, current version is ${AbortedTxn.CurrentVersion}")
val nextEntry = (abortedTxn, position)
position += AbortedTxn.TotalSize
nextEntry
} catch {
case e: IOException =>
// We received an unexpected error reading from the index file. We propagate this as an
// UNKNOWN error to the consumer, which will cause it to retry the fetch.
throw new KafkaException(s"Failed to read from the transaction index ${file.getAbsolutePath}", e)
}
}
}
}
}
def allAbortedTxns: List[AbortedTxn] = {
iterator().map(_._1).toList
}
/**
* Collect all aborted transactions which overlap with a given fetch range.
*
* @param fetchOffset Inclusive first offset of the fetch range
* @param upperBoundOffset Exclusive last offset in the fetch range
* @return An object containing the aborted transactions and whether the search needs to continue
* into the next log segment.
*/
def collectAbortedTxns(fetchOffset: Long, upperBoundOffset: Long): TxnIndexSearchResult = {
val abortedTransactions = ListBuffer.empty[AbortedTxn]
for ((abortedTxn, _) <- iterator()) {
if (abortedTxn.lastOffset >= fetchOffset && abortedTxn.firstOffset < upperBoundOffset)
abortedTransactions += abortedTxn
if (abortedTxn.lastStableOffset >= upperBoundOffset)
return TxnIndexSearchResult(abortedTransactions.toList, isComplete = true)
}
TxnIndexSearchResult(abortedTransactions.toList, isComplete = false)
}
/**
* Do a basic sanity check on this index to detect obvious problems.
*
* @throws CorruptIndexException if any problems are found.
*/
def sanityCheck(): Unit = {
val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize)
for ((abortedTxn, _) <- iterator(() => buffer)) {
if (abortedTxn.lastOffset < startOffset)
throw new CorruptIndexException(s"Last offset of aborted transaction $abortedTxn in index " +
s"${file.getAbsolutePath} is less than start offset $startOffset")
}
}
}
private[log] object AbortedTxn {
val VersionOffset = 0
val VersionSize = 2
val ProducerIdOffset = VersionOffset + VersionSize
val ProducerIdSize = 8
val FirstOffsetOffset = ProducerIdOffset + ProducerIdSize
val FirstOffsetSize = 8
val LastOffsetOffset = FirstOffsetOffset + FirstOffsetSize
val LastOffsetSize = 8
val LastStableOffsetOffset = LastOffsetOffset + LastOffsetSize
val LastStableOffsetSize = 8
val TotalSize = LastStableOffsetOffset + LastStableOffsetSize
val CurrentVersion: Short = 0
}
private[log] class AbortedTxn(val buffer: ByteBuffer) {
import AbortedTxn._
def this(producerId: Long,
firstOffset: Long,
lastOffset: Long,
lastStableOffset: Long) = {
this(ByteBuffer.allocate(AbortedTxn.TotalSize))
buffer.putShort(CurrentVersion)
buffer.putLong(producerId)
buffer.putLong(firstOffset)
buffer.putLong(lastOffset)
buffer.putLong(lastStableOffset)
buffer.flip()
}
def this(completedTxn: CompletedTxn, lastStableOffset: Long) =
this(completedTxn.producerId, completedTxn.firstOffset, completedTxn.lastOffset, lastStableOffset)
def version: Short = buffer.get(VersionOffset)
def producerId: Long = buffer.getLong(ProducerIdOffset)
def firstOffset: Long = buffer.getLong(FirstOffsetOffset)
def lastOffset: Long = buffer.getLong(LastOffsetOffset)
def lastStableOffset: Long = buffer.getLong(LastStableOffsetOffset)
def asAbortedTransaction: FetchResponseData.AbortedTransaction = new FetchResponseData.AbortedTransaction()
.setProducerId(producerId)
.setFirstOffset(firstOffset)
override def toString: String =
s"AbortedTxn(version=$version, producerId=$producerId, firstOffset=$firstOffset, " +
s"lastOffset=$lastOffset, lastStableOffset=$lastStableOffset)"
override def equals(any: Any): Boolean = {
any match {
case that: AbortedTxn => this.buffer.equals(that.buffer)
case _ => false
}
}
override def hashCode(): Int = buffer.hashCode
}

View File

@@ -44,6 +44,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
+import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn}
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import scala.annotation.nowarn
@@ -159,26 +160,6 @@ case class LogReadInfo(fetchedData: FetchDataInfo,
 logEndOffset: Long,
 lastStableOffset: Long)
-/**
- * A class used to hold useful metadata about a completed transaction. This is used to build
- * the transaction index after appending to the log.
- *
- * @param producerId The ID of the producer
- * @param firstOffset The first offset (inclusive) of the transaction
- * @param lastOffset The last offset (inclusive) of the transaction. This is always the offset of the
- * COMMIT/ABORT control record which indicates the transaction's completion.
- * @param isAborted Whether or not the transaction was aborted
- */
-case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, isAborted: Boolean) {
-override def toString: String = {
-"CompletedTxn(" +
-s"producerId=$producerId, " +
-s"firstOffset=$firstOffset, " +
-s"lastOffset=$lastOffset, " +
-s"isAborted=$isAborted)"
-}
-}
 /**
  * A class used to hold params required to decide to rotate a log segment or not.
  */

View File

@@ -22,7 +22,7 @@ import kafka.utils.{CoreUtils, Logging, ShutdownableThread}
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.log.internals.OffsetPosition
+import org.apache.kafka.server.log.internals.{OffsetPosition, TransactionIndex}
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageManager}

View File

@@ -32,6 +32,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
+import org.apache.kafka.server.log.internals.TransactionIndex
 import org.apache.kafka.snapshot.Snapshots
 import scala.jdk.CollectionConverters._
@@ -94,7 +95,7 @@ object DumpLogSegments {
 private def dumpTxnIndex(file: File): Unit = {
 val index = new TransactionIndex(UnifiedLog.offsetFromFile(file), file)
-for (abortedTxn <- index.allAbortedTxns) {
+for (abortedTxn <- index.allAbortedTxns.asScala) {
 println(s"version: ${abortedTxn.version} producerId: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " +
 s"lastOffset: ${abortedTxn.lastOffset} lastStableOffset: ${abortedTxn.lastStableOffset}")
 }

View File

@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.log.internals.AbortedTxn
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
@@ -328,8 +329,8 @@ class LogCleanerTest {
 assertEquals(20L, log.logEndOffset)
 val expectedAbortedTxns = List(
-new AbortedTxn(producerId=producerId1, firstOffset=8, lastOffset=10, lastStableOffset=11),
-new AbortedTxn(producerId=producerId2, firstOffset=11, lastOffset=16, lastStableOffset=17)
+new AbortedTxn(producerId1, 8, 10, 11),
+new AbortedTxn(producerId2, 11, 16, 17)
 )
 assertAllTransactionsComplete(log)

View File

@@ -31,6 +31,7 @@ import org.apache.kafka.common.record.{CompressionType, ControlRecordType, Defau
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
+import org.apache.kafka.server.log.internals.AbortedTxn
 import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
 import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

View File

@@ -346,7 +346,7 @@ class LogSegmentTest {
 var abortedTxns = segment.txnIndex.allAbortedTxns
 assertEquals(1, abortedTxns.size)
-var abortedTxn = abortedTxns.head
+var abortedTxn = abortedTxns.get(0)
 assertEquals(pid2, abortedTxn.producerId)
 assertEquals(102L, abortedTxn.firstOffset)
 assertEquals(106L, abortedTxn.lastOffset)
@@ -362,7 +362,7 @@
 abortedTxns = segment.txnIndex.allAbortedTxns
 assertEquals(1, abortedTxns.size)
-abortedTxn = abortedTxns.head
+abortedTxn = abortedTxns.get(0)
 assertEquals(pid2, abortedTxn.producerId)
 assertEquals(75L, abortedTxn.firstOffset)
 assertEquals(106L, abortedTxn.lastOffset)

View File

@@ -21,7 +21,6 @@ import kafka.log.remote.RemoteLogManager
 import java.io.File
 import java.util.Properties
-import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, LogDirFailureChannel}
 import kafka.utils.{Scheduler, TestUtils}
@@ -29,10 +28,11 @@ import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
 import java.nio.file.Files
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import kafka.log
+import org.apache.kafka.server.log.internals.{AbortedTxn, TransactionIndex}
 import scala.collection.Iterable
 import scala.jdk.CollectionConverters._
@@ -237,7 +237,7 @@ object LogTestUtils {
 log.read(startOffset, maxLength, isolation, minOneMessage)
 }
-def allAbortedTransactions(log: UnifiedLog): Iterable[AbortedTxn] = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
+def allAbortedTransactions(log: UnifiedLog): Iterable[AbortedTxn] = log.logSegments.flatMap(_.txnIndex.allAbortedTxns.asScala)
 def deleteProducerSnapshotFiles(logDir: File): Unit = {
 val files = logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(UnifiedLog.ProducerSnapshotFileSuffix))

View File

@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{MockTime, Utils}
+import org.apache.kafka.server.log.internals.CompletedTxn
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.Mockito.{mock, when}
@@ -246,7 +247,7 @@ class ProducerStateManagerTest {
 // incomplete transaction
 val secondAppend = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Client)
 val firstCompletedTxn = appendEndTxn(ControlRecordType.COMMIT, 21, secondAppend)
-assertEquals(Some(CompletedTxn(producerId, 16L, 21, isAborted = false)), firstCompletedTxn)
+assertEquals(Some(new CompletedTxn(producerId, 16L, 21, false)), firstCompletedTxn)
 assertEquals(None, appendEndTxn(ControlRecordType.COMMIT, 22, secondAppend))
 assertEquals(None, appendEndTxn(ControlRecordType.ABORT, 23, secondAppend))
 appendData(24L, 27L, secondAppend)
@@ -392,21 +393,21 @@
 beginTxn(producerId3, startOffset3)
 val lastOffset1 = startOffset3 + 15
-val completedTxn1 = CompletedTxn(producerId1, startOffset1, lastOffset1, isAborted = false)
+val completedTxn1 = new CompletedTxn(producerId1, startOffset1, lastOffset1, false)
 assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn1))
 stateManager.completeTxn(completedTxn1)
 stateManager.onHighWatermarkUpdated(lastOffset1 + 1)
 assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset))
 val lastOffset3 = lastOffset1 + 20
-val completedTxn3 = CompletedTxn(producerId3, startOffset3, lastOffset3, isAborted = false)
+val completedTxn3 = new CompletedTxn(producerId3, startOffset3, lastOffset3, false)
 assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn3))
 stateManager.completeTxn(completedTxn3)
 stateManager.onHighWatermarkUpdated(lastOffset3 + 1)
 assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset))
 val lastOffset2 = lastOffset3 + 78
-val completedTxn2 = CompletedTxn(producerId2, startOffset2, lastOffset2, isAborted = false)
+val completedTxn2 = new CompletedTxn(producerId2, startOffset2, lastOffset2, false)
 assertEquals(lastOffset2 + 1, stateManager.lastStableOffset(completedTxn2))
 stateManager.completeTxn(completedTxn2)
 stateManager.onHighWatermarkUpdated(lastOffset2 + 1)

View File

@@ -18,11 +18,13 @@ package kafka.log
 import kafka.utils.TestUtils
 import org.apache.kafka.common.message.FetchResponseData
-import org.apache.kafka.server.log.internals.CorruptIndexException
+import org.apache.kafka.server.log.internals.{AbortedTxn, CorruptIndexException, TransactionIndex}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import scala.jdk.CollectionConverters._
 import java.io.File
+import java.util.Collections
 class TransactionIndexTest {
 var file: File = _
@@ -43,26 +45,26 @@ class TransactionIndexTest {
 @Test
 def testPositionSetCorrectlyWhenOpened(): Unit = {
 val abortedTxns = List(
-new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
-new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13),
-new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
-new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
+new AbortedTxn(0L, 0, 10, 11),
+new AbortedTxn(1L, 5, 15, 13),
+new AbortedTxn(2L, 18, 35, 25),
+new AbortedTxn(3L, 32, 50, 40))
 abortedTxns.foreach(index.append)
 index.close()
 val reopenedIndex = new TransactionIndex(0L, file)
-val anotherAbortedTxn = new AbortedTxn(producerId = 3L, firstOffset = 50, lastOffset = 60, lastStableOffset = 55)
+val anotherAbortedTxn = new AbortedTxn(3L, 50, 60, 55)
 reopenedIndex.append(anotherAbortedTxn)
-assertEquals(abortedTxns ++ List(anotherAbortedTxn), reopenedIndex.allAbortedTxns)
+assertEquals((abortedTxns ++ List(anotherAbortedTxn)).asJava, reopenedIndex.allAbortedTxns)
 }
 @Test
 def testSanityCheck(): Unit = {
 val abortedTxns = List(
-new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
-new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13),
-new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
-new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
+new AbortedTxn(0L, 0, 10, 11),
+new AbortedTxn(1L, 5, 15, 13),
+new AbortedTxn(2L, 18, 35, 25),
+new AbortedTxn(3L, 32, 50, 40))
 abortedTxns.foreach(index.append)
 index.close()
@@ -73,71 +75,71 @@ class TransactionIndexTest {
 @Test
 def testLastOffsetMustIncrease(): Unit = {
-index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13))
-assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(producerId = 0L, firstOffset = 0,
-lastOffset = 15, lastStableOffset = 11)))
+index.append(new AbortedTxn(1L, 5, 15, 13))
+assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(0L, 0,
+15, 11)))
 }
 @Test
 def testLastOffsetCannotDecrease(): Unit = {
-index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13))
-assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(producerId = 0L, firstOffset = 0,
-lastOffset = 10, lastStableOffset = 11)))
+index.append(new AbortedTxn(1L, 5, 15, 13))
+assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(0L, 0,
+10, 11)))
 }
 @Test
 def testCollectAbortedTransactions(): Unit = {
 val abortedTransactions = List(
-new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
-new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13),
-new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
-new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
+new AbortedTxn(0L, 0, 10, 11),
+new AbortedTxn(1L, 5, 15, 13),
+new AbortedTxn(2L, 18, 35, 25),
+new AbortedTxn(3L, 32, 50, 40))
 abortedTransactions.foreach(index.append)
 var result = index.collectAbortedTxns(0L, 100L)
-assertEquals(abortedTransactions, result.abortedTransactions)
+assertEquals(abortedTransactions.asJava, result.abortedTransactions)
 assertFalse(result.isComplete)
 result = index.collectAbortedTxns(0L, 32)
-assertEquals(abortedTransactions.take(3), result.abortedTransactions)
+assertEquals(abortedTransactions.take(3).asJava, result.abortedTransactions)
 assertTrue(result.isComplete)
 result = index.collectAbortedTxns(0L, 35)
-assertEquals(abortedTransactions, result.abortedTransactions)
+assertEquals(abortedTransactions.asJava, result.abortedTransactions)
 assertTrue(result.isComplete)
 result = index.collectAbortedTxns(10, 35)
-assertEquals(abortedTransactions, result.abortedTransactions)
+assertEquals(abortedTransactions.asJava, result.abortedTransactions)
 assertTrue(result.isComplete)
 result = index.collectAbortedTxns(11, 35)
-assertEquals(abortedTransactions.slice(1, 4), result.abortedTransactions)
+assertEquals(abortedTransactions.slice(1, 4).asJava, result.abortedTransactions)
 assertTrue(result.isComplete)
 result = index.collectAbortedTxns(20, 41)
-assertEquals(abortedTransactions.slice(2, 4), result.abortedTransactions)
+assertEquals(abortedTransactions.slice(2, 4).asJava, result.abortedTransactions)
 assertFalse(result.isComplete)
 }
 @Test
 def testTruncate(): Unit = {
 val abortedTransactions = List(
-new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2),
-new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16),
-new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
-new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
+new AbortedTxn(0L, 0, 10, 2),
+new AbortedTxn(1L, 5, 15, 16),
+new AbortedTxn(2L, 18, 35, 25),
+new AbortedTxn(3L, 32, 50, 40))
 abortedTransactions.foreach(index.append)
 index.truncateTo(51)
-assertEquals(abortedTransactions, index.collectAbortedTxns(0L, 100L).abortedTransactions)
+assertEquals(abortedTransactions.asJava, index.collectAbortedTxns(0L, 100L).abortedTransactions)
 index.truncateTo(50)
-assertEquals(abortedTransactions.take(3), index.collectAbortedTxns(0L, 100L).abortedTransactions)
+assertEquals(abortedTransactions.take(3).asJava, index.collectAbortedTxns(0L, 100L).abortedTransactions)
 index.reset()
-assertEquals(List.empty[FetchResponseData.AbortedTransaction], index.collectAbortedTxns(0L, 100L).abortedTransactions)
+assertEquals(Collections.emptyList[FetchResponseData.AbortedTransaction], index.collectAbortedTxns(0L, 100L).abortedTransactions)
 }
 @Test
@@ -148,7 +150,7 @@ class TransactionIndexTest {
 val lastStableOffset = 200L
 val abortedTxn = new AbortedTxn(pid, firstOffset, lastOffset, lastStableOffset)
-assertEquals(AbortedTxn.CurrentVersion, abortedTxn.version)
+assertEquals(AbortedTxn.CURRENT_VERSION, abortedTxn.version)
 assertEquals(pid, abortedTxn.producerId)
 assertEquals(firstOffset, abortedTxn.firstOffset)
 assertEquals(lastOffset, abortedTxn.lastOffset)
@@ -158,15 +160,15 @@
 @Test
 def testRenameIndex(): Unit = {
 val renamed = TestUtils.tempFile()
-index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2))
+index.append(new AbortedTxn(0L, 0, 10, 2))
 index.renameTo(renamed)
-index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16))
+index.append(new AbortedTxn(1L, 5, 15, 16))
 val abortedTxns = index.collectAbortedTxns(0L, 100L).abortedTransactions
 assertEquals(2, abortedTxns.size)
-assertEquals(0, abortedTxns(0).firstOffset)
-assertEquals(5, abortedTxns(1).firstOffset)
+assertEquals(0, abortedTxns.get(0).firstOffset)
+assertEquals(5, abortedTxns.get(1).firstOffset)
 }
 @Test

View File

@@ -37,6 +37,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
 import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
+import org.apache.kafka.server.log.internals.AbortedTxn
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.junit.jupiter.api.Assertions._

View File

@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;
import org.apache.kafka.common.message.FetchResponseData;
import java.nio.ByteBuffer;
import java.util.Objects;
public class AbortedTxn {
static final int VERSION_OFFSET = 0;
static final int VERSION_SIZE = 2;
static final int PRODUCER_ID_OFFSET = VERSION_OFFSET + VERSION_SIZE;
static final int PRODUCER_ID_SIZE = 8;
static final int FIRST_OFFSET_OFFSET = PRODUCER_ID_OFFSET + PRODUCER_ID_SIZE;
static final int FIRST_OFFSET_SIZE = 8;
static final int LAST_OFFSET_OFFSET = FIRST_OFFSET_OFFSET + FIRST_OFFSET_SIZE;
static final int LAST_OFFSET_SIZE = 8;
static final int LAST_STABLE_OFFSET_OFFSET = LAST_OFFSET_OFFSET + LAST_OFFSET_SIZE;
static final int LAST_STABLE_OFFSET_SIZE = 8;
static final int TOTAL_SIZE = LAST_STABLE_OFFSET_OFFSET + LAST_STABLE_OFFSET_SIZE;
public static final short CURRENT_VERSION = 0;
final ByteBuffer buffer;
AbortedTxn(ByteBuffer buffer) {
Objects.requireNonNull(buffer);
this.buffer = buffer;
}
public AbortedTxn(CompletedTxn completedTxn, long lastStableOffset) {
this(completedTxn.producerId, completedTxn.firstOffset, completedTxn.lastOffset, lastStableOffset);
}
public AbortedTxn(long producerId, long firstOffset, long lastOffset, long lastStableOffset) {
this(toByteBuffer(producerId, firstOffset, lastOffset, lastStableOffset));
}
private static ByteBuffer toByteBuffer(long producerId, long firstOffset, long lastOffset, long lastStableOffset) {
ByteBuffer buffer = ByteBuffer.allocate(TOTAL_SIZE);
buffer.putShort(CURRENT_VERSION);
buffer.putLong(producerId);
buffer.putLong(firstOffset);
buffer.putLong(lastOffset);
buffer.putLong(lastStableOffset);
buffer.flip();
return buffer;
}
public short version() {
return buffer.get(VERSION_OFFSET);
}
public long producerId() {
return buffer.getLong(PRODUCER_ID_OFFSET);
}
public long firstOffset() {
return buffer.getLong(FIRST_OFFSET_OFFSET);
}
public long lastOffset() {
return buffer.getLong(LAST_OFFSET_OFFSET);
}
public long lastStableOffset() {
return buffer.getLong(LAST_STABLE_OFFSET_OFFSET);
}
public FetchResponseData.AbortedTransaction asAbortedTransaction() {
return new FetchResponseData.AbortedTransaction()
.setProducerId(producerId())
.setFirstOffset(firstOffset());
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
AbortedTxn that = (AbortedTxn) o;
return buffer.equals(that.buffer);
}
@Override
public int hashCode() {
return buffer.hashCode();
}
@Override
public String toString() {
return "AbortedTxn(version=" + version()
+ ", producerId=" + producerId()
+ ", firstOffset=" + firstOffset()
+ ", lastOffset=" + lastOffset()
+ ", lastStableOffset=" + lastStableOffset()
+ ")";
}
}
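Note: each AbortedTxn entry serializes to a fixed 34-byte buffer: a 2-byte version followed by four 8-byte longs (producerId, firstOffset, lastOffset, lastStableOffset). A small usage sketch of the public constructor and accessors above (the wrapping AbortedTxnExample class is only for illustration, not part of this commit):

import org.apache.kafka.server.log.internals.AbortedTxn;

public class AbortedTxnExample {
    public static void main(String[] args) {
        // Producer 42 aborted a transaction covering offsets 100..120; the LSO at abort time was 121.
        AbortedTxn txn = new AbortedTxn(42L, 100L, 120L, 121L);
        System.out.println(txn.version());          // 0 (CURRENT_VERSION)
        System.out.println(txn.producerId());       // 42
        System.out.println(txn.firstOffset());      // 100
        System.out.println(txn.lastOffset());       // 120
        System.out.println(txn.lastStableOffset()); // 121
        // Conversion used by the fetch path when serving READ_COMMITTED consumers.
        System.out.println(txn.asAbortedTransaction());
    }
}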

View File

@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;
/**
* A class used to hold useful metadata about a completed transaction. This is used to build
* the transaction index after appending to the log.
*/
public class CompletedTxn {
public final long producerId;
public final long firstOffset;
public final long lastOffset;
public final boolean isAborted;
/**
* Create an instance of this class.
*
* @param producerId The ID of the producer
* @param firstOffset The first offset (inclusive) of the transaction
* @param lastOffset The last offset (inclusive) of the transaction. This is always the offset of the
* COMMIT/ABORT control record which indicates the transaction's completion.
* @param isAborted Whether the transaction was aborted
*/
public CompletedTxn(long producerId, long firstOffset, long lastOffset, boolean isAborted) {
this.producerId = producerId;
this.firstOffset = firstOffset;
this.lastOffset = lastOffset;
this.isAborted = isAborted;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CompletedTxn that = (CompletedTxn) o;
return producerId == that.producerId
&& firstOffset == that.firstOffset
&& lastOffset == that.lastOffset
&& isAborted == that.isAborted;
}
@Override
public int hashCode() {
int result = Long.hashCode(producerId);
result = 31 * result + Long.hashCode(firstOffset);
result = 31 * result + Long.hashCode(lastOffset);
result = 31 * result + Boolean.hashCode(isAborted);
return result;
}
@Override
public String toString() {
return "CompletedTxn(producerId=" + producerId +
", firstOffset=" + firstOffset +
", lastOffset=" + lastOffset +
", isAborted=" + isAborted +
')';
}
}
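Note: a CompletedTxn is produced when the COMMIT/ABORT control record is appended; for aborts it is combined with the last stable offset at that point to build the index entry via the AbortedTxn(CompletedTxn, long) constructor shown earlier. A brief sketch with made-up values (the CompletedTxnExample class is only for illustration):

import org.apache.kafka.server.log.internals.AbortedTxn;
import org.apache.kafka.server.log.internals.CompletedTxn;

public class CompletedTxnExample {
    public static void main(String[] args) {
        // An aborted transaction from producer 7 spanning offsets 16..21 (21 is the ABORT marker).
        CompletedTxn completed = new CompletedTxn(7L, 16L, 21L, true);
        // The index entry additionally records the last stable offset at the time of the abort.
        AbortedTxn entry = new AbortedTxn(completed, 22L);
        System.out.println(entry);
        // AbortedTxn(version=0, producerId=7, firstOffset=16, lastOffset=21, lastStableOffset=22)
    }
}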

View File

@@ -0,0 +1,264 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.PrimitiveRef;
import org.apache.kafka.common.utils.Utils;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Supplier;
/**
* The transaction index maintains metadata about the aborted transactions for each segment. This includes
* the start and end offsets for the aborted transactions and the last stable offset (LSO) at the time of
* the abort. This index is used to find the aborted transactions in the range of a given fetch request at
* the READ_COMMITTED isolation level.
*
* There is at most one transaction index for each log segment. The entries correspond to the transactions
* whose commit markers were written in the corresponding log segment. Note, however, that individual transactions
* may span multiple segments. Recovering the index therefore requires scanning the earlier segments in
* order to find the start of the transactions.
*/
public class TransactionIndex implements Closeable {
private static class AbortedTxnWithPosition {
final AbortedTxn txn;
final int position;
AbortedTxnWithPosition(AbortedTxn txn, int position) {
this.txn = txn;
this.position = position;
}
}
private final long startOffset;
private volatile File file;
// note that the file is not created until we need it
private Optional<FileChannel> maybeChannel = Optional.empty();
private OptionalLong lastOffset = OptionalLong.empty();
public TransactionIndex(long startOffset, File file) throws IOException {
this.startOffset = startOffset;
this.file = file;
if (file.exists())
openChannel();
}
public File file() {
return file;
}
public void updateParentDir(File parentDir) {
this.file = new File(parentDir, file.getName());
}
public void append(AbortedTxn abortedTxn) throws IOException {
lastOffset.ifPresent(offset -> {
if (offset >= abortedTxn.lastOffset())
throw new IllegalArgumentException("The last offset of appended transactions must increase sequentially, but "
+ abortedTxn.lastOffset() + " is not greater than current last offset " + offset + " of index "
+ file.getAbsolutePath());
});
lastOffset = OptionalLong.of(abortedTxn.lastOffset());
Utils.writeFully(channel(), abortedTxn.buffer.duplicate());
}
public void flush() throws IOException {
FileChannel channel = channelOrNull();
if (channel != null)
channel.force(true);
}
/**
* Remove all the entries from the index. Unlike `AbstractIndex`, this index is not resized ahead of time.
*/
public void reset() throws IOException {
FileChannel channel = channelOrNull();
if (channel != null)
channel.truncate(0);
lastOffset = OptionalLong.empty();
}
public void close() throws IOException {
FileChannel channel = channelOrNull();
if (channel != null)
channel.close();
maybeChannel = Optional.empty();
}
/**
* Delete this index.
*
* @throws IOException if deletion fails due to an I/O error
* @return `true` if the file was deleted by this method; `false` if the file could not be deleted because it did
* not exist
*/
public boolean deleteIfExists() throws IOException {
close();
return Files.deleteIfExists(file.toPath());
}
public void renameTo(File f) throws IOException {
try {
if (file.exists())
Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), false);
} finally {
this.file = f;
}
}
public void truncateTo(long offset) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE);
OptionalLong newLastOffset = OptionalLong.empty();
for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) {
AbortedTxn abortedTxn = txnWithPosition.txn;
long position = txnWithPosition.position;
if (abortedTxn.lastOffset() >= offset) {
channel().truncate(position);
lastOffset = newLastOffset;
return;
}
newLastOffset = OptionalLong.of(abortedTxn.lastOffset());
}
}
public List<AbortedTxn> allAbortedTxns() {
List<AbortedTxn> result = new ArrayList<>();
for (AbortedTxnWithPosition txnWithPosition : iterable())
result.add(txnWithPosition.txn);
return result;
}
/**
* Collect all aborted transactions which overlap with a given fetch range.
*
* @param fetchOffset Inclusive first offset of the fetch range
* @param upperBoundOffset Exclusive last offset in the fetch range
* @return An object containing the aborted transactions and whether the search needs to continue
* into the next log segment.
*/
public TxnIndexSearchResult collectAbortedTxns(long fetchOffset, long upperBoundOffset) {
List<AbortedTxn> abortedTransactions = new ArrayList<>();
for (AbortedTxnWithPosition txnWithPosition : iterable()) {
AbortedTxn abortedTxn = txnWithPosition.txn;
if (abortedTxn.lastOffset() >= fetchOffset && abortedTxn.firstOffset() < upperBoundOffset)
abortedTransactions.add(abortedTxn);
if (abortedTxn.lastStableOffset() >= upperBoundOffset)
return new TxnIndexSearchResult(abortedTransactions, true);
}
return new TxnIndexSearchResult(abortedTransactions, false);
}
/**
* Do a basic sanity check on this index to detect obvious problems.
*
* @throws CorruptIndexException if any problems are found.
*/
public void sanityCheck() {
ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE);
for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) {
AbortedTxn abortedTxn = txnWithPosition.txn;
if (abortedTxn.lastOffset() < startOffset)
throw new CorruptIndexException("Last offset of aborted transaction " + abortedTxn + " in index "
+ file.getAbsolutePath() + " is less than start offset " + startOffset);
}
}
private FileChannel openChannel() throws IOException {
FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE,
StandardOpenOption.READ, StandardOpenOption.WRITE);
maybeChannel = Optional.of(channel);
channel.position(channel.size());
return channel;
}
private FileChannel channel() throws IOException {
FileChannel channel = channelOrNull();
if (channel == null)
return openChannel();
else
return channel;
}
private FileChannel channelOrNull() {
return maybeChannel.orElse(null);
}
private Iterable<AbortedTxnWithPosition> iterable() {
return iterable(() -> ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE));
}
private Iterable<AbortedTxnWithPosition> iterable(Supplier<ByteBuffer> allocate) {
FileChannel channel = channelOrNull();
if (channel == null)
return Collections.emptyList();
PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0);
return () -> new Iterator<AbortedTxnWithPosition>() {
@Override
public boolean hasNext() {
try {
return channel.position() - position.value >= AbortedTxn.TOTAL_SIZE;
} catch (IOException e) {
throw new KafkaException("Failed read position from the transaction index " + file.getAbsolutePath(), e);
}
}
@Override
public AbortedTxnWithPosition next() {
try {
ByteBuffer buffer = allocate.get();
Utils.readFully(channel, buffer, position.value);
buffer.flip();
AbortedTxn abortedTxn = new AbortedTxn(buffer);
if (abortedTxn.version() > AbortedTxn.CURRENT_VERSION)
throw new KafkaException("Unexpected aborted transaction version " + abortedTxn.version()
+ " in transaction index " + file.getAbsolutePath() + ", current version is "
+ AbortedTxn.CURRENT_VERSION);
AbortedTxnWithPosition nextEntry = new AbortedTxnWithPosition(abortedTxn, position.value);
position.value += AbortedTxn.TOTAL_SIZE;
return nextEntry;
} catch (IOException e) {
// We received an unexpected error reading from the index file. We propagate this as an
// UNKNOWN error to the consumer, which will cause it to retry the fetch.
throw new KafkaException("Failed to read from the transaction index " + file.getAbsolutePath(), e);
}
}
};
}
}
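Note: a minimal end-to-end sketch of the new API above (the temporary file name and offsets are made up, error handling omitted): append entries with strictly increasing last offsets, then collect the entries overlapping a fetch range and check whether the search is complete for this segment.

import org.apache.kafka.server.log.internals.AbortedTxn;
import org.apache.kafka.server.log.internals.TransactionIndex;
import org.apache.kafka.server.log.internals.TxnIndexSearchResult;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class TransactionIndexExample {
    public static void main(String[] args) throws IOException {
        File file = Files.createTempFile("00000000000000000000", ".txnindex").toFile();
        TransactionIndex index = new TransactionIndex(0L, file);
        try {
            index.append(new AbortedTxn(0L, 0L, 10L, 11L)); // producerId, firstOffset, lastOffset, lastStableOffset
            index.append(new AbortedTxn(1L, 5L, 15L, 13L)); // last offsets must strictly increase

            // Aborted transactions overlapping the fetch range [0, 12).
            TxnIndexSearchResult result = index.collectAbortedTxns(0L, 12L);
            System.out.println(result.abortedTransactions.size()); // 2
            // Complete because the second entry's LSO (13) already reaches the upper bound,
            // so a caller would not need to look at the next segment's index.
            System.out.println(result.isComplete);                  // true
        } finally {
            index.deleteIfExists(); // closes the channel and removes the temp file
        }
    }
}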

View File

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;
import java.util.Collections;
import java.util.List;
public class TxnIndexSearchResult {
public final List<AbortedTxn> abortedTransactions;
public final boolean isComplete;
public TxnIndexSearchResult(List<AbortedTxn> abortedTransactions, boolean isComplete) {
this.abortedTransactions = Collections.unmodifiableList(abortedTransactions);
this.isComplete = isComplete;
}
}