KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module (#13043)

For broader context on this change, see:
* KAFKA-14470: Move log layer to storage module.

Reviewers: Ismael Juma <ismael@juma.me.uk>
Satish Duggana authored on 2023-01-08 09:43:38 +05:30; committed by GitHub
commit 2dec39d6e4 (parent d798ec779c)
12 changed files with 665 additions and 377 deletions
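The main API change for callers is that the moved classes now use java.util.Optional/OptionalLong where the Scala versions used Option. A minimal sketch of constructing the new Java types (the example class name and literal values are illustrative, not from the commit):

import java.util.Optional;
import java.util.OptionalLong;

import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.server.log.internals.BatchMetadata;
import org.apache.kafka.server.log.internals.LastRecord;
import org.apache.kafka.server.log.internals.ProducerStateEntry;

public class StorageModuleTypesExample {
    public static void main(String[] args) {
        // Option[Long] becomes OptionalLong: an undefined last data offset is OptionalLong.empty().
        LastRecord lastRecord = new LastRecord(OptionalLong.of(42L), (short) 3);

        // ProducerStateEntry is now constructed with OptionalLong/Optional instead of
        // mutable.Queue and Option.
        ProducerStateEntry entry = new ProducerStateEntry(
                1000L,                               // producerId
                (short) 3,                           // producerEpoch
                -1,                                  // coordinatorEpoch
                RecordBatch.NO_TIMESTAMP,            // lastTimestamp
                OptionalLong.empty(),                // currentTxnFirstOffset
                Optional.of(new BatchMetadata(9, 42L, 4, RecordBatch.NO_TIMESTAMP)));

        System.out.println(lastRecord);
        System.out.println(entry);
    }
}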

View File: LogCleaner.scala

@ -32,7 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, LogDirFailureChannel, OffsetMap, SkimpyOffsetMap, TransactionIndex}
import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, LastRecord, LogDirFailureChannel, OffsetMap, SkimpyOffsetMap, TransactionIndex}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
@ -680,9 +680,10 @@ private[log] class Cleaner(val id: Int,
// 3) The last entry in the log is a transaction marker. We retain this marker since it has the
// last producer epoch, which is needed to ensure fencing.
lastRecordsOfActiveProducers.get(batch.producerId).exists { lastRecord =>
lastRecord.lastDataOffset match {
case Some(offset) => batch.lastOffset == offset
case None => batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch
if (lastRecord.lastDataOffset.isPresent) {
batch.lastOffset == lastRecord.lastDataOffset.getAsLong
} else {
batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch
}
}
}

View File: LogSegment.scala

@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, LogConfig, LogOffsetMetadata, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult}
import java.util.Optional
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.math._
@ -249,7 +250,7 @@ class LogSegment private[log] (val log: FileRecords,
if (batch.hasProducerId) {
val producerId = batch.producerId
val appendInfo = producerStateManager.prepareUpdate(producerId, origin = AppendOrigin.REPLICATION)
val maybeCompletedTxn = appendInfo.append(batch, firstOffsetMetadataOpt = None)
val maybeCompletedTxn = appendInfo.append(batch, Optional.empty()).asScala
producerStateManager.update(appendInfo)
maybeCompletedTxn.foreach { completedTxn =>
val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)

View File: ProducerStateManager.scala

@ -20,9 +20,8 @@ import kafka.server.{BrokerReconfigurable, KafkaConfig}
import kafka.utils.{Logging, nonthreadsafe, threadsafe}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.types._
import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.utils.{ByteUtils, Crc32C, Time}
import org.apache.kafka.server.log.internals._
@ -30,320 +29,11 @@ import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Files, NoSuchFileException, StandardOpenOption}
import java.util.{Optional, OptionalLong}
import java.util.concurrent.ConcurrentSkipListMap
import scala.collection.mutable.ListBuffer
import scala.collection.{immutable, mutable}
import scala.jdk.CollectionConverters._
/**
* The last written record for a given producer. The last data offset may be undefined
* if the only log entry for a producer is a transaction marker.
*/
case class LastRecord(lastDataOffset: Option[Long], producerEpoch: Short)
private[log] case class TxnMetadata(
producerId: Long,
firstOffset: LogOffsetMetadata,
var lastOffset: Option[Long] = None
) {
def this(producerId: Long, firstOffset: Long) = this(producerId, new LogOffsetMetadata(firstOffset))
override def toString: String = {
"TxnMetadata(" +
s"producerId=$producerId, " +
s"firstOffset=$firstOffset, " +
s"lastOffset=$lastOffset)"
}
}
private[log] object ProducerStateEntry {
private[log] val NumBatchesToRetain = 5
def empty(producerId: Long) = new ProducerStateEntry(producerId,
batchMetadata = mutable.Queue[BatchMetadata](),
producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
coordinatorEpoch = -1,
lastTimestamp = RecordBatch.NO_TIMESTAMP,
currentTxnFirstOffset = None)
}
private[log] case class BatchMetadata(lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long) {
def firstSeq: Int = DefaultRecordBatch.decrementSequence(lastSeq, offsetDelta)
def firstOffset: Long = lastOffset - offsetDelta
override def toString: String = {
"BatchMetadata(" +
s"firstSeq=$firstSeq, " +
s"lastSeq=$lastSeq, " +
s"firstOffset=$firstOffset, " +
s"lastOffset=$lastOffset, " +
s"timestamp=$timestamp)"
}
}
// the batchMetadata is ordered such that the batch with the lowest sequence is at the head of the queue while the
// batch with the highest sequence is at the tail of the queue. We will retain at most ProducerStateEntry.NumBatchesToRetain
// elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch.
private[log] class ProducerStateEntry(val producerId: Long,
val batchMetadata: mutable.Queue[BatchMetadata],
var producerEpoch: Short,
var coordinatorEpoch: Int,
var lastTimestamp: Long,
var currentTxnFirstOffset: Option[Long]) {
def firstSeq: Int = if (isEmpty) RecordBatch.NO_SEQUENCE else batchMetadata.front.firstSeq
def firstDataOffset: Long = if (isEmpty) -1L else batchMetadata.front.firstOffset
def lastSeq: Int = if (isEmpty) RecordBatch.NO_SEQUENCE else batchMetadata.last.lastSeq
def lastDataOffset: Long = if (isEmpty) -1L else batchMetadata.last.lastOffset
def lastOffsetDelta : Int = if (isEmpty) 0 else batchMetadata.last.offsetDelta
def isEmpty: Boolean = batchMetadata.isEmpty
def addBatch(producerEpoch: Short, lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long): Unit = {
maybeUpdateProducerEpoch(producerEpoch)
addBatchMetadata(BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp))
this.lastTimestamp = timestamp
}
def maybeUpdateProducerEpoch(producerEpoch: Short): Boolean = {
if (this.producerEpoch != producerEpoch) {
batchMetadata.clear()
this.producerEpoch = producerEpoch
true
} else {
false
}
}
private def addBatchMetadata(batch: BatchMetadata): Unit = {
if (batchMetadata.size == ProducerStateEntry.NumBatchesToRetain)
batchMetadata.dequeue()
batchMetadata.enqueue(batch)
}
def update(nextEntry: ProducerStateEntry): Unit = {
maybeUpdateProducerEpoch(nextEntry.producerEpoch)
while (nextEntry.batchMetadata.nonEmpty)
addBatchMetadata(nextEntry.batchMetadata.dequeue())
this.coordinatorEpoch = nextEntry.coordinatorEpoch
this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset
this.lastTimestamp = nextEntry.lastTimestamp
}
def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = {
if (batch.producerEpoch != producerEpoch)
None
else
batchWithSequenceRange(batch.baseSequence, batch.lastSequence)
}
// Return the batch metadata of the cached batch having the exact sequence range, if any.
def batchWithSequenceRange(firstSeq: Int, lastSeq: Int): Option[BatchMetadata] = {
val duplicate = batchMetadata.filter { metadata =>
firstSeq == metadata.firstSeq && lastSeq == metadata.lastSeq
}
duplicate.headOption
}
override def toString: String = {
"ProducerStateEntry(" +
s"producerId=$producerId, " +
s"producerEpoch=$producerEpoch, " +
s"currentTxnFirstOffset=$currentTxnFirstOffset, " +
s"coordinatorEpoch=$coordinatorEpoch, " +
s"lastTimestamp=$lastTimestamp, " +
s"batchMetadata=$batchMetadata"
}
}
/**
* This class is used to validate the records appended by a given producer before they are written to the log.
* It is initialized with the producer's state after the last successful append, and transitively validates the
* sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata
* as the incoming records are validated.
*
* @param producerId The id of the producer appending to the log
* @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of
* the most recent appends made by the producer. Validation of the first incoming append will
* be made against the latest append in the current entry. New appends will replace older appends
* in the current entry so that the space overhead is constant.
* @param origin Indicates the origin of the append which implies the extent of validation. For example, offset
* commits, which originate from the group coordinator, do not have sequence numbers and therefore
* only producer epoch validation is done. Appends which come through replication are not validated
* (we assume the validation has already been done) and appends from clients require full validation.
*/
private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
val producerId: Long,
val currentEntry: ProducerStateEntry,
val origin: AppendOrigin) extends Logging {
private val transactions = ListBuffer.empty[TxnMetadata]
private val updatedEntry = ProducerStateEntry.empty(producerId)
updatedEntry.producerEpoch = currentEntry.producerEpoch
updatedEntry.coordinatorEpoch = currentEntry.coordinatorEpoch
updatedEntry.lastTimestamp = currentEntry.lastTimestamp
updatedEntry.currentTxnFirstOffset = currentEntry.currentTxnFirstOffset
private def maybeValidateDataBatch(producerEpoch: Short, firstSeq: Int, offset: Long): Unit = {
checkProducerEpoch(producerEpoch, offset)
if (origin == AppendOrigin.CLIENT) {
checkSequence(producerEpoch, firstSeq, offset)
}
}
private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = {
if (producerEpoch < updatedEntry.producerEpoch) {
val message = s"Epoch of producer $producerId at offset $offset in $topicPartition is $producerEpoch, " +
s"which is smaller than the last seen epoch ${updatedEntry.producerEpoch}"
if (origin == AppendOrigin.REPLICATION) {
warn(message)
} else {
// Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the
// producer send response callback to differentiate from the former fatal exception,
// letting client abort the ongoing transaction and retry.
throw new InvalidProducerEpochException(message)
}
}
}
private def checkSequence(producerEpoch: Short, appendFirstSeq: Int, offset: Long): Unit = {
if (producerEpoch != updatedEntry.producerEpoch) {
if (appendFirstSeq != 0) {
if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch of producer $producerId " +
s"at offset $offset in partition $topicPartition: $producerEpoch (request epoch), $appendFirstSeq (seq. number), " +
s"${updatedEntry.producerEpoch} (current producer epoch)")
}
}
} else {
val currentLastSeq = if (!updatedEntry.isEmpty)
updatedEntry.lastSeq
else if (producerEpoch == currentEntry.producerEpoch)
currentEntry.lastSeq
else
RecordBatch.NO_SEQUENCE
// If there is no current producer epoch (possibly because all producer records have been deleted due to
// retention or the DeleteRecords API) accept writes with any sequence number
if (!(currentEntry.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) {
throw new OutOfOrderSequenceException(s"Out of order sequence number for producer $producerId at " +
s"offset $offset in partition $topicPartition: $appendFirstSeq (incoming seq. number), " +
s"$currentLastSeq (current end sequence number)")
}
}
}
private def inSequence(lastSeq: Int, nextSeq: Int): Boolean = {
nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Int.MaxValue)
}
def append(batch: RecordBatch, firstOffsetMetadataOpt: Option[LogOffsetMetadata]): Option[CompletedTxn] = {
if (batch.isControlBatch) {
val recordIterator = batch.iterator
if (recordIterator.hasNext) {
val record = recordIterator.next()
val endTxnMarker = EndTransactionMarker.deserialize(record)
appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset, record.timestamp)
} else {
// An empty control batch means the entire transaction has been cleaned from the log, so no need to append
None
}
} else {
val firstOffsetMetadata = firstOffsetMetadataOpt.getOrElse(new LogOffsetMetadata(batch.baseOffset))
appendDataBatch(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp,
firstOffsetMetadata, batch.lastOffset, batch.isTransactional)
None
}
}
def appendDataBatch(epoch: Short,
firstSeq: Int,
lastSeq: Int,
lastTimestamp: Long,
firstOffsetMetadata: LogOffsetMetadata,
lastOffset: Long,
isTransactional: Boolean): Unit = {
val firstOffset = firstOffsetMetadata.messageOffset
maybeValidateDataBatch(epoch, firstSeq, firstOffset)
updatedEntry.addBatch(epoch, lastSeq, lastOffset, (lastOffset - firstOffset).toInt, lastTimestamp)
updatedEntry.currentTxnFirstOffset match {
case Some(_) if !isTransactional =>
// Received a non-transactional message while a transaction is active
throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId at " +
s"offset $firstOffsetMetadata in partition $topicPartition")
case None if isTransactional =>
// Began a new transaction
updatedEntry.currentTxnFirstOffset = Some(firstOffset)
transactions += TxnMetadata(producerId, firstOffsetMetadata)
case _ => // nothing to do
}
}
private def checkCoordinatorEpoch(endTxnMarker: EndTransactionMarker, offset: Long): Unit = {
if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch) {
if (origin == AppendOrigin.REPLICATION) {
info(s"Detected invalid coordinator epoch for producerId $producerId at " +
s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} " +
s"is older than previously known coordinator epoch ${updatedEntry.coordinatorEpoch}")
} else {
throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch for producerId $producerId at " +
s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} " +
s"(zombie), ${updatedEntry.coordinatorEpoch} (current)")
}
}
}
def appendEndTxnMarker(
endTxnMarker: EndTransactionMarker,
producerEpoch: Short,
offset: Long,
timestamp: Long
): Option[CompletedTxn] = {
checkProducerEpoch(producerEpoch, offset)
checkCoordinatorEpoch(endTxnMarker, offset)
// Only emit the `CompletedTxn` for non-empty transactions. A transaction marker
// without any associated data will not have any impact on the last stable offset
// and would not need to be reflected in the transaction index.
val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset =>
new CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
}
updatedEntry.maybeUpdateProducerEpoch(producerEpoch)
updatedEntry.currentTxnFirstOffset = None
updatedEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch
updatedEntry.lastTimestamp = timestamp
completedTxn
}
def toEntry: ProducerStateEntry = updatedEntry
def startedTransactions: List[TxnMetadata] = transactions.toList
override def toString: String = {
"ProducerAppendInfo(" +
s"producerId=$producerId, " +
s"producerEpoch=${updatedEntry.producerEpoch}, " +
s"firstSequence=${updatedEntry.firstSeq}, " +
s"lastSequence=${updatedEntry.lastSeq}, " +
s"currentTxnFirstOffset=${updatedEntry.currentTxnFirstOffset}, " +
s"coordinatorEpoch=${updatedEntry.coordinatorEpoch}, " +
s"lastTimestamp=${updatedEntry.lastTimestamp}, " +
s"startedTransactions=$transactions)"
}
}
object ProducerStateManager {
val LateTransactionBufferMs = 5 * 60 * 1000
@ -403,13 +93,11 @@ object ProducerStateManager {
val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
val lastAppendedDataBatches = mutable.Queue.empty[BatchMetadata]
if (offset >= 0)
lastAppendedDataBatches += BatchMetadata(seq, offset, offsetDelta, timestamp)
val newEntry = new ProducerStateEntry(producerId, lastAppendedDataBatches, producerEpoch,
coordinatorEpoch, timestamp, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
newEntry
val batchMetadata =
if (offset >= 0) Optional.of(new BatchMetadata(seq, offset, offsetDelta, timestamp))
else Optional.empty[BatchMetadata]()
val currentTxnFirstOffsetValue = if (currentTxnFirstOffset >= 0) OptionalLong.of(currentTxnFirstOffset) else OptionalLong.empty()
new ProducerStateEntry(producerId, producerEpoch, coordinatorEpoch, timestamp, currentTxnFirstOffsetValue, batchMetadata)
}
} catch {
case e: SchemaException =>
@ -431,7 +119,7 @@ object ProducerStateManager {
.set(OffsetDeltaField, entry.lastOffsetDelta)
.set(TimestampField, entry.lastTimestamp)
.set(CoordinatorEpochField, entry.coordinatorEpoch)
.set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.getOrElse(-1L))
.set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.orElse(-1L))
producerEntryStruct
}.toArray
struct.set(ProducerEntriesField, entriesArray)
@ -518,7 +206,7 @@ class ProducerStateManager(
val lastTimestamp = oldestTxnLastTimestamp
lastTimestamp > 0 && (currentTimeMs - lastTimestamp) > maxTransactionTimeoutMs + ProducerStateManager.LateTransactionBufferMs
}
def truncateFullyAndReloadSnapshots(): Unit = {
info("Reloading the producer state snapshots")
truncateFullyAndStartAt(0L)
@ -652,13 +340,11 @@ class ProducerStateManager(
private[log] def loadProducerEntry(entry: ProducerStateEntry): Unit = {
val producerId = entry.producerId
producers.put(producerId, entry)
entry.currentTxnFirstOffset.foreach { offset =>
ongoingTxns.put(offset, new TxnMetadata(producerId, offset))
}
entry.currentTxnFirstOffset.ifPresent((offset: Long) => ongoingTxns.put(offset, new TxnMetadata(producerId, offset)))
}
private def isProducerExpired(currentTimeMs: Long, producerState: ProducerStateEntry): Boolean =
producerState.currentTxnFirstOffset.isEmpty && currentTimeMs - producerState.lastTimestamp >= producerStateManagerConfig.producerIdExpirationMs
!producerState.currentTxnFirstOffset.isPresent && currentTimeMs - producerState.lastTimestamp >= producerStateManagerConfig.producerIdExpirationMs
/**
* Expire any producer ids which have been idle longer than the configured maximum expiration timeout.
@ -706,8 +392,8 @@ class ProducerStateManager(
* Update the mapping with the given append information
*/
def update(appendInfo: ProducerAppendInfo): Unit = {
if (appendInfo.producerId == RecordBatch.NO_PRODUCER_ID)
throw new IllegalArgumentException(s"Invalid producer id ${appendInfo.producerId} passed to update " +
if (appendInfo.producerId() == RecordBatch.NO_PRODUCER_ID)
throw new IllegalArgumentException(s"Invalid producer id ${appendInfo.producerId()} passed to update " +
s"for partition $topicPartition")
trace(s"Updated producer ${appendInfo.producerId} state to $appendInfo")
@ -720,7 +406,7 @@ class ProducerStateManager(
producers.put(appendInfo.producerId, updatedEntry)
}
appendInfo.startedTransactions.foreach { txn =>
appendInfo.startedTransactions.asScala.foreach { txn =>
ongoingTxns.put(txn.firstOffset.messageOffset, txn)
}
@ -809,7 +495,7 @@ class ProducerStateManager(
while (iterator.hasNext) {
val txnEntry = iterator.next()
val lastOffset = txnEntry.getValue.lastOffset
if (lastOffset.exists(_ < offset))
if (lastOffset.isPresent && lastOffset.getAsLong < offset)
iterator.remove()
}
}
@ -849,7 +535,7 @@ class ProducerStateManager(
throw new IllegalArgumentException(s"Attempted to complete transaction $completedTxn on partition $topicPartition " +
s"which was not started")
txnMetadata.lastOffset = Some(completedTxn.lastOffset)
txnMetadata.lastOffset = OptionalLong.of(completedTxn.lastOffset)
unreplicatedTxns.put(completedTxn.firstOffset, txnMetadata)
updateOldestTxnTimestamp()
}

View File: UnifiedLog.scala

@ -21,7 +21,7 @@ import com.yammer.metrics.core.MetricName
import java.io.{File, IOException}
import java.nio.file.Files
import java.util.Optional
import java.util.{Optional, OptionalLong}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, TimeUnit}
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager
@ -42,13 +42,14 @@ import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator}
import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, LastRecord, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator, ProducerAppendInfo}
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.record.BrokerCompressionType
import scala.annotation.nowarn
import scala.collection.mutable.ListBuffer
import scala.collection.{Seq, immutable, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
object LogAppendInfo {
@ -237,7 +238,7 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
*/
@threadsafe
class UnifiedLog(@volatile var logStartOffset: Long,
private[log] val localLog: LocalLog,
private val localLog: LocalLog,
brokerTopicStats: BrokerTopicStats,
val producerIdExpirationCheckIntervalMs: Int,
@volatile var leaderEpochCache: Option[LeaderEpochFileCache],
@ -672,7 +673,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
.setLastSequence(state.lastSeq)
.setLastTimestamp(state.lastTimestamp)
.setCoordinatorEpoch(state.coordinatorEpoch)
.setCurrentTxnStartOffset(state.currentTxnFirstOffset.getOrElse(-1L))
.setCurrentTxnStartOffset(state.currentTxnFirstOffset.orElse(-1L))
}
}.toSeq
}
@ -685,8 +686,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
private[log] def lastRecordsOfActiveProducers: Map[Long, LastRecord] = lock synchronized {
producerStateManager.activeProducers.map { case (producerId, producerIdEntry) =>
val lastDataOffset = if (producerIdEntry.lastDataOffset >= 0 ) Some(producerIdEntry.lastDataOffset) else None
val lastRecord = LastRecord(lastDataOffset, producerIdEntry.producerEpoch)
val lastDataOffset =
if (producerIdEntry.lastDataOffset >= 0) OptionalLong.of(producerIdEntry.lastDataOffset)
else OptionalLong.empty()
val lastRecord = new LastRecord(lastDataOffset, producerIdEntry.producerEpoch)
producerId -> lastRecord
}
}
@ -1083,7 +1086,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
if (origin == AppendOrigin.CLIENT) {
val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)
maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate =>
maybeLastEntry.flatMap(_.findDuplicateBatch(batch).asScala).foreach { duplicate =>
return (updatedProducers, completedTxns.toList, Some(duplicate))
}
}
@ -1978,7 +1981,7 @@ object UnifiedLog extends Logging {
origin: AppendOrigin): Option[CompletedTxn] = {
val producerId = batch.producerId
val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin))
appendInfo.append(batch, firstOffsetMetadata)
appendInfo.append(batch, firstOffsetMetadata.asJava).asScala
}
/**

View File: DumpLogSegments.scala

@ -107,7 +107,7 @@ object DumpLogSegments {
print(s"producerId: ${entry.producerId} producerEpoch: ${entry.producerEpoch} " +
s"coordinatorEpoch: ${entry.coordinatorEpoch} currentTxnFirstOffset: ${entry.currentTxnFirstOffset} " +
s"lastTimestamp: ${entry.lastTimestamp} ")
entry.batchMetadata.headOption.foreach { metadata =>
entry.batchMetadata.asScala.headOption.foreach { metadata =>
print(s"firstSequence: ${metadata.firstSeq} lastSequence: ${metadata.lastSeq} " +
s"lastOffset: ${metadata.lastOffset} offsetDelta: ${metadata.offsetDelta} timestamp: ${metadata.timestamp}")
}

View File: LogSegmentTest.scala

@ -17,6 +17,8 @@
package kafka.log
import java.io.File
import java.util.OptionalLong
import kafka.server.checkpoints.LeaderEpochCheckpoint
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
import kafka.utils.TestUtils
@ -25,7 +27,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{MockTime, Time, Utils}
import org.apache.kafka.server.log.internals.LogConfig
import org.apache.kafka.server.log.internals.{BatchMetadata, LogConfig, ProducerStateEntry}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@ -355,9 +357,7 @@ class LogSegmentTest {
// recover again, but this time assuming the transaction from pid2 began on a previous segment
stateManager = newProducerStateManager()
stateManager.loadProducerEntry(new ProducerStateEntry(pid2,
mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch,
0, RecordBatch.NO_TIMESTAMP, Some(75L)))
stateManager.loadProducerEntry(new ProducerStateEntry(pid2, producerEpoch, 0, RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L), java.util.Optional.of(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP))))
segment.recover(stateManager)
assertEquals(108L, stateManager.mapEndOffset)

View File: ProducerStateManagerTest.scala

@ -21,7 +21,7 @@ import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Files, StandardOpenOption}
import java.util.Collections
import java.util.{Collections, Optional, OptionalLong}
import java.util.concurrent.atomic.AtomicInteger
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
@ -29,11 +29,15 @@ import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{MockTime, Utils}
import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn, LogOffsetMetadata}
import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn, LogOffsetMetadata, ProducerAppendInfo, ProducerStateEntry, TxnMetadata}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.Mockito.{mock, when}
import java.util
import scala.compat.java8.OptionConverters.RichOptionalGeneric
import scala.jdk.CollectionConverters._
class ProducerStateManagerTest {
private var logDir: File = _
private var stateManager: ProducerStateManager = _
@ -130,7 +134,7 @@ class ProducerStateManagerTest {
val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.REPLICATION)
// Sequence number wrap around
appendInfo.appendDataBatch(epoch, Int.MaxValue - 10, 9, time.milliseconds(),
new LogOffsetMetadata(2000L), 2020L, isTransactional = false)
new LogOffsetMetadata(2000L), 2020L, false)
assertEquals(None, stateManager.lastEntry(producerId))
stateManager.update(appendInfo)
assertTrue(stateManager.lastEntry(producerId).isDefined)
@ -182,7 +186,7 @@ class ProducerStateManagerTest {
val lastEntry = maybeLastEntry.get
assertEquals(bumpedProducerEpoch, lastEntry.producerEpoch)
assertEquals(None, lastEntry.currentTxnFirstOffset)
assertEquals(OptionalLong.empty(), lastEntry.currentTxnFirstOffset)
assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.firstSeq)
assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.lastSeq)
@ -200,7 +204,7 @@ class ProducerStateManagerTest {
val firstOffsetMetadata = new LogOffsetMetadata(offset, 990000L, 234224)
producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, time.milliseconds(),
firstOffsetMetadata, offset, isTransactional = true)
firstOffsetMetadata, offset, true)
stateManager.update(producerAppendInfo)
assertEquals(Some(firstOffsetMetadata), stateManager.firstUnstableOffset)
@ -218,7 +222,7 @@ class ProducerStateManagerTest {
appendInfo: ProducerAppendInfo
): Option[CompletedTxn] = {
appendInfo.appendEndTxnMarker(new EndTransactionMarker(recordType, coordinatorEpoch),
producerEpoch, offset, time.milliseconds())
producerEpoch, offset, time.milliseconds()).asScala
}
def appendData(
@ -228,14 +232,14 @@ class ProducerStateManagerTest {
): Unit = {
val count = (endOffset - startOffset).toInt
appendInfo.appendDataBatch(producerEpoch, seq.get(), seq.addAndGet(count), time.milliseconds(),
new LogOffsetMetadata(startOffset), endOffset, isTransactional = true)
new LogOffsetMetadata(startOffset), endOffset, true)
seq.incrementAndGet()
}
// Start one transaction in a separate append
val firstAppend = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT)
appendData(16L, 20L, firstAppend)
assertEquals(new TxnMetadata(producerId, 16L), firstAppend.startedTransactions.head)
assertTxnMetadataEquals(new TxnMetadata(producerId, 16L), firstAppend.startedTransactions.asScala.head)
stateManager.update(firstAppend)
stateManager.onHighWatermarkUpdated(21L)
assertEquals(Some(new LogOffsetMetadata(16L)), stateManager.firstUnstableOffset)
@ -255,8 +259,8 @@ class ProducerStateManagerTest {
appendData(30L, 31L, secondAppend)
assertEquals(2, secondAppend.startedTransactions.size)
assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.head)
assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.last)
assertTxnMetadataEquals(new TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.asScala.head)
assertTxnMetadataEquals(new TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.asScala.last)
stateManager.update(secondAppend)
stateManager.completeTxn(firstCompletedTxn.get)
stateManager.completeTxn(secondCompletedTxn.get)
@ -264,6 +268,21 @@ class ProducerStateManagerTest {
assertEquals(Some(new LogOffsetMetadata(30L)), stateManager.firstUnstableOffset)
}
def assertTxnMetadataEquals(expected: java.util.List[TxnMetadata], actual: java.util.List[TxnMetadata]): Unit = {
val expectedIter = expected.iterator()
val actualIter = actual.iterator()
assertEquals(expected.size(), actual.size())
while (expectedIter.hasNext && actualIter.hasNext) {
assertTxnMetadataEquals(expectedIter.next(), actualIter.next())
}
}
def assertTxnMetadataEquals(expected: TxnMetadata, actual: TxnMetadata): Unit = {
assertEquals(expected.producerId, actual.producerId)
assertEquals(expected.firstOffset, actual.firstOffset)
assertEquals(expected.lastOffset, actual.lastOffset)
}
@Test
def testHasLateTransaction(): Unit = {
val producerId1 = 39L
@ -373,7 +392,7 @@ class ProducerStateManagerTest {
)
val firstOffsetMetadata = new LogOffsetMetadata(startOffset, segmentBaseOffset, 50 * relativeOffset)
producerAppendInfo.appendDataBatch(producerEpoch, 0, 0, time.milliseconds(),
firstOffsetMetadata, startOffset, isTransactional = true)
firstOffsetMetadata, startOffset, true)
stateManager.update(producerAppendInfo)
}
@ -417,14 +436,14 @@ class ProducerStateManagerTest {
val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT)
appendInfo.appendDataBatch(producerEpoch, 0, 5, time.milliseconds(),
new LogOffsetMetadata(15L), 20L, isTransactional = false)
new LogOffsetMetadata(15L), 20L, false)
assertEquals(None, stateManager.lastEntry(producerId))
stateManager.update(appendInfo)
assertTrue(stateManager.lastEntry(producerId).isDefined)
val nextAppendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT)
nextAppendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(),
new LogOffsetMetadata(26L), 30L, isTransactional = false)
new LogOffsetMetadata(26L), 30L, false)
assertTrue(stateManager.lastEntry(producerId).isDefined)
var lastEntry = stateManager.lastEntry(producerId).get
@ -448,30 +467,30 @@ class ProducerStateManagerTest {
val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT)
appendInfo.appendDataBatch(producerEpoch, 1, 5, time.milliseconds(),
new LogOffsetMetadata(16L), 20L, isTransactional = true)
new LogOffsetMetadata(16L), 20L, true)
var lastEntry = appendInfo.toEntry
assertEquals(producerEpoch, lastEntry.producerEpoch)
assertEquals(1, lastEntry.firstSeq)
assertEquals(5, lastEntry.lastSeq)
assertEquals(16L, lastEntry.firstDataOffset)
assertEquals(20L, lastEntry.lastDataOffset)
assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
assertEquals(OptionalLong.of(16L), lastEntry.currentTxnFirstOffset)
assertTxnMetadataEquals(java.util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
appendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(),
new LogOffsetMetadata(26L), 30L, isTransactional = true)
new LogOffsetMetadata(26L), 30L, true)
lastEntry = appendInfo.toEntry
assertEquals(producerEpoch, lastEntry.producerEpoch)
assertEquals(1, lastEntry.firstSeq)
assertEquals(10, lastEntry.lastSeq)
assertEquals(16L, lastEntry.firstDataOffset)
assertEquals(30L, lastEntry.lastDataOffset)
assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
assertEquals(OptionalLong.of(16L), lastEntry.currentTxnFirstOffset)
assertTxnMetadataEquals(util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
val completedTxnOpt = appendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, 40L, time.milliseconds())
assertTrue(completedTxnOpt.isDefined)
assertTrue(completedTxnOpt.isPresent)
val completedTxn = completedTxnOpt.get
assertEquals(producerId, completedTxn.producerId)
@ -487,8 +506,8 @@ class ProducerStateManagerTest {
assertEquals(16L, lastEntry.firstDataOffset)
assertEquals(30L, lastEntry.lastDataOffset)
assertEquals(coordinatorEpoch, lastEntry.coordinatorEpoch)
assertEquals(None, lastEntry.currentTxnFirstOffset)
assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
assertEquals(OptionalLong.empty(), lastEntry.currentTxnFirstOffset)
assertTxnMetadataEquals(java.util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
}
@Test
@ -571,7 +590,7 @@ class ProducerStateManagerTest {
assertEquals(1, loadedEntry.get.firstSeq)
assertEquals(1, loadedEntry.get.lastDataOffset)
assertEquals(1, loadedEntry.get.lastSeq)
assertEquals(Some(0), loadedEntry.get.currentTxnFirstOffset)
assertEquals(OptionalLong.of(0), loadedEntry.get.currentTxnFirstOffset)
// entry added after recovery
append(recoveredMapping, producerId, epoch, 2, 2L, isTransactional = true)
@ -595,7 +614,7 @@ class ProducerStateManagerTest {
assertEquals(1, loadedEntry.get.firstSeq)
assertEquals(1, loadedEntry.get.lastDataOffset)
assertEquals(1, loadedEntry.get.lastSeq)
assertEquals(None, loadedEntry.get.currentTxnFirstOffset)
assertEquals(OptionalLong.empty(), loadedEntry.get.currentTxnFirstOffset)
}
@Test
@ -613,7 +632,7 @@ class ProducerStateManagerTest {
val lastEntry = recoveredMapping.lastEntry(producerId)
assertTrue(lastEntry.isDefined)
assertEquals(appendTimestamp, lastEntry.get.lastTimestamp)
assertEquals(None, lastEntry.get.currentTxnFirstOffset)
assertEquals(OptionalLong.empty(), lastEntry.get.currentTxnFirstOffset)
}
@Test
@ -623,7 +642,7 @@ class ProducerStateManagerTest {
appendEndTxnMarker(stateManager, producerId, (epoch + 1).toShort, ControlRecordType.ABORT, offset = 1L)
val lastEntry = stateManager.lastEntry(producerId).get
assertEquals(None, lastEntry.currentTxnFirstOffset)
assertEquals(OptionalLong.empty(), lastEntry.currentTxnFirstOffset)
assertEquals(-1, lastEntry.lastDataOffset)
assertEquals(-1, lastEntry.firstDataOffset)
@ -992,7 +1011,7 @@ class ProducerStateManagerTest {
// Appending the empty control batch should not throw and a new transaction shouldn't be started
append(stateManager, producerId, baseOffset, batch, origin = AppendOrigin.CLIENT)
assertEquals(None, stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
assertEquals(OptionalLong.empty(), stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
}
@Test
@ -1101,7 +1120,7 @@ class ProducerStateManagerTest {
timestamp: Long = time.milliseconds()): Option[CompletedTxn] = {
val producerAppendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.COORDINATOR)
val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
val completedTxnOpt = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp)
val completedTxnOpt = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp).asScala
mapping.update(producerAppendInfo)
completedTxnOpt.foreach(mapping.completeTxn)
mapping.updateMapEndOffset(offset + 1)
@ -1129,7 +1148,7 @@ class ProducerStateManagerTest {
batch: RecordBatch,
origin: AppendOrigin): Unit = {
val producerAppendInfo = stateManager.prepareUpdate(producerId, origin)
producerAppendInfo.append(batch, firstOffsetMetadataOpt = None)
producerAppendInfo.append(batch, Optional.empty())
stateManager.update(producerAppendInfo)
stateManager.updateMapEndOffset(offset + 1)
}

View File: BatchMetadata.java

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;
import org.apache.kafka.common.record.DefaultRecordBatch;
public class BatchMetadata {
public final int lastSeq;
public final long lastOffset;
public final int offsetDelta;
public final long timestamp;
public BatchMetadata(
int lastSeq,
long lastOffset,
int offsetDelta,
long timestamp) {
this.lastSeq = lastSeq;
this.lastOffset = lastOffset;
this.offsetDelta = offsetDelta;
this.timestamp = timestamp;
}
public int firstSeq() {
return DefaultRecordBatch.decrementSequence(lastSeq, offsetDelta);
}
public long firstOffset() {
return lastOffset - offsetDelta;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BatchMetadata that = (BatchMetadata) o;
return lastSeq == that.lastSeq &&
lastOffset == that.lastOffset &&
offsetDelta == that.offsetDelta &&
timestamp == that.timestamp;
}
@Override
public int hashCode() {
int result = lastSeq;
result = 31 * result + Long.hashCode(lastOffset);
result = 31 * result + offsetDelta;
result = 31 * result + Long.hashCode(timestamp);
return result;
}
@Override
public String toString() {
return "BatchMetadata(" +
"firstSeq=" + firstSeq() +
", lastSeq=" + lastSeq +
", firstOffset=" + firstOffset() +
", lastOffset=" + lastOffset +
", timestamp=" + timestamp +
')';
}
}
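A small sketch of how firstSeq/firstOffset are derived from the stored lastSeq/lastOffset/offsetDelta (example class name and values are illustrative; the wrap-around remark relies on DefaultRecordBatch.decrementSequence behaving as its name suggests):

import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.server.log.internals.BatchMetadata;

public class BatchMetadataExample {
    public static void main(String[] args) {
        // A batch of 5 records (offsetDelta = 4) ending at offset 104 with last sequence 9.
        BatchMetadata metadata = new BatchMetadata(9, 104L, 4, RecordBatch.NO_TIMESTAMP);

        // firstOffset = lastOffset - offsetDelta; firstSeq is derived via
        // DefaultRecordBatch.decrementSequence, which also accounts for sequence wrap-around.
        System.out.println(metadata.firstOffset()); // 100
        System.out.println(metadata.firstSeq());    // 5
    }
}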

View File: LastRecord.java

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;
import java.util.Objects;
import java.util.OptionalLong;
/**
* The last written record for a given producer. The last data offset may be undefined
* if the only log entry for a producer is a transaction marker.
*/
public final class LastRecord {
public final OptionalLong lastDataOffset;
public final short producerEpoch;
public LastRecord(OptionalLong lastDataOffset, short producerEpoch) {
Objects.requireNonNull(lastDataOffset, "lastDataOffset must be non null");
this.lastDataOffset = lastDataOffset;
this.producerEpoch = producerEpoch;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LastRecord that = (LastRecord) o;
return producerEpoch == that.producerEpoch &&
lastDataOffset.equals(that.lastDataOffset);
}
@Override
public int hashCode() {
return 31 * lastDataOffset.hashCode() + producerEpoch;
}
@Override
public String toString() {
return "LastRecord(" +
"lastDataOffset=" + lastDataOffset +
", producerEpoch=" + producerEpoch +
')';
}
}
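For reference, the OptionalLong check that replaces the old Option pattern match in the LogCleaner hunk above, sketched in Java (the helper name and parameters are illustrative):

import java.util.OptionalLong;
import org.apache.kafka.server.log.internals.LastRecord;

public class LastRecordExample {
    // Keep the batch if it is the producer's last data batch, or (when no data offset is
    // known) if it is a control batch carrying the producer's last known epoch.
    static boolean retainsLastRecord(LastRecord lastRecord, long batchLastOffset,
                                     boolean isControlBatch, short batchProducerEpoch) {
        if (lastRecord.lastDataOffset.isPresent())
            return batchLastOffset == lastRecord.lastDataOffset.getAsLong();
        else
            return isControlBatch && batchProducerEpoch == lastRecord.producerEpoch;
    }

    public static void main(String[] args) {
        LastRecord lastRecord = new LastRecord(OptionalLong.of(42L), (short) 3);
        System.out.println(retainsLastRecord(lastRecord, 42L, false, (short) 3)); // true
    }
}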

View File: ProducerAppendInfo.java

@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
/**
* This class is used to validate the records appended by a given producer before they are written to the log.
* It is initialized with the producer's state after the last successful append, and transitively validates the
* sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata
* as the incoming records are validated.
*/
public class ProducerAppendInfo {
private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class);
private final TopicPartition topicPartition;
private final long producerId;
private final ProducerStateEntry currentEntry;
private final AppendOrigin origin;
private final List<TxnMetadata> transactions = new ArrayList<>();
private final ProducerStateEntry updatedEntry;
/**
* Creates a new instance with the provided parameters.
*
* @param topicPartition topic partition
* @param producerId The id of the producer appending to the log
* @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of
* the most recent appends made by the producer. Validation of the first incoming append will
* be made against the latest append in the current entry. New appends will replace older appends
* in the current entry so that the space overhead is constant.
* @param origin Indicates the origin of the append which implies the extent of validation. For example, offset
* commits, which originate from the group coordinator, do not have sequence numbers and therefore
* only producer epoch validation is done. Appends which come through replication are not validated
* (we assume the validation has already been done) and appends from clients require full validation.
*/
public ProducerAppendInfo(TopicPartition topicPartition,
long producerId,
ProducerStateEntry currentEntry,
AppendOrigin origin) {
this.topicPartition = topicPartition;
this.producerId = producerId;
this.currentEntry = currentEntry;
this.origin = origin;
updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), currentEntry.coordinatorEpoch, currentEntry.lastTimestamp, currentEntry.currentTxnFirstOffset, Optional.empty());
}
public long producerId() {
return producerId;
}
private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) {
checkProducerEpoch(producerEpoch, offset);
if (origin == AppendOrigin.CLIENT) {
checkSequence(producerEpoch, firstSeq, offset);
}
}
private void checkProducerEpoch(short producerEpoch, long offset) {
if (producerEpoch < updatedEntry.producerEpoch()) {
String message = "Epoch of producer " + producerId + " at offset " + offset + " in " + topicPartition +
" is " + producerEpoch + ", " + "which is smaller than the last seen epoch " + updatedEntry.producerEpoch();
if (origin == AppendOrigin.REPLICATION) {
log.warn(message);
} else {
// Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the
// producer send response callback to differentiate from the former fatal exception,
// letting client abort the ongoing transaction and retry.
throw new InvalidProducerEpochException(message);
}
}
}
private void checkSequence(short producerEpoch, int appendFirstSeq, long offset) {
if (producerEpoch != updatedEntry.producerEpoch()) {
if (appendFirstSeq != 0) {
if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) {
throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId +
"at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), "
+ appendFirstSeq + " (seq. number), " + updatedEntry.producerEpoch() + " (current producer epoch)");
}
}
} else {
int currentLastSeq;
if (!updatedEntry.isEmpty())
currentLastSeq = updatedEntry.lastSeq();
else if (producerEpoch == currentEntry.producerEpoch())
currentLastSeq = currentEntry.lastSeq();
else
currentLastSeq = RecordBatch.NO_SEQUENCE;
// If there is no current producer epoch (possibly because all producer records have been deleted due to
// retention or the DeleteRecords API) accept writes with any sequence number
if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) {
throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " +
"offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq +
" (incoming seq. number), " + currentLastSeq + " (current end sequence number)");
}
}
}
private boolean inSequence(int lastSeq, int nextSeq) {
return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE);
}
public Optional<CompletedTxn> append(RecordBatch batch, Optional<LogOffsetMetadata> firstOffsetMetadataOpt) {
if (batch.isControlBatch()) {
Iterator<Record> recordIterator = batch.iterator();
if (recordIterator.hasNext()) {
Record record = recordIterator.next();
EndTransactionMarker endTxnMarker = EndTransactionMarker.deserialize(record);
return appendEndTxnMarker(endTxnMarker, batch.producerEpoch(), batch.baseOffset(), record.timestamp());
} else {
// An empty control batch means the entire transaction has been cleaned from the log, so no need to append
return Optional.empty();
}
} else {
LogOffsetMetadata firstOffsetMetadata = firstOffsetMetadataOpt.orElse(new LogOffsetMetadata(batch.baseOffset()));
appendDataBatch(batch.producerEpoch(), batch.baseSequence(), batch.lastSequence(), batch.maxTimestamp(),
firstOffsetMetadata, batch.lastOffset(), batch.isTransactional());
return Optional.empty();
}
}
public void appendDataBatch(short epoch,
int firstSeq,
int lastSeq,
long lastTimestamp,
LogOffsetMetadata firstOffsetMetadata,
long lastOffset,
boolean isTransactional) {
long firstOffset = firstOffsetMetadata.messageOffset;
maybeValidateDataBatch(epoch, firstSeq, firstOffset);
updatedEntry.addBatch(epoch, lastSeq, lastOffset, (int) (lastOffset - firstOffset), lastTimestamp);
OptionalLong currentTxnFirstOffset = updatedEntry.currentTxnFirstOffset;
if (currentTxnFirstOffset.isPresent() && !isTransactional) {
// Received a non-transactional message while a transaction is active
throw new InvalidTxnStateException("Expected transactional write from producer " + producerId + " at " +
"offset " + firstOffsetMetadata + " in partition " + topicPartition);
} else if (!currentTxnFirstOffset.isPresent() && isTransactional) {
// Began a new transaction
updatedEntry.currentTxnFirstOffset = OptionalLong.of(firstOffset);
transactions.add(new TxnMetadata(producerId, firstOffsetMetadata));
}
}
private void checkCoordinatorEpoch(EndTransactionMarker endTxnMarker, long offset) {
if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch()) {
if (origin == AppendOrigin.REPLICATION) {
log.info("Detected invalid coordinator epoch for producerId {} at offset {} in partition {}: {} is older than previously known coordinator epoch {}",
producerId, offset, topicPartition, endTxnMarker.coordinatorEpoch(), updatedEntry.coordinatorEpoch);
} else {
throw new TransactionCoordinatorFencedException("Invalid coordinator epoch for producerId " + producerId + " at " +
"offset " + offset + " in partition " + topicPartition + ": " + endTxnMarker.coordinatorEpoch() +
" (zombie), " + updatedEntry.coordinatorEpoch + " (current)");
}
}
}
public Optional<CompletedTxn> appendEndTxnMarker(
EndTransactionMarker endTxnMarker,
short producerEpoch,
long offset,
long timestamp) {
checkProducerEpoch(producerEpoch, offset);
checkCoordinatorEpoch(endTxnMarker, offset);
// Only emit the `CompletedTxn` for non-empty transactions. A transaction marker
// without any associated data will not have any impact on the last stable offset
// and would not need to be reflected in the transaction index.
Optional<CompletedTxn> completedTxn = updatedEntry.currentTxnFirstOffset.isPresent() ?
Optional.of(new CompletedTxn(producerId, updatedEntry.currentTxnFirstOffset.getAsLong(), offset,
endTxnMarker.controlType() == ControlRecordType.ABORT))
: Optional.empty();
updatedEntry.maybeUpdateProducerEpoch(producerEpoch);
updatedEntry.currentTxnFirstOffset = OptionalLong.empty();
updatedEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch();
updatedEntry.lastTimestamp = timestamp;
return completedTxn;
}
public ProducerStateEntry toEntry() {
return updatedEntry;
}
public List<TxnMetadata> startedTransactions() {
return Collections.unmodifiableList(transactions);
}
@Override
public String toString() {
return "ProducerAppendInfo(" +
"producerId=" + producerId +
", producerEpoch=" + updatedEntry.producerEpoch() +
", firstSequence=" + updatedEntry.firstSeq() +
", lastSequence=" + updatedEntry.lastSeq() +
", currentTxnFirstOffset=" + updatedEntry.currentTxnFirstOffset +
", coordinatorEpoch=" + updatedEntry.coordinatorEpoch +
", lastTimestamp=" + updatedEntry.lastTimestamp +
", startedTransactions=" + transactions +
')';
}
}
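A minimal, self-contained sketch of the append flow the javadoc above describes, using only the classes added in this commit (the example class name, coordinator epoch, offsets, and sequences are illustrative; a real caller goes through ProducerStateManager.prepareUpdate and update, as shown in the Scala hunks above):

import java.util.Optional;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.server.log.internals.AppendOrigin;
import org.apache.kafka.server.log.internals.CompletedTxn;
import org.apache.kafka.server.log.internals.LogOffsetMetadata;
import org.apache.kafka.server.log.internals.ProducerAppendInfo;
import org.apache.kafka.server.log.internals.ProducerStateEntry;

public class ProducerAppendInfoExample {
    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("txn-topic", 0);
        long producerId = 1000L;
        short epoch = 0;

        ProducerAppendInfo appendInfo = new ProducerAppendInfo(
                tp, producerId, ProducerStateEntry.empty(producerId), AppendOrigin.CLIENT);

        // A transactional data batch with sequences 0..4 at offsets 16..20 starts a transaction.
        appendInfo.appendDataBatch(epoch, 0, 4, System.currentTimeMillis(),
                new LogOffsetMetadata(16L), 20L, true);
        System.out.println(appendInfo.startedTransactions()); // one TxnMetadata with firstOffset 16

        // A commit marker closes the transaction and yields a CompletedTxn
        // (normally this path is driven by a control batch via append()).
        Optional<CompletedTxn> completed = appendInfo.appendEndTxnMarker(
                new EndTransactionMarker(ControlRecordType.COMMIT, 0),
                epoch, 21L, System.currentTimeMillis());
        System.out.println(completed);
    }
}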

View File: ProducerStateEntry.java

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;
import org.apache.kafka.common.record.RecordBatch;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Stream;
/**
* This class represents the state of a specific producer-id.
* It contains batchMetadata queue which is ordered such that the batch with the lowest sequence is at the head of the
* queue while the batch with the highest sequence is at the tail of the queue. We will retain at most {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN}
* elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch.
*/
public class ProducerStateEntry {
public static final int NUM_BATCHES_TO_RETAIN = 5;
public int coordinatorEpoch;
public long lastTimestamp;
public OptionalLong currentTxnFirstOffset;
private final long producerId;
private final Deque<BatchMetadata> batchMetadata = new ArrayDeque<>();
private short producerEpoch;
public static ProducerStateEntry empty(long producerId) {
return new ProducerStateEntry(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty(), Optional.empty());
}
public ProducerStateEntry(long producerId) {
this(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty(), Optional.empty());
}
public ProducerStateEntry(long producerId, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset, Optional<BatchMetadata> firstBatchMetadata) {
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.coordinatorEpoch = coordinatorEpoch;
this.lastTimestamp = lastTimestamp;
this.currentTxnFirstOffset = currentTxnFirstOffset;
firstBatchMetadata.ifPresent(batchMetadata::add);
}
public int firstSeq() {
return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.getFirst().firstSeq();
}
public int lastSeq() {
return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.getLast().lastSeq;
}
public long firstDataOffset() {
return isEmpty() ? -1L : batchMetadata.getFirst().firstOffset();
}
public long lastDataOffset() {
return isEmpty() ? -1L : batchMetadata.getLast().lastOffset;
}
public int lastOffsetDelta() {
return isEmpty() ? 0 : batchMetadata.getLast().offsetDelta;
}
public boolean isEmpty() {
return batchMetadata.isEmpty();
}
public void addBatch(short producerEpoch, int lastSeq, long lastOffset, int offsetDelta, long timestamp) {
maybeUpdateProducerEpoch(producerEpoch);
addBatchMetadata(new BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp));
this.lastTimestamp = timestamp;
}
public boolean maybeUpdateProducerEpoch(short producerEpoch) {
if (this.producerEpoch != producerEpoch) {
batchMetadata.clear();
this.producerEpoch = producerEpoch;
return true;
} else {
return false;
}
}
private void addBatchMetadata(BatchMetadata batch) {
if (batchMetadata.size() == ProducerStateEntry.NUM_BATCHES_TO_RETAIN) batchMetadata.removeFirst();
batchMetadata.add(batch);
}
public void update(ProducerStateEntry nextEntry) {
maybeUpdateProducerEpoch(nextEntry.producerEpoch);
while (!nextEntry.batchMetadata.isEmpty()) addBatchMetadata(nextEntry.batchMetadata.removeFirst());
this.coordinatorEpoch = nextEntry.coordinatorEpoch;
this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset;
this.lastTimestamp = nextEntry.lastTimestamp;
}
public Optional<BatchMetadata> findDuplicateBatch(RecordBatch batch) {
if (batch.producerEpoch() != producerEpoch) return Optional.empty();
else return batchWithSequenceRange(batch.baseSequence(), batch.lastSequence());
}
// Return the batch metadata of the cached batch having the exact sequence range, if any.
Optional<BatchMetadata> batchWithSequenceRange(int firstSeq, int lastSeq) {
Stream<BatchMetadata> duplicate = batchMetadata.stream().filter(metadata -> firstSeq == metadata.firstSeq() && lastSeq == metadata.lastSeq);
return duplicate.findFirst();
}
public Collection<BatchMetadata> batchMetadata() {
return Collections.unmodifiableCollection(batchMetadata);
}
public short producerEpoch() {
return producerEpoch;
}
public long producerId() {
return producerId;
}
@Override
public String toString() {
return "ProducerStateEntry(" +
"producerId=" + producerId +
", producerEpoch=" + producerEpoch +
", currentTxnFirstOffset=" + currentTxnFirstOffset +
", coordinatorEpoch=" + coordinatorEpoch +
", lastTimestamp=" + lastTimestamp +
", batchMetadata=" + batchMetadata +
')';
}
}
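A sketch of the retention behaviour described in the javadoc above: only the NUM_BATCHES_TO_RETAIN most recent batches are cached, and a producer-epoch bump clears the cache (example class name, epochs, sequences, and offsets are illustrative):

import org.apache.kafka.server.log.internals.ProducerStateEntry;

public class ProducerStateEntryExample {
    public static void main(String[] args) {
        ProducerStateEntry entry = ProducerStateEntry.empty(1000L);

        // Add more batches than NUM_BATCHES_TO_RETAIN; the oldest ones are evicted from the queue.
        for (int i = 0; i < ProducerStateEntry.NUM_BATCHES_TO_RETAIN + 2; i++) {
            // Each batch covers 10 sequences/offsets, e.g. i = 0 ends at seq/offset 9.
            entry.addBatch((short) 0, i * 10 + 9, i * 10L + 9L, 9, System.currentTimeMillis());
        }
        System.out.println(entry.batchMetadata().size()); // 5
        System.out.println(entry.firstSeq());             // 20 (the two oldest batches were evicted)

        // Bumping the producer epoch clears the cached batch metadata.
        entry.maybeUpdateProducerEpoch((short) 1);
        System.out.println(entry.isEmpty()); // true
    }
}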

View File: TxnMetadata.java

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;
import java.util.Objects;
import java.util.OptionalLong;
public final class TxnMetadata {
public final long producerId;
public final LogOffsetMetadata firstOffset;
public OptionalLong lastOffset;
public TxnMetadata(long producerId,
LogOffsetMetadata firstOffset,
OptionalLong lastOffset) {
Objects.requireNonNull(firstOffset, "firstOffset must be non null");
this.producerId = producerId;
this.firstOffset = firstOffset;
this.lastOffset = lastOffset;
}
public TxnMetadata(long producerId, long firstOffset) {
this(producerId, new LogOffsetMetadata(firstOffset));
}
public TxnMetadata(long producerId, LogOffsetMetadata firstOffset) {
this(producerId, firstOffset, OptionalLong.empty());
}
@Override
public String toString() {
return "TxnMetadata(" +
"producerId=" + producerId +
", firstOffset=" + firstOffset +
", lastOffset=" + lastOffset +
')';
}
}
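Finally, a sketch of the TxnMetadata lifecycle implied by the ProducerStateManager changes above: firstOffset is fixed at construction, while lastOffset stays empty until the transaction completes (example class name and offsets are illustrative):

import java.util.OptionalLong;

import org.apache.kafka.server.log.internals.LogOffsetMetadata;
import org.apache.kafka.server.log.internals.TxnMetadata;

public class TxnMetadataExample {
    public static void main(String[] args) {
        // An ongoing transaction: only the first offset is known.
        TxnMetadata txn = new TxnMetadata(1000L, new LogOffsetMetadata(16L));
        System.out.println(txn.lastOffset); // OptionalLong.empty

        // On completion (see ProducerStateManager.completeTxn above) the last offset is filled in.
        txn.lastOffset = OptionalLong.of(21L);
        System.out.println(txn);
    }
}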