From b1ea280ab1012e857d4c8354fc57951f9c88f667 Mon Sep 17 00:00:00 2001 From: Kaushik Raina <103954755+k-raina@users.noreply.github.com> Date: Wed, 4 Jun 2025 05:57:15 +0530 Subject: [PATCH] KAFKA-19250 : txnProducer.abortTransaction() API should not return abortable exception (#19656) ## Problem When an `txnProducer.abortTransaction()`operation encounters a `TRANSACTION_ABORTABLE` error, it currently tries to transition to `ABORTABLE_ERROR` state. This can create an infinite retry loop since: 1. The abort operation fails with `TRANSACTION_ABORTABLE` 2. We transition to `ABORTABLE_ERROR` state 3. The application recieves instance of TransactionAbortableException and it retries the abort 4. The cycle repeats ## Solution For `txnProducer.abortTransaction()`API, convert `TRANSACTION_ABORTABLE` errors to fatal errors (`KafkaException`) during abort operations to ensure clean transaction termination. This prevents retry loops by: 1. Treating abort failures as fatal errors at application layer 2. Ensuring the transaction can be cleanly terminated 3. Providing clear error messages to the application ## Changes - Modified `EndTxnHandler.handleResponse()` to convert `TRANSACTION_ABORTABLE` errors to `KafkaException` during abort operations - Set TransactionManager state to FATAL - Updated test `testAbortableErrorIsConvertedToFatalErrorDuringAbort` to verify this behavior ## Testing - Added test case verifying that abort operations convert `TRANSACTION_ABORTABLE` errors to `KafkaException` - Verified that Commit API with TRANSACTION_ABORTABLE error should set TM to Abortable state - Verified that Abort API with TRANSACTION_ABORTABLE error should convert to Fatal error i.e. KafkaException ## Impact At application layer, this change improves transaction reliability by preventing infinite retry loops during abort operations. Reviewers: Justine Olshan --- .../internals/TransactionManager.java | 7 ++- .../producer/internals/SenderTest.java | 55 +++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index a19ab9c7a98..5d83cbc0b1b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -1738,7 +1738,7 @@ public class TransactionManager { public void handleResponse(AbstractResponse response) { EndTxnResponse endTxnResponse = (EndTxnResponse) response; Errors error = endTxnResponse.error(); - + boolean isAbort = !builder.data.committed(); if (error == Errors.NONE) { // For End Txn version 5+, the broker includes the producerId and producerEpoch in the EndTxnResponse. // For versions lower than 5, the producer Id and epoch are set to -1 by default. @@ -1771,6 +1771,11 @@ public class TransactionManager { fatalError(error.exception()); } else if (error == Errors.UNKNOWN_PRODUCER_ID) { abortableErrorIfPossible(error.exception()); + } else if (isAbort && error.exception() instanceof TransactionAbortableException) { + // When aborting a transaction, we must convert TRANSACTION_ABORTABLE errors to KafkaException + // because if an abort operation itself encounters an abortable error, retrying the abort would create a cycle. + // Instead, we treat this as fatal error at the application layer to ensure the transaction can be cleanly terminated. + fatalError(new KafkaException("Failed to abort transaction", error.exception())); } else if (error == Errors.TRANSACTION_ABORTABLE) { abortableError(error.exception()); } else { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 5e095c6783d..f9f94af1806 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -3236,6 +3236,61 @@ public class SenderTest { txnManager.beginTransaction(); } + + @Test + public void testAbortableErrorIsConvertedToFatalErrorDuringAbort() throws Exception { + + // Initialize and begin transaction + TransactionManager transactionManager = new TransactionManager(logContext, "testAbortableErrorIsConvertedToFatalErrorDuringAbort", 60000, 100, apiVersions, false); + setupWithTransactionState(transactionManager); + doInitTransactions(transactionManager, new ProducerIdAndEpoch(1L, (short) 0)); + transactionManager.beginTransaction(); + + // Add partition and send record + TopicPartition tp = new TopicPartition("test", 0); + addPartitionToTxn(sender, transactionManager, tp); + appendToAccumulator(tp); + + // Send record and get response + sender.runOnce(); + sendIdempotentProducerResponse(0, tp, Errors.NONE, 0); + sender.runOnce(); + + // Commit API with TRANSACTION_ABORTABLE error should set TM to Abortable state + client.prepareResponse(new EndTxnResponse(new EndTxnResponseData() + .setErrorCode(Errors.TRANSACTION_ABORTABLE.code()))); + + // Attempt to commit transaction + TransactionalRequestResult commitResult = transactionManager.beginCommit(); + sender.runOnce(); + try { + commitResult.await(1000, TimeUnit.MILLISECONDS); + fail("Expected abortable error to be thrown for commit"); + } catch (KafkaException e) { + assertTrue(transactionManager.hasAbortableError()); + assertEquals(commitResult.error().getClass(), TransactionAbortableException.class); + } + + // Abort API with TRANSACTION_ABORTABLE error should convert to Fatal error i.e. KafkaException + client.prepareResponse(new EndTxnResponse(new EndTxnResponseData() + .setErrorCode(Errors.TRANSACTION_ABORTABLE.code()))); + + // Attempt to abort transaction + TransactionalRequestResult abortResult = transactionManager.beginAbort(); + sender.runOnce(); + + // Verify the error is converted to KafkaException (not TransactionAbortableException) + try { + abortResult.await(1000, TimeUnit.MILLISECONDS); + fail("Expected KafkaException to be thrown"); + } catch (KafkaException e) { + // Verify TM is in FATAL_ERROR state + assertTrue(transactionManager.hasFatalError()); + assertFalse(e instanceof TransactionAbortableException); + assertEquals(abortResult.error().getClass(), KafkaException.class); + } + } + @Test public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws Exception { Metrics m = new Metrics();