KAFKA-10702; Skip bookkeeping of empty transactions (#9632)

Compacted topics can accumulate a large number of empty transaction markers as the data from the transactions gets cleaned. For each transaction, there is some bookkeeping that leaders and followers must do to keep the transaction index up to date. The cost of this overhead can degrade performance when a replica needs to catch up if the log has mostly empty or small transactions. This patch improves the cost by skipping over empty transactions since these will have no effect on the last stable offset and do not need to be reflected in the transaction index.

Reviewers: Lucas Bradstreet <lucas@confluent.io>, Jun Rao <junrao@gmail.com>
This commit is contained in:
Jason Gustafson 2020-11-30 14:48:28 -08:00 committed by GitHub
parent 6add0594a7
commit e7de280b0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 102 additions and 40 deletions

View File

@ -44,7 +44,11 @@ class CorruptSnapshotException(msg: String) extends KafkaException(msg)
case class LastRecord(lastDataOffset: Option[Long], producerEpoch: Short)
private[log] case class TxnMetadata(producerId: Long, var firstOffset: LogOffsetMetadata, var lastOffset: Option[Long] = None) {
private[log] case class TxnMetadata(
producerId: Long,
firstOffset: LogOffsetMetadata,
var lastOffset: Option[Long] = None
) {
def this(producerId: Long, firstOffset: Long) = this(producerId, LogOffsetMetadata(firstOffset))
override def toString: String = {
@ -247,8 +251,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
if (recordIterator.hasNext) {
val record = recordIterator.next()
val endTxnMarker = EndTransactionMarker.deserialize(record)
val completedTxn = appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset, record.timestamp)
Some(completedTxn)
appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset, record.timestamp)
} else {
// An empty control batch means the entire transaction has been cleaned from the log, so no need to append
None
@ -301,18 +304,20 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
}
}
def appendEndTxnMarker(endTxnMarker: EndTransactionMarker,
producerEpoch: Short,
offset: Long,
timestamp: Long): CompletedTxn = {
def appendEndTxnMarker(
endTxnMarker: EndTransactionMarker,
producerEpoch: Short,
offset: Long,
timestamp: Long
): Option[CompletedTxn] = {
checkProducerEpoch(producerEpoch, offset)
checkCoordinatorEpoch(endTxnMarker, offset)
val firstOffset = updatedEntry.currentTxnFirstOffset match {
case Some(txnFirstOffset) => txnFirstOffset
case None =>
transactions += new TxnMetadata(producerId, offset)
offset
// Only emit the `CompletedTxn` for non-empty transactions. A transaction marker
// without any associated data will not have any impact on the last stable offset
// and would not need to be reflected in the transaction index.
val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset =>
CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
}
updatedEntry.maybeUpdateProducerEpoch(producerEpoch)
@ -320,7 +325,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
updatedEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch
updatedEntry.lastTimestamp = timestamp
CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
completedTxn
}
def toEntry: ProducerStateEntry = updatedEntry
@ -575,9 +580,10 @@ class ProducerStateManager(val topicPartition: TopicPartition,
/**
* The first undecided offset is the earliest transactional message which has not yet been committed
* or aborted.
* or aborted. Unlike [[firstUnstableOffset]], this does not reflect the state of replication (i.e.
* whether a completed transaction marker is beyond the high watermark).
*/
def firstUndecidedOffset: Option[Long] = Option(ongoingTxns.firstEntry).map(_.getValue.firstOffset.messageOffset)
private[log] def firstUndecidedOffset: Option[Long] = Option(ongoingTxns.firstEntry).map(_.getValue.firstOffset.messageOffset)
/**
* Returns the last offset of this map

View File

@ -4411,9 +4411,11 @@ class LogTest {
assertEquals(0L, log.lastStableOffset)
// Try the append a second time. The appended offset in the log should still increase.
assertThrows[KafkaStorageException] {
appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
}
// Note that the second append does not write to the transaction index because the producer
// state has already been updated and we do not write index entries for empty transactions.
// In the future, we may strengthen the fencing logic so that additional writes to the
// log are not possible after an IO error (see KAFKA-10778).
appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
assertEquals(12L, log.logEndOffset)
assertEquals(0L, log.lastStableOffset)
@ -4425,7 +4427,7 @@ class LogTest {
val reopenedLog = createLog(logDir, logConfig, lastShutdownClean = false)
assertEquals(12L, reopenedLog.logEndOffset)
assertEquals(2, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size)
assertEquals(1, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size)
reopenedLog.updateHighWatermark(12L)
assertEquals(None, reopenedLog.firstUnstableOffset)
}

View File

@ -22,6 +22,7 @@ import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption
import java.util.Collections
import java.util.concurrent.atomic.AtomicInteger
import kafka.server.LogOffsetMetadata
import kafka.utils.TestUtils
@ -178,29 +179,24 @@ class ProducerStateManagerTest {
}
@Test
def testControlRecordBumpsEpoch(): Unit = {
val epoch = 0.toShort
append(stateManager, producerId, epoch, 0, 0L)
def testControlRecordBumpsProducerEpoch(): Unit = {
val producerEpoch = 0.toShort
append(stateManager, producerId, producerEpoch, 0, 0L)
val bumpedEpoch = 1.toShort
val (completedTxn, lastStableOffset) = appendEndTxnMarker(stateManager, producerId, bumpedEpoch, ControlRecordType.ABORT, 1L)
assertEquals(1L, completedTxn.firstOffset)
assertEquals(1L, completedTxn.lastOffset)
assertEquals(2L, lastStableOffset)
assertTrue(completedTxn.isAborted)
assertEquals(producerId, completedTxn.producerId)
val bumpedProducerEpoch = 1.toShort
appendEndTxnMarker(stateManager, producerId, bumpedProducerEpoch, ControlRecordType.ABORT, 1L)
val maybeLastEntry = stateManager.lastEntry(producerId)
assertTrue(maybeLastEntry.isDefined)
val lastEntry = maybeLastEntry.get
assertEquals(bumpedEpoch, lastEntry.producerEpoch)
assertEquals(bumpedProducerEpoch, lastEntry.producerEpoch)
assertEquals(None, lastEntry.currentTxnFirstOffset)
assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.firstSeq)
assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.lastSeq)
// should be able to append with the new epoch if we start at sequence 0
append(stateManager, producerId, bumpedEpoch, 0, 2L)
append(stateManager, producerId, bumpedProducerEpoch, 0, 2L)
assertEquals(Some(0), stateManager.lastEntry(producerId).map(_.firstSeq))
}
@ -220,6 +216,64 @@ class ProducerStateManagerTest {
assertEquals(Some(firstOffsetMetadata), stateManager.firstUnstableOffset)
}
@Test
def testSkipEmptyTransactions(): Unit = {
val producerEpoch = 0.toShort
val coordinatorEpoch = 27
val seq = new AtomicInteger(0)
def appendEndTxn(
recordType: ControlRecordType,
offset: Long,
appendInfo: ProducerAppendInfo
): Option[CompletedTxn] = {
appendInfo.appendEndTxnMarker(new EndTransactionMarker(recordType, coordinatorEpoch),
producerEpoch, offset, time.milliseconds())
}
def appendData(
startOffset: Long,
endOffset: Long,
appendInfo: ProducerAppendInfo
): Unit = {
val count = (endOffset - startOffset).toInt
appendInfo.appendDataBatch(producerEpoch, seq.get(), seq.addAndGet(count), time.milliseconds(),
LogOffsetMetadata(startOffset), endOffset, isTransactional = true)
seq.incrementAndGet()
}
// Start one transaction in a separate append
val firstAppend = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Client)
appendData(16L, 20L, firstAppend)
assertEquals(new TxnMetadata(producerId, 16L), firstAppend.startedTransactions.head)
stateManager.update(firstAppend)
stateManager.onHighWatermarkUpdated(21L)
assertEquals(Some(LogOffsetMetadata(16L)), stateManager.firstUnstableOffset)
// Now do a single append which completes the old transaction, mixes in
// some empty transactions, one non-empty complete transaction, and one
// incomplete transaction
val secondAppend = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Client)
val firstCompletedTxn = appendEndTxn(ControlRecordType.COMMIT, 21, secondAppend)
assertEquals(Some(CompletedTxn(producerId, 16L, 21, isAborted = false)), firstCompletedTxn)
assertEquals(None, appendEndTxn(ControlRecordType.COMMIT, 22, secondAppend))
assertEquals(None, appendEndTxn(ControlRecordType.ABORT, 23, secondAppend))
appendData(24L, 27L, secondAppend)
val secondCompletedTxn = appendEndTxn(ControlRecordType.ABORT, 28L, secondAppend)
assertTrue(secondCompletedTxn.isDefined)
assertEquals(None, appendEndTxn(ControlRecordType.ABORT, 29L, secondAppend))
appendData(30L, 31L, secondAppend)
assertEquals(2, secondAppend.startedTransactions.size)
assertEquals(TxnMetadata(producerId, LogOffsetMetadata(24L)), secondAppend.startedTransactions.head)
assertEquals(TxnMetadata(producerId, LogOffsetMetadata(30L)), secondAppend.startedTransactions.last)
stateManager.update(secondAppend)
stateManager.completeTxn(firstCompletedTxn.get)
stateManager.completeTxn(secondCompletedTxn.get)
stateManager.onHighWatermarkUpdated(32L)
assertEquals(Some(LogOffsetMetadata(30L)), stateManager.firstUnstableOffset)
}
@Test
def testLastStableOffsetCompletedTxn(): Unit = {
val producerEpoch = 0.toShort
@ -333,7 +387,10 @@ class ProducerStateManagerTest {
assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
val completedTxn = appendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, 40L, time.milliseconds())
val completedTxnOpt = appendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, 40L, time.milliseconds())
assertTrue(completedTxnOpt.isDefined)
val completedTxn = completedTxnOpt.get
assertEquals(producerId, completedTxn.producerId)
assertEquals(16L, completedTxn.firstOffset)
assertEquals(40L, completedTxn.lastOffset)
@ -821,7 +878,6 @@ class ProducerStateManagerTest {
@Test
def testAppendEmptyControlBatch(): Unit = {
val producerId = 23423L
val producerEpoch = 145.toShort
val baseOffset = 15
val batch: RecordBatch = EasyMock.createMock(classOf[RecordBatch])
@ -830,7 +886,7 @@ class ProducerStateManagerTest {
EasyMock.replay(batch)
// Appending the empty control batch should not throw and a new transaction shouldn't be started
append(stateManager, producerId, producerEpoch, baseOffset, batch, origin = AppendOrigin.Client)
append(stateManager, producerId, baseOffset, batch, origin = AppendOrigin.Client)
assertEquals(None, stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
}
@ -904,15 +960,14 @@ class ProducerStateManagerTest {
controlType: ControlRecordType,
offset: Long,
coordinatorEpoch: Int = 0,
timestamp: Long = time.milliseconds()): (CompletedTxn, Long) = {
timestamp: Long = time.milliseconds()): Option[CompletedTxn] = {
val producerAppendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Coordinator)
val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp)
val completedTxnOpt = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp)
mapping.update(producerAppendInfo)
val lastStableOffset = mapping.lastStableOffset(completedTxn)
mapping.completeTxn(completedTxn)
completedTxnOpt.foreach(mapping.completeTxn)
mapping.updateMapEndOffset(offset + 1)
(completedTxn, lastStableOffset)
completedTxnOpt
}
private def append(stateManager: ProducerStateManager,
@ -932,7 +987,6 @@ class ProducerStateManagerTest {
private def append(stateManager: ProducerStateManager,
producerId: Long,
producerEpoch: Short,
offset: Long,
batch: RecordBatch,
origin: AppendOrigin): Unit = {