KAFKA-18660: Transactions Version 2 doesn't handle epoch overflow correctly (#18730)

Fixed the typo that returned the wrong producer ID and epoch, so that we handle epoch overflow correctly.

We also had to rearrange the concurrent transaction handling so that we don't self-fence when we start the new transaction with the new producer ID.

I also tested this with a modified version of the code where epoch overflow happens on the first epoch bump (every request has a new producer ID).
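
To make the ordering issue concrete, here is a minimal, self-contained sketch (hypothetical names such as EpochOverflowOrderingSketch and validate, simplified types, not the coordinator's actual code): while a TV2 epoch-overflow transition is still pending, the cached metadata holds the old producer ID even though the producer may already be using the newly allocated one, so the pending-transition check has to run before the producer-ID check to return a retriable error instead of a fatal one.

// Hypothetical standalone sketch of the check ordering; these names are illustrative, not Kafka APIs.
object EpochOverflowOrderingSketch {
  sealed trait ValidationResult
  case object ConcurrentTransactions extends ValidationResult   // retriable: client backs off and retries
  case object InvalidProducerIdMapping extends ValidationResult // fatal for the producer
  case object Ok extends ValidationResult

  def validate(cachedProducerId: Long,
               pendingTransitionInProgress: Boolean,
               requestProducerId: Long): ValidationResult = {
    // Check the pending transition first: during an epoch-overflow transition the new
    // producer ID exists only in pending state, so a mismatch is expected and retriable.
    if (pendingTransitionInProgress) ConcurrentTransactions
    else if (cachedProducerId != requestProducerId) InvalidProducerIdMapping
    else Ok
  }

  def main(args: Array[String]): Unit = {
    // Cached metadata still holds producer ID 1000 while the producer already uses 1001,
    // allocated by the in-flight overflow transition.
    println(validate(cachedProducerId = 1000L, pendingTransitionInProgress = true, requestProducerId = 1001L))
    // Prints ConcurrentTransactions; checking the producer ID first would instead reject
    // the request as InvalidProducerIdMapping and the producer would fail fatally.
  }
}

With the old ordering, the same scenario hits the producer-ID mismatch first, which is exactly the self-fencing described above.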

Reviewers: Artem Livshits <alivshits@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
Justine Olshan, 2025-01-30 13:42:10 -08:00 (committed by GitHub)
commit ccab9eb8b4, parent 6cf54c4dab
3 changed files with 97 additions and 19 deletions


@@ -408,13 +408,16 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
         // generate the new transaction metadata with added partitions
         txnMetadata.inLock {
-          if (txnMetadata.producerId != producerId) {
+          if (txnMetadata.pendingTransitionInProgress) {
+            // return a retriable exception to let the client backoff and retry
+            // This check is performed first so that the pending transition can complete before subsequent checks.
+            // With TV2, we may be transitioning over a producer epoch overflow, and the producer may be using the
+            // new producer ID that is still only in pending state.
+            Left(Errors.CONCURRENT_TRANSACTIONS)
+          } else if (txnMetadata.producerId != producerId) {
             Left(Errors.INVALID_PRODUCER_ID_MAPPING)
           } else if (txnMetadata.producerEpoch != producerEpoch) {
             Left(Errors.PRODUCER_FENCED)
-          } else if (txnMetadata.pendingTransitionInProgress) {
-            // return a retriable exception to let the client backoff and retry
-            Left(Errors.CONCURRENT_TRANSACTIONS)
           } else if (txnMetadata.state == PrepareCommit || txnMetadata.state == PrepareAbort) {
             Left(Errors.CONCURRENT_TRANSACTIONS)
           } else if (txnMetadata.state == Ongoing && partitions.subsetOf(txnMetadata.topicPartitions)) {
@@ -812,10 +815,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
           }
         }
-        if (txnMetadata.producerId != producerId && !retryOnOverflow)
-          Left(Errors.INVALID_PRODUCER_ID_MAPPING)
-        else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence)
+        if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence) {
+          // This check is performed first so that the pending transition can complete before the next checks.
+          // With TV2, we may be transitioning over a producer epoch overflow, and the producer may be using the
+          // new producer ID that is still only in pending state.
           Left(Errors.CONCURRENT_TRANSACTIONS)
+        } else if (txnMetadata.producerId != producerId && !retryOnOverflow)
+          Left(Errors.INVALID_PRODUCER_ID_MAPPING)
         else if (!isValidEpoch)
           Left(Errors.PRODUCER_FENCED)
         else txnMetadata.state match {
@@ -940,7 +946,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
           case Right((txnMetadata, newPreSendMetadata)) =>
             // we can respond to the client immediately and continue to write the txn markers if
             // the log append was successful
-            responseCallback(Errors.NONE, txnMetadata.producerId, txnMetadata.producerEpoch)
+            responseCallback(Errors.NONE, newPreSendMetadata.producerId, newPreSendMetadata.producerEpoch)
            txnMarkerChannelManager.addTxnMarkersToSend(coordinatorEpoch, txnMarkerResult, txnMetadata, newPreSendMetadata)
         }
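
An aside on the one-line responseCallback change above: a minimal, hypothetical sketch (simplified case classes, not the coordinator's TransactionMetadata/TxnTransitMetadata types) of why the response must expose the pre-send metadata's values. After a TV2 epoch overflow the newly allocated producer ID starts at epoch 0, while the cached metadata still holds the exhausted epoch.

// Illustrative types only; the real code passes the actual transit metadata to the callback.
final case class CachedTxn(producerId: Long, producerEpoch: Short)
final case class PreSendTxn(producerId: Long, producerEpoch: Short)

object EndTxnResponseSketch {
  // Returning the pre-send values is what lets the producer continue under the new ID;
  // returning the cached ones hands back the exhausted epoch.
  def responseValues(cached: CachedTxn, preSend: PreSendTxn): (Long, Short) =
    (preSend.producerId, preSend.producerEpoch)

  def main(args: Array[String]): Unit = {
    val cached = CachedTxn(producerId = 1000L, producerEpoch = Short.MaxValue)
    val preSend = PreSendTxn(producerId = 1001L, producerEpoch = 0.toShort)
    println(responseValues(cached, preSend)) // (1001,0): the values the producer continues with
  }
}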


@@ -22,7 +22,7 @@ import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException}
+import org.apache.kafka.common.errors.{ConcurrentTransactionsException, InvalidProducerEpochException, ProducerFencedException, TimeoutException}
 import org.apache.kafka.common.test.api.Flaky
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
@@ -617,15 +617,20 @@ class TransactionsTest extends IntegrationTestHarness {
     // Wait for the expiration cycle to kick in.
     Thread.sleep(600)
-    try {
-      // Now that the transaction has expired, the second send should fail with a InvalidProducerEpochException.
-      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
-      fail("should have raised a InvalidProducerEpochException since the transaction has expired")
-    } catch {
-      case _: InvalidProducerEpochException =>
-      case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
-    }
+    TestUtils.waitUntilTrue(() => {
+      var foundException = false
+      try {
+        // Now that the transaction has expired, the second send should fail with a InvalidProducerEpochException. We may see some concurrentTransactionsExceptions.
+        producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
+        fail("should have raised an error due to concurrent transactions or invalid producer epoch")
+      } catch {
+        case _: ConcurrentTransactionsException =>
+        case _: InvalidProducerEpochException =>
+        case e: ExecutionException =>
+          foundException = e.getCause.isInstanceOf[InvalidProducerEpochException]
+      }
+      foundException
+    }, "Never returned the expected InvalidProducerEpochException")

     // Verify that the first message was aborted and the second one was never written at all.
     val nonTransactionalConsumer = nonTransactionalConsumers.head


@@ -182,6 +182,40 @@ class TransactionCoordinatorTest {
     assertEquals(Errors.NONE, result.error)
   }

+  @Test
+  def shouldGenerateNewProducerIdIfEpochsExhaustedV2(): Unit = {
+    initPidGenericMocks(transactionalId)
+
+    val txnMetadata1 = new TransactionMetadata(transactionalId, producerId, producerId, RecordBatch.NO_PRODUCER_ID, (Short.MaxValue - 1).toShort,
+      (Short.MaxValue - 2).toShort, txnTimeoutMs, Ongoing, mutable.Set.empty, time.milliseconds(), time.milliseconds(), TV_2)
+    // We start with txnMetadata1 so we can transform the metadata to PrepareCommit.
+    val txnMetadata2 = new TransactionMetadata(transactionalId, producerId, producerId, RecordBatch.NO_PRODUCER_ID, (Short.MaxValue - 1).toShort,
+      (Short.MaxValue - 2).toShort, txnTimeoutMs, Ongoing, mutable.Set.empty, time.milliseconds(), time.milliseconds(), TV_2)
+
+    val transitMetadata = txnMetadata2.prepareAbortOrCommit(PrepareCommit, TV_2, producerId2, time.milliseconds(), false)
+    txnMetadata2.completeTransitionTo(transitMetadata)
+    assertEquals(producerId, txnMetadata2.producerId)
+    assertEquals(Short.MaxValue, txnMetadata2.producerEpoch)
+
+    when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2))))
+
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any[TxnTransitMetadata],
+      capturedErrorsCallback.capture(),
+      any(),
+      any()
+    )).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NONE))
+
+    coordinator.handleEndTransaction(transactionalId, producerId, (Short.MaxValue - 1).toShort, TransactionResult.COMMIT, TV_2, endTxnCallback)
+    assertEquals(producerId2, newProducerId)
+    assertEquals(0, newEpoch)
+    assertEquals(Errors.NONE, error)
+  }
+
   @Test
   def shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinator(): Unit = {
     when(transactionManager.validateTransactionTimeoutMs(anyInt()))
@@ -519,7 +553,7 @@ class TransactionCoordinatorTest {
       .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))

     val nextProducerEpoch = if (isRetry) producerEpoch - 1 else producerEpoch
-    coordinator.handleEndTransaction(transactionalId, producerId, nextProducerEpoch.toShort , TransactionResult.ABORT, clientTransactionVersion, endTxnCallback)
+    coordinator.handleEndTransaction(transactionalId, producerId, nextProducerEpoch.toShort, TransactionResult.ABORT, clientTransactionVersion, endTxnCallback)
     if (isRetry) {
       assertEquals(Errors.PRODUCER_FENCED, error)
     } else {
@@ -770,6 +804,39 @@ class TransactionCoordinatorTest {
     verify(transactionManager, times(2)).getTransactionState(ArgumentMatchers.eq(transactionalId))
   }

+  @Test
+  def shouldReturnConcurrentTxnOnAddPartitionsIfEndTxnV2EpochOverflowAndNotComplete(): Unit = {
+    val prepareWithPending = new TransactionMetadata(transactionalId, producerId, producerId,
+      producerId2, Short.MaxValue, (Short.MaxValue - 1).toShort, 1, PrepareCommit, collection.mutable.Set.empty[TopicPartition], 0, time.milliseconds(), TV_2)
+    val txnTransitMetadata = prepareWithPending.prepareComplete(time.milliseconds())
+
+    when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, prepareWithPending))))
+
+    // Return CONCURRENT_TRANSACTIONS while transaction is still completing
+    coordinator.handleAddPartitionsToTransaction(transactionalId, producerId2, 0, partitions, errorsCallback, TV_2)
+    assertEquals(Errors.CONCURRENT_TRANSACTIONS, error)
+    verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId))
+
+    prepareWithPending.completeTransitionTo(txnTransitMetadata)
+    assertEquals(CompleteCommit, prepareWithPending.state)
+
+    when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, prepareWithPending))))
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any[TxnTransitMetadata],
+      capturedErrorsCallback.capture(),
+      any(),
+      any())
+    ).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NONE))
+
+    coordinator.handleAddPartitionsToTransaction(transactionalId, producerId2, 0, partitions, errorsCallback, TV_2)
+    assertEquals(Errors.NONE, error)
+    verify(transactionManager, times(2)).getTransactionState(ArgumentMatchers.eq(transactionalId))
+  }
+
   @ParameterizedTest
   @ValueSource(shorts = Array(0, 2))
   def shouldAppendPrepareCommitToLogOnEndTxnWhenStatusIsOngoingAndResultIsCommit(transactionVersion: Short): Unit = {