KAFKA-19176: Update Transactional producer to translate retriable into abortable exceptions (#19522)
CI / build (push) Waiting to run Details

### Problem
- Currently, when a transactional producer encounters retriable errors
(like `COORDINATOR_LOAD_IN_PROGRESS`) and exhausts all retries, it finally
returns the retriable error to the application layer.
- Application retries can cause duplicate records. As a fix, we are
translating all retriable errors into abortable errors in the transactional
producer path.

- Additionally added InvalidTxnStateException as part of
https://issues.apache.org/jira/browse/KAFKA-19177

### Solution
- Modified the TransactionManager to automatically transition retriable
errors to abortable errors after all retries are exhausted. This ensures
that applications can abort the transaction when they encounter
`TransactionAbortableException`.

- `RefreshRetriableException` like `CoordinatorNotAvailableException`
will be refreshed internally

[[code](6c26595ce3/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (L1702-L1705))]
until retries are exhausted; after that it will be treated like other
retriable errors and translated to `TransactionAbortableException`

- Similarly for InvalidTxnStateException

### Testing
Added test `testTransactionShouldTransitionToAbortableForSenderAPI`
to verify in the sender thread:
- Retriable errors are properly converted to abortable state after
retries
- Transaction state transitions correctly and subsequent operations fail
appropriately with TransactionAbortableException

Reviewers: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
Kaushik Raina 2025-06-03 22:51:22 +05:30 committed by GitHub
parent 2977cb17d0
commit 8c71ab03b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 81 additions and 3 deletions

View File

@ -34,10 +34,12 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionAbortableException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@ -771,6 +773,15 @@ public class TransactionManager {
|| exception instanceof InvalidPidMappingException) {
transitionToFatalError(exception);
} else if (isTransactional()) {
// RetriableExceptions from the Sender thread are converted to Abortable errors
// because they indicate that the transaction cannot be completed after all retry attempts.
// This conversion ensures the application layer treats these errors as abortable,
// preventing duplicate message delivery.
if (exception instanceof RetriableException ||
exception instanceof InvalidTxnStateException) {
exception = new TransactionAbortableException("Transaction Request was aborted after exhausting retries.", exception);
}
if (needToTriggerEpochBumpFromClient() && !isCompleting()) {
clientSideEpochBumpRequired = true;
}

View File

@ -17,6 +17,13 @@
package org.apache.kafka.common.errors;
public class TransactionAbortableException extends ApiException {
private static final long serialVersionUID = 1L;
/**
 * Creates the exception with a detail message and an underlying cause.
 * Both arguments are passed unchanged to the {@code ApiException} constructor.
 *
 * @param message the detail message describing why the transaction must be aborted
 * @param cause the original exception that is being wrapped
 */
public TransactionAbortableException(String message, Throwable cause) {
super(message, cause);
}
/**
 * Creates the exception with a detail message and no cause.
 * The message is passed unchanged to the {@code ApiException} constructor.
 *
 * @param message the detail message describing why the transaction must be aborted
 */
public TransactionAbortableException(String message) {
super(message);
}

View File

@ -96,6 +96,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.InOrder;
import java.nio.ByteBuffer;
@ -205,7 +207,7 @@ public class SenderTest {
}));
return Collections.unmodifiableMap(partitionRecords);
}
@Test
public void testSimple() throws Exception {
long offset = 0;
@ -3031,8 +3033,62 @@ public class SenderTest {
verifyErrorMessage(produceResponse(tp0, 0L, Errors.INVALID_REQUEST, 0, -1, errorMessage), errorMessage);
}
/**
 * Checks that a produce error which persists through the configured single retry moves the
 * transaction manager into an abortable error state.
 *
 * <p>The test runs once for each of {@code COORDINATOR_LOAD_IN_PROGRESS} and
 * {@code INVALID_TXN_STATE}. After the same error has been returned for two consecutive
 * produce attempts it asserts that:
 * <ul>
 *   <li>{@code txnManager.hasAbortableError()} is true,</li>
 *   <li>the future of a record appended afterwards fails with
 *       {@code TransactionAbortableException},</li>
 *   <li>{@code beginCommit()} throws a {@code KafkaException} whose cause is a
 *       {@code TransactionAbortableException}.</li>
 * </ul>
 *
 * @param error the error code returned in the produce responses
 */
@ParameterizedTest
@EnumSource(value = Errors.class, names = {"COORDINATOR_LOAD_IN_PROGRESS", "INVALID_TXN_STATE"})
public void testTransactionShouldTransitionToAbortableForSenderAPI(Errors error) throws InterruptedException {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TransactionManager txnManager = new TransactionManager(
logContext,
"testRetriableException",
60000,
RETRY_BACKOFF_MS,
apiVersions,
false
);
// Setup with transaction state and initialize transactions with single retry
setupWithTransactionState(txnManager, false, null, 1);
doInitTransactions(txnManager, producerIdAndEpoch);
// Begin transaction and add partition
txnManager.beginTransaction();
txnManager.maybeAddPartition(tp0);
client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
sender.runOnce();
// First produce request: the response carries the error under test
appendToAccumulator(tp0);
client.prepareResponse(produceResponse(tp0, -1, error, -1));
sender.runOnce();
// Sleep for retry backoff
time.sleep(RETRY_BACKOFF_MS);
// Second attempt to process record - PREPARE the response before sending
client.prepareResponse(produceResponse(tp0, -1, error, -1));
sender.runOnce();
// Now transaction should be in abortable state after retry is exhausted
assertTrue(txnManager.hasAbortableError());
// Second produce request - should fail with TransactionAbortableException
Future<RecordMetadata> future2 = appendToAccumulator(tp0);
client.prepareResponse(produceResponse(tp0, -1, Errors.NONE, -1));
// Even though a successful response is queued, the record is expected to fail with
// TransactionAbortableException rather than the original error, because we're in abortable state
sender.runOnce();
assertFutureFailure(future2, TransactionAbortableException.class);
// Verify transaction API requests also fail with TransactionAbortableException
try {
txnManager.beginCommit();
fail("Expected beginCommit() to fail with TransactionAbortableException when in abortable error state");
} catch (KafkaException e) {
// The abortable error is surfaced as the cause of the KafkaException thrown by beginCommit()
assertEquals(TransactionAbortableException.class, e.getCause().getClass());
}
}
@Test
public void testSenderShouldRetryWithBackoffOnRetriableError() {
public void testSenderShouldRetryWithBackoffOnRetriableError() throws InterruptedException {
final long producerId = 343434L;
TransactionManager transactionManager = createTransactionManager();
setupWithTransactionState(transactionManager);
@ -3674,6 +3730,10 @@ public class SenderTest {
setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, Integer.MAX_VALUE, 0);
}
/**
 * Convenience overload that lets a test choose the retry count.
 *
 * <p>Delegates to the six-argument {@code setupWithTransactionState} overload, forwarding
 * {@code transactionManager}, {@code guaranteeOrder} and {@code customPool} unchanged, passing
 * {@code true} as the fourth argument, {@code retries} as the fifth and {@code 0} as the sixth.
 * NOTE(review): the meaning of the fixed fourth and sixth arguments is defined by that overload,
 * whose signature is not fully visible here - confirm before relying on them.
 *
 * @param transactionManager the transaction manager forwarded to the delegate
 * @param guaranteeOrder forwarded unchanged to the delegate
 * @param customPool forwarded unchanged to the delegate; callers in this file may pass {@code null}
 * @param retries the retry count forwarded to the delegate
 */
private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool, int retries) {
setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, retries, 0);
}
private void setupWithTransactionState(
TransactionManager transactionManager,
boolean guaranteeOrder,

View File

@ -2958,7 +2958,7 @@ public class TransactionManagerTest {
"Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
runUntil(commitResult::isCompleted); // the commit shouldn't be completed without being sent since the produce request failed.
assertFalse(commitResult.isSuccessful()); // the commit shouldn't succeed since the produce request failed.
assertThrows(TimeoutException.class, commitResult::await);
assertInstanceOf(TimeoutException.class, assertThrows(TransactionAbortableException.class, commitResult::await).getCause());
assertTrue(transactionManager.hasAbortableError());
assertTrue(transactionManager.hasOngoingTransaction());