diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index a19ab9c7a98..5d83cbc0b1b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -1738,7 +1738,7 @@ public class TransactionManager { public void handleResponse(AbstractResponse response) { EndTxnResponse endTxnResponse = (EndTxnResponse) response; Errors error = endTxnResponse.error(); - + boolean isAbort = !builder.data.committed(); if (error == Errors.NONE) { // For End Txn version 5+, the broker includes the producerId and producerEpoch in the EndTxnResponse. // For versions lower than 5, the producer Id and epoch are set to -1 by default. @@ -1771,6 +1771,11 @@ public class TransactionManager { fatalError(error.exception()); } else if (error == Errors.UNKNOWN_PRODUCER_ID) { abortableErrorIfPossible(error.exception()); + } else if (isAbort && error.exception() instanceof TransactionAbortableException) { + // When aborting a transaction, we must convert TRANSACTION_ABORTABLE errors to KafkaException + // because if an abort operation itself encounters an abortable error, retrying the abort would create a cycle. + // Instead, we treat this as fatal error at the application layer to ensure the transaction can be cleanly terminated. + fatalError(new KafkaException("Failed to abort transaction", error.exception())); } else if (error == Errors.TRANSACTION_ABORTABLE) { abortableError(error.exception()); } else { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 5e095c6783d..f9f94af1806 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -3236,6 +3236,61 @@ public class SenderTest { txnManager.beginTransaction(); } + + @Test + public void testAbortableErrorIsConvertedToFatalErrorDuringAbort() throws Exception { + + // Initialize and begin transaction + TransactionManager transactionManager = new TransactionManager(logContext, "testAbortableErrorIsConvertedToFatalErrorDuringAbort", 60000, 100, apiVersions, false); + setupWithTransactionState(transactionManager); + doInitTransactions(transactionManager, new ProducerIdAndEpoch(1L, (short) 0)); + transactionManager.beginTransaction(); + + // Add partition and send record + TopicPartition tp = new TopicPartition("test", 0); + addPartitionToTxn(sender, transactionManager, tp); + appendToAccumulator(tp); + + // Send record and get response + sender.runOnce(); + sendIdempotentProducerResponse(0, tp, Errors.NONE, 0); + sender.runOnce(); + + // Commit API with TRANSACTION_ABORTABLE error should set TM to Abortable state + client.prepareResponse(new EndTxnResponse(new EndTxnResponseData() + .setErrorCode(Errors.TRANSACTION_ABORTABLE.code()))); + + // Attempt to commit transaction + TransactionalRequestResult commitResult = transactionManager.beginCommit(); + sender.runOnce(); + try { + commitResult.await(1000, TimeUnit.MILLISECONDS); + fail("Expected abortable error to be thrown for commit"); + } catch (KafkaException e) { + assertTrue(transactionManager.hasAbortableError()); + assertEquals(commitResult.error().getClass(), TransactionAbortableException.class); + } + + // Abort API with TRANSACTION_ABORTABLE error should convert to Fatal error i.e. KafkaException + client.prepareResponse(new EndTxnResponse(new EndTxnResponseData() + .setErrorCode(Errors.TRANSACTION_ABORTABLE.code()))); + + // Attempt to abort transaction + TransactionalRequestResult abortResult = transactionManager.beginAbort(); + sender.runOnce(); + + // Verify the error is converted to KafkaException (not TransactionAbortableException) + try { + abortResult.await(1000, TimeUnit.MILLISECONDS); + fail("Expected KafkaException to be thrown"); + } catch (KafkaException e) { + // Verify TM is in FATAL_ERROR state + assertTrue(transactionManager.hasFatalError()); + assertFalse(e instanceof TransactionAbortableException); + assertEquals(abortResult.error().getClass(), KafkaException.class); + } + } + @Test public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws Exception { Metrics m = new Metrics();