diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 316164c974c..a19ab9c7a98 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -34,10 +34,12 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.InvalidPidMappingException; import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.errors.TransactionAbortableException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.common.errors.UnknownProducerIdException; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -771,6 +773,15 @@ public class TransactionManager { || exception instanceof InvalidPidMappingException) { transitionToFatalError(exception); } else if (isTransactional()) { + // RetriableExceptions from the Sender thread are converted to Abortable errors + // because they indicate that the transaction cannot be completed after all retry attempts. + // This conversion ensures the application layer treats these errors as abortable, + // preventing duplicate message delivery. + if (exception instanceof RetriableException || + exception instanceof InvalidTxnStateException) { + exception = new TransactionAbortableException("Transaction Request was aborted after exhausting retries.", exception); + } + if (needToTriggerEpochBumpFromClient() && !isCompleting()) { clientSideEpochBumpRequired = true; } diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java b/clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java index aa592d552bf..544a5c122b2 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java @@ -17,6 +17,13 @@ package org.apache.kafka.common.errors; public class TransactionAbortableException extends ApiException { + + private static final long serialVersionUID = 1L; + + public TransactionAbortableException(String message, Throwable cause) { + super(message, cause); + } + public TransactionAbortableException(String message) { super(message); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 34392d58690..5e095c6783d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -96,6 +96,8 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.mockito.InOrder; import java.nio.ByteBuffer; @@ -205,7 +207,7 @@ public class SenderTest { })); return Collections.unmodifiableMap(partitionRecords); } - + @Test public void testSimple() throws Exception { long offset = 0; @@ -3031,8 +3033,62 @@ public class SenderTest { verifyErrorMessage(produceResponse(tp0, 0L, Errors.INVALID_REQUEST, 0, -1, errorMessage), errorMessage); } + @ParameterizedTest + @EnumSource(value = Errors.class, names = {"COORDINATOR_LOAD_IN_PROGRESS", "INVALID_TXN_STATE"}) + public void testTransactionShouldTransitionToAbortableForSenderAPI(Errors error) throws InterruptedException { + ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); + TransactionManager txnManager = new TransactionManager( + logContext, + "testRetriableException", + 60000, + RETRY_BACKOFF_MS, + apiVersions, + false + ); + + // Setup with transaction state and initialize transactions with single retry + setupWithTransactionState(txnManager, false, null, 1); + doInitTransactions(txnManager, producerIdAndEpoch); + + // Begin transaction and add partition + txnManager.beginTransaction(); + txnManager.maybeAddPartition(tp0); + client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE))); + sender.runOnce(); + + // First produce request + appendToAccumulator(tp0); + client.prepareResponse(produceResponse(tp0, -1, error, -1)); + sender.runOnce(); + + // Sleep for retry backoff + time.sleep(RETRY_BACKOFF_MS); + + // Second attempt to process record - PREPARE the response before sending + client.prepareResponse(produceResponse(tp0, -1, error, -1)); + sender.runOnce(); + + // Now transaction should be in abortable state after retry is exhausted + assertTrue(txnManager.hasAbortableError()); + + // Second produce request - should fail with TransactionAbortableException + Future future2 = appendToAccumulator(tp0); + client.prepareResponse(produceResponse(tp0, -1, Errors.NONE, -1)); + // Sender will try to send and fail with TransactionAbortableException instead of COORDINATOR_LOAD_IN_PROGRESS, because we're in abortable state + sender.runOnce(); + assertFutureFailure(future2, TransactionAbortableException.class); + + // Verify transaction API requests also fail with TransactionAbortableException + try { + txnManager.beginCommit(); + fail("Expected beginCommit() to fail with TransactionAbortableException when in abortable error state"); + } catch (KafkaException e) { + assertEquals(TransactionAbortableException.class, e.getCause().getClass()); + } + } + @Test - public void testSenderShouldRetryWithBackoffOnRetriableError() { + public void testSenderShouldRetryWithBackoffOnRetriableError() throws InterruptedException { final long producerId = 343434L; TransactionManager transactionManager = createTransactionManager(); setupWithTransactionState(transactionManager); @@ -3674,6 +3730,10 @@ public class SenderTest { setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, Integer.MAX_VALUE, 0); } + private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool, int retries) { + setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, retries, 0); + } + private void setupWithTransactionState( TransactionManager transactionManager, boolean guaranteeOrder, diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 118656e47f8..4668a91ed04 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -2958,7 +2958,7 @@ public class TransactionManagerTest { "Expected to get a TimeoutException since the queued ProducerBatch should have been expired"); runUntil(commitResult::isCompleted); // the commit shouldn't be completed without being sent since the produce request failed. assertFalse(commitResult.isSuccessful()); // the commit shouldn't succeed since the produce request failed. - assertThrows(TimeoutException.class, commitResult::await); + assertInstanceOf(TimeoutException.class, assertThrows(TransactionAbortableException.class, commitResult::await).getCause()); assertTrue(transactionManager.hasAbortableError()); assertTrue(transactionManager.hasOngoingTransaction());