Cherry Pick KAFKA-19367 to 4.0 (#19958)

0b2e410d61
Bug fix in 4.0
**Conflicts:** 

- The Transaction Coordinator had some conflicts, mainly with the
transaction states. Ex: `Ongoing` in 4.0 is `TransactionState.ONGOING` in
4.1 (see the sketch after this list).
- The TransactionCoordinatorTest file had conflicts with respect to the 2PC
changes from KIP-939 in 4.1 and the above-mentioned state changes.
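
For context, a minimal sketch of the renaming behind the first conflict; the type shapes below are assumptions for illustration, not the exact Kafka sources:

```scala
// 4.0 side: transaction states are Scala case objects, referenced bare (assumed shape)
sealed trait TxnState
case object Ongoing extends TxnState
case object PrepareAbort extends TxnState

// 4.1 side: the same states live on an enum-style TransactionState type, so
// every match or comparison must be rewritten, e.g. (hypothetical usage):
//   txnMetadata.state == Ongoing                    // 4.0
//   txnMetadata.state == TransactionState.ONGOING   // 4.1
```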

Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits
<alivshits@confluent.io>
Ritika Reddy 2025-06-14 11:40:00 -07:00 committed by GitHub
parent 254c1fa519
commit c6b44b5d66
2 changed files with 136 additions and 4 deletions

TransactionCoordinator.scala

@@ -802,11 +802,9 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
         }
         if (nextState == PrepareAbort && isEpochFence) {
-          // We should clear the pending state to make way for the transition to PrepareAbort and also bump
-          // the epoch in the transaction metadata we are about to append.
+          // We should clear the pending state to make way for the transition to PrepareAbort
           txnMetadata.pendingState = None
-          txnMetadata.producerEpoch = producerEpoch
-          txnMetadata.lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH
+          // For TV2+, don't manually set the epoch - let prepareAbortOrCommit handle it naturally.
         }
         nextProducerIdOrErrors.flatMap {
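
The removed assignments are the crux of the fix: on the TV2 path, InitProducerId used to set the epoch here and prepareAbortOrCommit would bump it again, so a transaction sitting at epoch 32766 could be double-incremented past Short.MaxValue. A standalone sketch of the overflow arithmetic (plain Scala, independent of the Kafka code):

```scala
object EpochOverflowSketch extends App {
  // Producer epochs are Shorts; 32766 (Short.MaxValue - 1) leaves room for
  // exactly one more bump.
  val epoch: Short = (Short.MaxValue - 1).toShort

  val singleBump: Short = (epoch + 1).toShort // 32767 (Short.MaxValue): still valid
  val doubleBump: Short = (epoch + 2).toShort // -32768: wrapped around, invalid epoch

  println(s"single bump: $singleBump, double bump: $doubleBump")
}
```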

TransactionCoordinatorTest.scala

@@ -1165,6 +1165,140 @@ class TransactionCoordinatorTest {
      any())
  }

  @Test
  def shouldNotCauseEpochOverflowWhenInitPidDuringOngoingTxnV2(): Unit = {
    // When InitProducerId is called with an ongoing transaction at epoch 32766 (Short.MaxValue - 1),
    // it should not cause an epoch overflow by incrementing twice.
    // The only true increment happens in prepareAbortOrCommit.
    val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerId, RecordBatch.NO_PRODUCER_ID,
      (Short.MaxValue - 1).toShort, (Short.MaxValue - 2).toShort, txnTimeoutMs, Ongoing, partitions, time.milliseconds(), time.milliseconds(), TV_2)

    when(transactionManager.validateTransactionTimeoutMs(anyInt()))
      .thenReturn(true)
    when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
    when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)

    // Capture the transition metadata to verify epoch increments
    val capturedTxnTransitMetadata: ArgumentCaptor[TxnTransitMetadata] = ArgumentCaptor.forClass(classOf[TxnTransitMetadata])
    when(transactionManager.appendTransactionToLog(
      ArgumentMatchers.eq(transactionalId),
      ArgumentMatchers.eq(coordinatorEpoch),
      capturedTxnTransitMetadata.capture(),
      capturedErrorsCallback.capture(),
      any(),
      any())
    ).thenAnswer(invocation => {
      val transitMetadata = invocation.getArgument[TxnTransitMetadata](2)
      // Simulate the metadata update that would happen in the real appendTransactionToLog
      txnMetadata.completeTransitionTo(transitMetadata)
      capturedErrorsCallback.getValue.apply(Errors.NONE)
    })

    // Handle InitProducerId with ongoing transaction at epoch 32766
    coordinator.handleInitProducerId(
      transactionalId,
      txnTimeoutMs,
      None,
      initProducerIdMockCallback
    )

    // Verify that the epoch did not overflow (should be Short.MaxValue = 32767, not negative)
    assertEquals(Short.MaxValue, txnMetadata.producerEpoch)
    assertEquals(PrepareAbort, txnMetadata.state)

    verify(transactionManager).validateTransactionTimeoutMs(anyInt())
    verify(transactionManager, times(3)).getTransactionState(ArgumentMatchers.eq(transactionalId))
    verify(transactionManager).appendTransactionToLog(
      ArgumentMatchers.eq(transactionalId),
      ArgumentMatchers.eq(coordinatorEpoch),
      any[TxnTransitMetadata],
      any(),
      any(),
      any())
  }

  @Test
  def shouldHandleTimeoutAtEpochOverflowBoundaryCorrectlyTV2(): Unit = {
    // Test the scenario where we have an ongoing transaction at epoch 32766 (Short.MaxValue - 1)
    // and the producer crashes/times out. This test verifies that the timeout handling
    // correctly manages the epoch overflow scenario without causing failures.
    val epochAtMaxBoundary = (Short.MaxValue - 1).toShort // 32766
    val now = time.milliseconds()

    // Create transaction metadata at the epoch boundary that would cause overflow IFF double-incremented
    val txnMetadata = new TransactionMetadata(
      transactionalId = transactionalId,
      producerId = producerId,
      previousProducerId = RecordBatch.NO_PRODUCER_ID,
      nextProducerId = RecordBatch.NO_PRODUCER_ID,
      producerEpoch = epochAtMaxBoundary,
      lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
      txnTimeoutMs = txnTimeoutMs,
      state = Ongoing,
      topicPartitions = partitions,
      txnStartTimestamp = now,
      txnLastUpdateTimestamp = now,
      clientTransactionVersion = TV_2
    )
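    // Sanity check: the metadata reports its epoch as exhausted, i.e. (assumed
    // semantics) at most one more bump fits before a Short overflow.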
    assertTrue(txnMetadata.isProducerEpochExhausted)

    // Mock the transaction manager to return our test transaction as timed out
    when(transactionManager.timedOutTransactions())
      .thenReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId, producerId, epochAtMaxBoundary)))
    when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
    when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)

    // Mock the append operation to simulate successful write and update the metadata
    when(transactionManager.appendTransactionToLog(
      ArgumentMatchers.eq(transactionalId),
      ArgumentMatchers.eq(coordinatorEpoch),
      any[TxnTransitMetadata],
      capturedErrorsCallback.capture(),
      any(),
      any())
    ).thenAnswer(invocation => {
      val transitMetadata = invocation.getArgument[TxnTransitMetadata](2)
      // Simulate the metadata update that would happen in the real appendTransactionToLog
      txnMetadata.completeTransitionTo(transitMetadata)
      capturedErrorsCallback.getValue.apply(Errors.NONE)
    })

    // Track the actual behavior
    var callbackInvoked = false
    var resultError: Errors = null
    var resultProducerId: Long = -1
    var resultEpoch: Short = -1

    def checkOnEndTransactionComplete(txnIdAndPidEpoch: TransactionalIdAndProducerIdEpoch)
                                     (error: Errors, newProducerId: Long, newProducerEpoch: Short): Unit = {
      callbackInvoked = true
      resultError = error
      resultProducerId = newProducerId
      resultEpoch = newProducerEpoch
    }

    // Execute the timeout abort process
    coordinator.abortTimedOutTransactions(checkOnEndTransactionComplete)

    assertTrue(callbackInvoked, "Callback should have been invoked")
    assertEquals(Errors.NONE, resultError, "Expected no errors in the callback")
    assertEquals(producerId, resultProducerId, "Expected producer ID to match")
    assertEquals(Short.MaxValue, resultEpoch, "Expected producer epoch to be Short.MaxValue (32767) single epoch bump")

    // Verify the transaction metadata was correctly updated to the final epoch
    assertEquals(Short.MaxValue, txnMetadata.producerEpoch,
      s"Expected transaction metadata producer epoch to be ${Short.MaxValue} " +
      s"after timeout handling, but was ${txnMetadata.producerEpoch}"
    )

    // Verify the basic flow was attempted
    verify(transactionManager).timedOutTransactions()
    verify(transactionManager, atLeast(1)).getTransactionState(ArgumentMatchers.eq(transactionalId))
  }

  @Test
  def testInitProducerIdWithNoLastProducerData(): Unit = {
    // If the metadata doesn't include the previous producer data (for example, if it was written to the log by a broker