mirror of https://github.com/apache/kafka.git
KAFKA-19367: Follow up bug fix (#19976)
This is a follow-up to https://github.com/apache/kafka/pull/19910.

The warning `The coordinator failed to write an epoch fence transition for producer jt142 to the transaction log with error COORDINATOR_NOT_AVAILABLE. The epoch was increased to 2 but not returned to the client (kafka.coordinator.transaction.TransactionCoordinator)` is misleading after that change: since we no longer bump the epoch here, the message should not say "increased", and the **epochAndMetadata.transactionMetadata.hasFailedEpochFence = true** line should be removed.

In the test, the expected behavior is (see the illustrative sketch below):

1) The first append of the transaction to the log fails with COORDINATOR_NOT_AVAILABLE (epoch 1).
2) We try init_pid again; this time the single epoch bump succeeds, and the following happen simultaneously (epoch 2):
   -> Transition to COMPLETE_ABORT
   -> Return a CONCURRENT_TRANSACTIONS error to the client
3) The client retries, and there is another epoch bump; the state transitions to EMPTY (epoch 3).

Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits <alivshits@confluent.io>
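The three-step flow above can be summarized as plain data. The sketch below is purely illustrative: the names `ExpectedStep` and `expectedFlow` do not exist in the Kafka codebase; it simply restates the expected epoch, state, and returned error at each attempt, which the new test below asserts for real.

```scala
// Illustrative summary of the expected init_pid flow described above.
// These names are hypothetical; the real assertions live in
// testTV2AllowsEpochReBumpingAfterFailedWrite in the diff below.
final case class ExpectedStep(attempt: String, epoch: Short, state: String, error: String)

val expectedFlow = Seq(
  ExpectedStep("first append to the log fails",       1, "ONGOING",        "COORDINATOR_NOT_AVAILABLE"),
  ExpectedStep("retry: epoch bump + abort initiated", 2, "COMPLETE_ABORT", "CONCURRENT_TRANSACTIONS"),
  ExpectedStep("client retry: epoch bumped again",    3, "EMPTY",          "NONE")
)

expectedFlow.foreach { s =>
  println(s"epoch=${s.epoch} state=${s.state} error=${s.error} (${s.attempt})")
}
```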
Parent: 88eced0c0f
Commit: e9871efef8
@@ -977,10 +977,10 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
           case Some(epochAndMetadata) =>
             if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
-              // This was attempted epoch fence that failed, so mark this state on the metadata
-              epochAndMetadata.transactionMetadata.hasFailedEpochFence = true
+              // For TV2, we allow re-bumping the epoch on retry, since we don't complete the epoch bump.
+              // Therefore, we don't set hasFailedEpochFence = true.
               warn(s"The coordinator failed to write an epoch fence transition for producer $transactionalId to the transaction log " +
-                s"with error $error. The epoch was increased to ${newMetadata.producerEpoch} but not returned to the client")
+                s"with error $error")
             }
         }
       }
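For context on the deleted assignment, here is a purely illustrative sketch; `epochForRetry` and its parameters are hypothetical, not part of TransactionCoordinator. It captures the behavioral difference the flag controls: when `hasFailedEpochFence` is set, a retried InitProducerId reuses the epoch that was already bumped in memory, whereas leaving it unset (as the TV2 path now does) lets the retry bump the epoch again.

```scala
// Hypothetical helper, for illustration only: what leaving hasFailedEpochFence unset allows.
def epochForRetry(currentEpoch: Short, hasFailedEpochFence: Boolean): Short =
  if (hasFailedEpochFence) currentEpoch          // a previous bump already happened; reuse it
  else (currentEpoch + 1).toShort                // TV2 after this fix: safe to bump again on retry
```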
@@ -33,6 +33,7 @@ import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt}
 import org.mockito.Mockito._
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.doAnswer

 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
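The newly imported `doAnswer` is what the test below uses to stub the void `addTxnMarkersToSend` call, the usual Mockito pattern for methods where `when(...).thenReturn(...)` does not apply. A minimal, self-contained sketch of the pattern follows; the `MarkerSender` trait is hypothetical and only stands in for the real collaborator.

```scala
import org.mockito.ArgumentMatchers.{anyInt, anyString}
import org.mockito.Mockito.{doAnswer, mock}

// Hypothetical collaborator with a void (Unit) method.
trait MarkerSender {
  def send(epoch: Int, marker: String): Unit
}

val sender = mock(classOf[MarkerSender])

// doAnswer(...).when(mock).method(...) attaches side effects to a void method.
doAnswer(invocation => {
  val epoch = invocation.getArgument[Int](0)
  println(s"markers sent for epoch $epoch")
  null // an Answer must return a value; null is conventional for void methods
}).when(sender).send(anyInt(), anyString())

sender.send(2, "ABORT") // prints: markers sent for epoch 2
```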
@@ -1978,4 +1979,153 @@ class TransactionCoordinatorTest {
     else
       producerEpoch
   }
+
+  @Test
+  def testTV2AllowsEpochReBumpingAfterFailedWrite(): Unit = {
+    // Test the complete TV2 flow: failed write → epoch fence → abort → retry with epoch bump
+    // This demonstrates that TV2 allows epoch re-bumping after failed writes (unlike TV1)
+    val producerEpoch = 1.toShort
+    val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerId, RecordBatch.NO_PRODUCER_ID,
+      producerEpoch, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, TransactionState.ONGOING, partitions, time.milliseconds(), time.milliseconds(), TV_2)
+
+    when(transactionManager.validateTransactionTimeoutMs(anyBoolean(), anyInt()))
+      .thenReturn(true)
+    when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+    when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
+
+    // First attempt fails with COORDINATOR_NOT_AVAILABLE
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any(),
+      any(),
+      any(),
+      any()
+    )).thenAnswer(invocation => {
+      val callback = invocation.getArgument[Errors => Unit](3)
+
+      // Simulate the real TransactionStateManager behavior: reset pendingState on failure
+      // since handleInitProducerId doesn't provide a custom retryOnError function
+      txnMetadata.pendingState = None
+
+      // For TV2, hasFailedEpochFence is NOT set to true, allowing epoch bumps on retry
+      // The epoch remains at its original value (1) since completeTransitionTo was never called
+
+      callback.apply(Errors.COORDINATOR_NOT_AVAILABLE)
+    })
+
+    coordinator.handleInitProducerId(
+      transactionalId,
+      txnTimeoutMs,
+      enableTwoPCFlag = false,
+      keepPreparedTxn = false,
+      None,
+      initProducerIdMockCallback
+    )
+    assertEquals(InitProducerIdResult(-1, -1, Errors.COORDINATOR_NOT_AVAILABLE), result)
+
+    // After the first failed attempt, the state should be:
+    // - hasFailedEpochFence = false (NOT set for TV2)
+    // - pendingState = None (reset by TransactionStateManager)
+    // - producerEpoch = 1 (unchanged since completeTransitionTo was never called)
+    // - transaction still ONGOING
+
+    // Second attempt: Should abort the ongoing transaction
+    reset(transactionManager)
+    when(transactionManager.validateTransactionTimeoutMs(anyBoolean(), anyInt()))
+      .thenReturn(true)
+    when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+    when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
+
+    // Mock the appendTransactionToLog to succeed for the endTransaction call
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any(),
+      any(),
+      any(),
+      any()
+    )).thenAnswer(invocation => {
+      val newMetadata = invocation.getArgument[TxnTransitMetadata](2)
+      val callback = invocation.getArgument[Errors => Unit](3)
+
+      // Complete the transition and call the callback with success
+      txnMetadata.completeTransitionTo(newMetadata)
+      callback.apply(Errors.NONE)
+    })
+
+    // Mock the transactionMarkerChannelManager to simulate the second write (PREPARE_ABORT -> COMPLETE_ABORT)
+    doAnswer(invocation => {
+      val newMetadata = invocation.getArgument[TxnTransitMetadata](3)
+      // Simulate the completion of transaction markers and the second write
+      // This would normally happen asynchronously after markers are sent
+      txnMetadata.completeTransitionTo(newMetadata) // This transitions to COMPLETE_ABORT
+      txnMetadata.pendingState = None
+
+      null
+    }).when(transactionMarkerChannelManager).addTxnMarkersToSend(
+      ArgumentMatchers.eq(coordinatorEpoch),
+      ArgumentMatchers.eq(TransactionResult.ABORT),
+      ArgumentMatchers.eq(txnMetadata),
+      any()
+    )
+
+    coordinator.handleInitProducerId(
+      transactionalId,
+      txnTimeoutMs,
+      enableTwoPCFlag = false,
+      keepPreparedTxn = false,
+      None,
+      initProducerIdMockCallback
+    )
+
+    // The second attempt should return CONCURRENT_TRANSACTIONS (this is intentional)
+    assertEquals(InitProducerIdResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
+
+    // The transactionMarkerChannelManager mock should have completed the transition to COMPLETE_ABORT
+    // Verify that hasFailedEpochFence was never set to true for TV2, allowing future epoch bumps
+    assertFalse(txnMetadata.hasFailedEpochFence)
+
+    // Third attempt: Client retries after CONCURRENT_TRANSACTIONS
+    reset(transactionManager)
+    when(transactionManager.validateTransactionTimeoutMs(anyBoolean(), anyInt()))
+      .thenReturn(true)
+    when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+    when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
+
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any(),
+      any(),
+      any(),
+      any()
+    )).thenAnswer(invocation => {
+      val newMetadata = invocation.getArgument[TxnTransitMetadata](2)
+      val callback = invocation.getArgument[Errors => Unit](3)
+
+      // Complete the transition and call the callback with success
+      txnMetadata.completeTransitionTo(newMetadata)
+      callback.apply(Errors.NONE)
+    })
+
+    coordinator.handleInitProducerId(
+      transactionalId,
+      txnTimeoutMs,
+      enableTwoPCFlag = false,
+      keepPreparedTxn = false,
+      None,
+      initProducerIdMockCallback
+    )
+
+    // The third attempt should succeed with epoch 3 (2 + 1)
+    // This demonstrates that TV2 allows epoch re-bumping after failed writes
+    assertEquals(InitProducerIdResult(producerId, 3.toShort, Errors.NONE), result)
+
+    // Final verification that hasFailedEpochFence was never set to true for TV2
+    assertFalse(txnMetadata.hasFailedEpochFence)
+  }
 }