KAFKA-19250 : txnProducer.abortTransaction() API should not return abortable exception (#19656)
CI / build (push) Waiting to run Details

## Problem
When an `txnProducer.abortTransaction()`operation encounters a
`TRANSACTION_ABORTABLE` error, it currently tries to transition to
`ABORTABLE_ERROR` state. This can create an infinite retry loop since:
1. The abort operation fails with `TRANSACTION_ABORTABLE`
2. We transition to `ABORTABLE_ERROR` state
3. The application recieves instance of TransactionAbortableException
and it retries the abort
4. The cycle repeats

## Solution
For `txnProducer.abortTransaction()`API,  convert
`TRANSACTION_ABORTABLE` errors to fatal errors (`KafkaException`) during
abort operations to ensure clean transaction termination. This prevents
retry loops by:
1. Treating abort failures as fatal errors at application layer
2. Ensuring the transaction can be cleanly terminated
3. Providing clear error messages to the application

## Changes
- Modified `EndTxnHandler.handleResponse()` to convert
`TRANSACTION_ABORTABLE` errors to `KafkaException` during abort
operations
- Set TransactionManager state to FATAL
- Updated test `testAbortableErrorIsConvertedToFatalErrorDuringAbort` to
verify this behavior

## Testing
- Added test case verifying that abort operations convert
`TRANSACTION_ABORTABLE` errors to `KafkaException`
    - Verified that Commit API with TRANSACTION_ABORTABLE error should
set TM to Abortable state
    - Verified that Abort API with TRANSACTION_ABORTABLE error should
convert to Fatal error i.e. KafkaException

## Impact
At application layer, this change improves transaction reliability by
preventing infinite retry loops during abort operations.

Reviewers: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
Kaushik Raina 2025-06-04 05:57:15 +05:30 committed by GitHub
parent 8c71ab03b5
commit b1ea280ab1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 61 additions and 1 deletions

View File

@ -1738,7 +1738,7 @@ public class TransactionManager {
public void handleResponse(AbstractResponse response) { public void handleResponse(AbstractResponse response) {
EndTxnResponse endTxnResponse = (EndTxnResponse) response; EndTxnResponse endTxnResponse = (EndTxnResponse) response;
Errors error = endTxnResponse.error(); Errors error = endTxnResponse.error();
boolean isAbort = !builder.data.committed();
if (error == Errors.NONE) { if (error == Errors.NONE) {
// For End Txn version 5+, the broker includes the producerId and producerEpoch in the EndTxnResponse. // For End Txn version 5+, the broker includes the producerId and producerEpoch in the EndTxnResponse.
// For versions lower than 5, the producer Id and epoch are set to -1 by default. // For versions lower than 5, the producer Id and epoch are set to -1 by default.
@ -1771,6 +1771,11 @@ public class TransactionManager {
fatalError(error.exception()); fatalError(error.exception());
} else if (error == Errors.UNKNOWN_PRODUCER_ID) { } else if (error == Errors.UNKNOWN_PRODUCER_ID) {
abortableErrorIfPossible(error.exception()); abortableErrorIfPossible(error.exception());
} else if (isAbort && error.exception() instanceof TransactionAbortableException) {
// When aborting a transaction, we must convert TRANSACTION_ABORTABLE errors to KafkaException
// because if an abort operation itself encounters an abortable error, retrying the abort would create a cycle.
// Instead, we treat this as fatal error at the application layer to ensure the transaction can be cleanly terminated.
fatalError(new KafkaException("Failed to abort transaction", error.exception()));
} else if (error == Errors.TRANSACTION_ABORTABLE) { } else if (error == Errors.TRANSACTION_ABORTABLE) {
abortableError(error.exception()); abortableError(error.exception());
} else { } else {

View File

@ -3236,6 +3236,61 @@ public class SenderTest {
txnManager.beginTransaction(); txnManager.beginTransaction();
} }
@Test
public void testAbortableErrorIsConvertedToFatalErrorDuringAbort() throws Exception {
// Initialize and begin transaction
TransactionManager transactionManager = new TransactionManager(logContext, "testAbortableErrorIsConvertedToFatalErrorDuringAbort", 60000, 100, apiVersions, false);
setupWithTransactionState(transactionManager);
doInitTransactions(transactionManager, new ProducerIdAndEpoch(1L, (short) 0));
transactionManager.beginTransaction();
// Add partition and send record
TopicPartition tp = new TopicPartition("test", 0);
addPartitionToTxn(sender, transactionManager, tp);
appendToAccumulator(tp);
// Send record and get response
sender.runOnce();
sendIdempotentProducerResponse(0, tp, Errors.NONE, 0);
sender.runOnce();
// Commit API with TRANSACTION_ABORTABLE error should set TM to Abortable state
client.prepareResponse(new EndTxnResponse(new EndTxnResponseData()
.setErrorCode(Errors.TRANSACTION_ABORTABLE.code())));
// Attempt to commit transaction
TransactionalRequestResult commitResult = transactionManager.beginCommit();
sender.runOnce();
try {
commitResult.await(1000, TimeUnit.MILLISECONDS);
fail("Expected abortable error to be thrown for commit");
} catch (KafkaException e) {
assertTrue(transactionManager.hasAbortableError());
assertEquals(commitResult.error().getClass(), TransactionAbortableException.class);
}
// Abort API with TRANSACTION_ABORTABLE error should convert to Fatal error i.e. KafkaException
client.prepareResponse(new EndTxnResponse(new EndTxnResponseData()
.setErrorCode(Errors.TRANSACTION_ABORTABLE.code())));
// Attempt to abort transaction
TransactionalRequestResult abortResult = transactionManager.beginAbort();
sender.runOnce();
// Verify the error is converted to KafkaException (not TransactionAbortableException)
try {
abortResult.await(1000, TimeUnit.MILLISECONDS);
fail("Expected KafkaException to be thrown");
} catch (KafkaException e) {
// Verify TM is in FATAL_ERROR state
assertTrue(transactionManager.hasFatalError());
assertFalse(e instanceof TransactionAbortableException);
assertEquals(abortResult.error().getClass(), KafkaException.class);
}
}
@Test @Test
public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws Exception { public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws Exception {
Metrics m = new Metrics(); Metrics m = new Metrics();