mirror of https://github.com/apache/kafka.git
KAFKA-18660: Transactions Version 2 doesn't handle epoch overflow correctly (#18730)
Fixed the typo that used the wrong producer ID and epoch when returning so that we handle epoch overflow correctly. We also had to rearrange the concurrent transaction handling so that we don't self-fence when we start the new transaction with the new producer ID. I also tested this with a modified version of the code where epoch overflow happens on the first epoch bump (every request has a new producer id) Reviewers: Artem Livshits <alivshits@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
This commit is contained in:
parent
6cf54c4dab
commit
ccab9eb8b4
|
@ -408,13 +408,16 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
|
|||
|
||||
// generate the new transaction metadata with added partitions
|
||||
txnMetadata.inLock {
|
||||
if (txnMetadata.producerId != producerId) {
|
||||
if (txnMetadata.pendingTransitionInProgress) {
|
||||
// return a retriable exception to let the client backoff and retry
|
||||
// This check is performed first so that the pending transition can complete before subsequent checks.
|
||||
// With TV2, we may be transitioning over a producer epoch overflow, and the producer may be using the
|
||||
// new producer ID that is still only in pending state.
|
||||
Left(Errors.CONCURRENT_TRANSACTIONS)
|
||||
} else if (txnMetadata.producerId != producerId) {
|
||||
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
|
||||
} else if (txnMetadata.producerEpoch != producerEpoch) {
|
||||
Left(Errors.PRODUCER_FENCED)
|
||||
} else if (txnMetadata.pendingTransitionInProgress) {
|
||||
// return a retriable exception to let the client backoff and retry
|
||||
Left(Errors.CONCURRENT_TRANSACTIONS)
|
||||
} else if (txnMetadata.state == PrepareCommit || txnMetadata.state == PrepareAbort) {
|
||||
Left(Errors.CONCURRENT_TRANSACTIONS)
|
||||
} else if (txnMetadata.state == Ongoing && partitions.subsetOf(txnMetadata.topicPartitions)) {
|
||||
|
@ -812,10 +815,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
|
|||
}
|
||||
}
|
||||
|
||||
if (txnMetadata.producerId != producerId && !retryOnOverflow)
|
||||
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
|
||||
else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence)
|
||||
if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence) {
|
||||
// This check is performed first so that the pending transition can complete before the next checks.
|
||||
// With TV2, we may be transitioning over a producer epoch overflow, and the producer may be using the
|
||||
// new producer ID that is still only in pending state.
|
||||
Left(Errors.CONCURRENT_TRANSACTIONS)
|
||||
} else if (txnMetadata.producerId != producerId && !retryOnOverflow)
|
||||
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
|
||||
else if (!isValidEpoch)
|
||||
Left(Errors.PRODUCER_FENCED)
|
||||
else txnMetadata.state match {
|
||||
|
@ -940,7 +946,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
|
|||
case Right((txnMetadata, newPreSendMetadata)) =>
|
||||
// we can respond to the client immediately and continue to write the txn markers if
|
||||
// the log append was successful
|
||||
responseCallback(Errors.NONE, txnMetadata.producerId, txnMetadata.producerEpoch)
|
||||
responseCallback(Errors.NONE, newPreSendMetadata.producerId, newPreSendMetadata.producerEpoch)
|
||||
|
||||
txnMarkerChannelManager.addTxnMarkersToSend(coordinatorEpoch, txnMarkerResult, txnMetadata, newPreSendMetadata)
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ import kafka.utils.{TestInfoUtils, TestUtils}
|
|||
import org.apache.kafka.clients.consumer._
|
||||
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException}
|
||||
import org.apache.kafka.common.errors.{ConcurrentTransactionsException, InvalidProducerEpochException, ProducerFencedException, TimeoutException}
|
||||
import org.apache.kafka.common.test.api.Flaky
|
||||
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
|
||||
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
|
||||
|
@ -617,15 +617,20 @@ class TransactionsTest extends IntegrationTestHarness {
|
|||
// Wait for the expiration cycle to kick in.
|
||||
Thread.sleep(600)
|
||||
|
||||
try {
|
||||
// Now that the transaction has expired, the second send should fail with a InvalidProducerEpochException.
|
||||
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
|
||||
fail("should have raised a InvalidProducerEpochException since the transaction has expired")
|
||||
} catch {
|
||||
case _: InvalidProducerEpochException =>
|
||||
case e: ExecutionException =>
|
||||
assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
|
||||
}
|
||||
TestUtils.waitUntilTrue(() => {
|
||||
var foundException = false
|
||||
try {
|
||||
// Now that the transaction has expired, the second send should fail with a InvalidProducerEpochException. We may see some concurrentTransactionsExceptions.
|
||||
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
|
||||
fail("should have raised an error due to concurrent transactions or invalid producer epoch")
|
||||
} catch {
|
||||
case _: ConcurrentTransactionsException =>
|
||||
case _: InvalidProducerEpochException =>
|
||||
case e: ExecutionException =>
|
||||
foundException = e.getCause.isInstanceOf[InvalidProducerEpochException]
|
||||
}
|
||||
foundException
|
||||
}, "Never returned the expected InvalidProducerEpochException")
|
||||
|
||||
// Verify that the first message was aborted and the second one was never written at all.
|
||||
val nonTransactionalConsumer = nonTransactionalConsumers.head
|
||||
|
|
|
@ -182,6 +182,40 @@ class TransactionCoordinatorTest {
|
|||
assertEquals(Errors.NONE, result.error)
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldGenerateNewProducerIdIfEpochsExhaustedV2(): Unit = {
|
||||
initPidGenericMocks(transactionalId)
|
||||
|
||||
val txnMetadata1 = new TransactionMetadata(transactionalId, producerId, producerId, RecordBatch.NO_PRODUCER_ID, (Short.MaxValue - 1).toShort,
|
||||
(Short.MaxValue - 2).toShort, txnTimeoutMs, Ongoing, mutable.Set.empty, time.milliseconds(), time.milliseconds(), TV_2)
|
||||
// We start with txnMetadata1 so we can transform the metadata to PrepareCommit.
|
||||
val txnMetadata2 = new TransactionMetadata(transactionalId, producerId, producerId, RecordBatch.NO_PRODUCER_ID, (Short.MaxValue - 1).toShort,
|
||||
(Short.MaxValue - 2).toShort, txnTimeoutMs, Ongoing, mutable.Set.empty, time.milliseconds(), time.milliseconds(), TV_2)
|
||||
val transitMetadata = txnMetadata2.prepareAbortOrCommit(PrepareCommit, TV_2, producerId2, time.milliseconds(), false)
|
||||
txnMetadata2.completeTransitionTo(transitMetadata)
|
||||
|
||||
assertEquals(producerId, txnMetadata2.producerId)
|
||||
assertEquals(Short.MaxValue, txnMetadata2.producerEpoch)
|
||||
|
||||
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
|
||||
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))))
|
||||
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2))))
|
||||
|
||||
when(transactionManager.appendTransactionToLog(
|
||||
ArgumentMatchers.eq(transactionalId),
|
||||
ArgumentMatchers.eq(coordinatorEpoch),
|
||||
any[TxnTransitMetadata],
|
||||
capturedErrorsCallback.capture(),
|
||||
any(),
|
||||
any()
|
||||
)).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NONE))
|
||||
|
||||
coordinator.handleEndTransaction(transactionalId, producerId, (Short.MaxValue - 1).toShort, TransactionResult.COMMIT, TV_2, endTxnCallback)
|
||||
assertEquals(producerId2, newProducerId)
|
||||
assertEquals(0, newEpoch)
|
||||
assertEquals(Errors.NONE, error)
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinator(): Unit = {
|
||||
when(transactionManager.validateTransactionTimeoutMs(anyInt()))
|
||||
|
@ -519,7 +553,7 @@ class TransactionCoordinatorTest {
|
|||
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
|
||||
|
||||
val nextProducerEpoch = if (isRetry) producerEpoch - 1 else producerEpoch
|
||||
coordinator.handleEndTransaction(transactionalId, producerId, nextProducerEpoch.toShort , TransactionResult.ABORT, clientTransactionVersion, endTxnCallback)
|
||||
coordinator.handleEndTransaction(transactionalId, producerId, nextProducerEpoch.toShort, TransactionResult.ABORT, clientTransactionVersion, endTxnCallback)
|
||||
if (isRetry) {
|
||||
assertEquals(Errors.PRODUCER_FENCED, error)
|
||||
} else {
|
||||
|
@ -770,6 +804,39 @@ class TransactionCoordinatorTest {
|
|||
verify(transactionManager, times(2)).getTransactionState(ArgumentMatchers.eq(transactionalId))
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldReturnConcurrentTxnOnAddPartitionsIfEndTxnV2EpochOverflowAndNotComplete(): Unit = {
|
||||
val prepareWithPending = new TransactionMetadata(transactionalId, producerId, producerId,
|
||||
producerId2, Short.MaxValue, (Short.MaxValue - 1).toShort, 1, PrepareCommit, collection.mutable.Set.empty[TopicPartition], 0, time.milliseconds(), TV_2)
|
||||
val txnTransitMetadata = prepareWithPending.prepareComplete(time.milliseconds())
|
||||
|
||||
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
|
||||
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, prepareWithPending))))
|
||||
|
||||
// Return CONCURRENT_TRANSACTIONS while transaction is still completing
|
||||
coordinator.handleAddPartitionsToTransaction(transactionalId, producerId2, 0, partitions, errorsCallback, TV_2)
|
||||
assertEquals(Errors.CONCURRENT_TRANSACTIONS, error)
|
||||
verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId))
|
||||
|
||||
prepareWithPending.completeTransitionTo(txnTransitMetadata)
|
||||
assertEquals(CompleteCommit, prepareWithPending.state)
|
||||
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
|
||||
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, prepareWithPending))))
|
||||
when(transactionManager.appendTransactionToLog(
|
||||
ArgumentMatchers.eq(transactionalId),
|
||||
ArgumentMatchers.eq(coordinatorEpoch),
|
||||
any[TxnTransitMetadata],
|
||||
capturedErrorsCallback.capture(),
|
||||
any(),
|
||||
any())
|
||||
).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NONE))
|
||||
|
||||
coordinator.handleAddPartitionsToTransaction(transactionalId, producerId2, 0, partitions, errorsCallback, TV_2)
|
||||
|
||||
assertEquals(Errors.NONE, error)
|
||||
verify(transactionManager, times(2)).getTransactionState(ArgumentMatchers.eq(transactionalId))
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(shorts = Array(0, 2))
|
||||
def shouldAppendPrepareCommitToLogOnEndTxnWhenStatusIsOngoingAndResultIsCommit(transactionVersion: Short): Unit = {
|
||||
|
|
Loading…
Reference in New Issue