From 7f02c263a6e2490dca5632e0389e312dc14dd593 Mon Sep 17 00:00:00 2001 From: Ritika Reddy <98577846+rreddy-22@users.noreply.github.com> Date: Wed, 14 May 2025 15:11:39 -0700 Subject: [PATCH] KAFKA-19082:[3/4] Add prepare txn method (KIP-939) (#19539) This patch belongs to the client-side changes required to enable 2PC as a part of KIP-939. New method is added to KafkaProducer: public PreparedTxnState prepareTransaction() This would flush all the pending messages and transition the producer into a mode where only .commitTransaction, .abortTransaction, or .completeTransaction could be called (calling other methods, e.g. .send , in that mode would result in IllegalStateException being thrown). If the call is successful (all messages successfully got flushed to all partitions) the transaction is prepared. If the 2PC is not enabled, we return the INVALID_TXN_STATE error. A new state is added to the TransactionManager called PREPARING_TRANSACTION. There are two situations where we would move into this state: 1) When prepareTransaction() is called during an ongoing transaction with 2PC enabled 2) When initTransaction(true) is called after a client failure (keepPrepared = true) Reviewers: Artem Livshits , Justine Olshan --- .../kafka/clients/producer/KafkaProducer.java | 66 +++++++ .../kafka/clients/producer/MockProducer.java | 12 ++ .../kafka/clients/producer/Producer.java | 5 + .../internals/KafkaProducerMetrics.java | 11 ++ .../internals/TransactionManager.java | 53 +++++- .../clients/producer/KafkaProducerTest.java | 162 ++++++++++++++++++ .../internals/TransactionManagerTest.java | 128 ++++++++++---- 7 files changed, 400 insertions(+), 37 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 0fc237d70c8..b44b358dede 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -50,11 +50,13 @@ import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.InvalidTxnStateException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -665,6 +667,7 @@ public class KafkaProducer implements Producer { public void initTransactions(boolean keepPreparedTxn) { throwIfNoTransactionManager(); throwIfProducerClosed(); + throwIfInPreparedState(); long now = time.nanoseconds(); TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn); sender.wakeup(); @@ -691,6 +694,7 @@ public class KafkaProducer implements Producer { public void beginTransaction() throws ProducerFencedException { throwIfNoTransactionManager(); throwIfProducerClosed(); + throwIfInPreparedState(); long now = time.nanoseconds(); transactionManager.beginTransaction(); producerMetrics.recordBeginTxn(time.nanoseconds() - now); @@ -750,6 +754,7 @@ public class KafkaProducer implements Producer { throwIfInvalidGroupMetadata(groupMetadata); throwIfNoTransactionManager(); throwIfProducerClosed(); + throwIfInPreparedState(); if (!offsets.isEmpty()) { long start = time.nanoseconds(); @@ -760,6 +765,48 @@ public class KafkaProducer implements Producer { } } + /** + * Prepares the current transaction for a two-phase commit. This method will flush all pending messages + * and transition the producer into a mode where only {@link #commitTransaction()}, {@link #abortTransaction()}, + * or completeTransaction(PreparedTxnState) may be called. + *

+ * This method is used as part of a two-phase commit protocol: + *

    + *
  1. Prepare the transaction by calling this method. This returns a {@link PreparedTxnState} if successful.
  2. + *
  3. Make any external system changes that need to be atomic with this transaction.
  4. + *
  5. Complete the transaction by calling {@link #commitTransaction()}, {@link #abortTransaction()} or + * completeTransaction(PreparedTxnState).
  6. + *
+ * + * @return the prepared transaction state to use when completing the transaction + * + * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started yet. + * @throws InvalidTxnStateException if the producer is not in a state where preparing + * a transaction is possible or 2PC is not enabled. + * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active + * @throws UnsupportedVersionException fatal error indicating the broker + * does not support transactions (i.e. if its version is lower than 0.11.0.0) + * @throws AuthorizationException fatal error indicating that the configured + * transactional.id is not authorized. See the exception for more details + * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error + * @throws TimeoutException if the time taken for preparing the transaction has surpassed max.block.ms + * @throws InterruptException if the thread is interrupted while blocked + */ + @Override + public PreparedTxnState prepareTransaction() throws ProducerFencedException { + throwIfNoTransactionManager(); + throwIfProducerClosed(); + throwIfInPreparedState(); + if (!transactionManager.is2PCEnabled()) { + throw new InvalidTxnStateException("Cannot prepare a transaction when 2PC is not enabled"); + } + long now = time.nanoseconds(); + flush(); + transactionManager.prepareTransaction(); + producerMetrics.recordPrepareTxn(time.nanoseconds() - now); + return transactionManager.preparedTransactionState(); + } + /** * Commits the ongoing transaction. This method will flush any unsent records before actually committing the transaction. *

@@ -967,6 +1014,23 @@ public class KafkaProducer implements Producer { throw new IllegalStateException("Cannot perform operation after producer has been closed"); } + /** + * Throws an exception if the transaction is in a prepared state. + * In a two-phase commit (2PC) flow, once a transaction enters the prepared state, + * only commit, abort, or complete operations are allowed. + * + * @throws IllegalStateException if any other operation is attempted in the prepared state. + */ + private void throwIfInPreparedState() { + if (transactionManager != null && + transactionManager.isTransactional() && + transactionManager.isPrepared() + ) { + throw new IllegalStateException("Cannot perform operation while the transaction is in a prepared state. " + + "Only commitTransaction(), abortTransaction(), or completeTransaction() are permitted."); + } + } + /** * Implementation of asynchronously send a record to a topic. */ @@ -978,6 +1042,8 @@ public class KafkaProducer implements Producer { try { throwIfProducerClosed(); + throwIfInPreparedState(); + // first make sure the metadata for the topic is available long nowMs = time.milliseconds(); ClusterAndWaitTime clusterAndWaitTime; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index e3c5a23ca51..c6e02d4ab16 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -200,6 +200,18 @@ public class MockProducer implements Producer { this.sentOffsets = true; } + @Override + public PreparedTxnState prepareTransaction() throws ProducerFencedException { + verifyNotClosed(); + verifyNotFenced(); + verifyTransactionsInitialized(); + verifyTransactionInFlight(); + + // Return a new PreparedTxnState with mock values for producerId and epoch + // Using 1000L and (short)1 as arbitrary values for a valid PreparedTxnState + return new PreparedTxnState(1000L, (short) 1); + } + @Override public void commitTransaction() throws ProducerFencedException { verifyNotClosed(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index a5cd92295ff..db4460d6b10 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -62,6 +62,11 @@ public interface Producer extends Closeable { void sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata groupMetadata) throws ProducerFencedException; + /** + * See {@link KafkaProducer#prepareTransaction()} + */ + PreparedTxnState prepareTransaction() throws ProducerFencedException; + /** * See {@link KafkaProducer#commitTransaction()} */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java index 7d942d572cf..6c94466c55e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java @@ -33,6 +33,7 @@ public class KafkaProducerMetrics implements AutoCloseable { private static final String TXN_SEND_OFFSETS = "txn-send-offsets"; private static final String TXN_COMMIT = "txn-commit"; private static final String TXN_ABORT = "txn-abort"; + private static final String TXN_PREPARE = "txn-prepare"; private static final String TOTAL_TIME_SUFFIX = "-time-ns-total"; private static final String METADATA_WAIT = "metadata-wait"; @@ -44,6 +45,7 @@ public class KafkaProducerMetrics implements AutoCloseable { private final Sensor sendOffsetsSensor; private final Sensor commitTxnSensor; private final Sensor abortTxnSensor; + private final Sensor prepareTxnSensor; private final Sensor metadataWaitSensor; public KafkaProducerMetrics(Metrics metrics) { @@ -73,6 +75,10 @@ public class KafkaProducerMetrics implements AutoCloseable { TXN_ABORT, "Total time producer has spent in abortTransaction in nanoseconds." ); + prepareTxnSensor = newLatencySensor( + TXN_PREPARE, + "Total time producer has spent in prepareTransaction in nanoseconds." + ); metadataWaitSensor = newLatencySensor( METADATA_WAIT, "Total time producer has spent waiting on topic metadata in nanoseconds." @@ -87,6 +93,7 @@ public class KafkaProducerMetrics implements AutoCloseable { removeMetric(TXN_SEND_OFFSETS); removeMetric(TXN_COMMIT); removeMetric(TXN_ABORT); + removeMetric(TXN_PREPARE); removeMetric(METADATA_WAIT); } @@ -114,6 +121,10 @@ public class KafkaProducerMetrics implements AutoCloseable { abortTxnSensor.record(duration); } + public void recordPrepareTxn(long duration) { + prepareTxnSensor.record(duration); + } + public void recordMetadataWait(long duration) { metadataWaitSensor.record(duration); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index e17dc15d239..061df29fbb2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.RequestCompletionHandler; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.PreparedTxnState; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; @@ -144,12 +145,14 @@ public class TransactionManager { private volatile long latestFinalizedFeaturesEpoch = -1; private volatile boolean isTransactionV2Enabled = false; private final boolean enable2PC; + private volatile PreparedTxnState preparedTxnState; private enum State { UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION, + PREPARED_TRANSACTION, COMMITTING_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR, @@ -165,10 +168,12 @@ public class TransactionManager { return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION; case IN_TRANSACTION: return source == READY; + case PREPARED_TRANSACTION: + return source == IN_TRANSACTION || source == INITIALIZING; case COMMITTING_TRANSACTION: - return source == IN_TRANSACTION; + return source == IN_TRANSACTION || source == PREPARED_TRANSACTION; case ABORTING_TRANSACTION: - return source == IN_TRANSACTION || source == ABORTABLE_ERROR; + return source == IN_TRANSACTION || source == PREPARED_TRANSACTION || source == ABORTABLE_ERROR; case ABORTABLE_ERROR: return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTABLE_ERROR || source == INITIALIZING; @@ -223,6 +228,7 @@ public class TransactionManager { this.txnPartitionMap = new TxnPartitionMap(logContext); this.apiVersions = apiVersions; this.enable2PC = enable2PC; + this.preparedTxnState = new PreparedTxnState(); } /** @@ -241,7 +247,7 @@ public class TransactionManager { * transaction manager. The {@link Producer} API calls that perform a state transition include: * *

    - *
  • {@link Producer#initTransactions()} calls {@link #initializeTransactions()}
  • + *
  • {@link Producer#initTransactions()} calls {@link #initializeTransactions(boolean)}
  • *
  • {@link Producer#beginTransaction()} calls {@link #beginTransaction()}
  • *
  • {@link Producer#commitTransaction()}} calls {@link #beginCommit()}
  • *
  • {@link Producer#abortTransaction()} calls {@link #beginAbort()} @@ -330,6 +336,22 @@ public class TransactionManager { transitionTo(State.IN_TRANSACTION); } + /** + * Prepare a transaction for a two-phase commit. + * This transitions the transaction to the PREPARED_TRANSACTION state. + * The preparedTxnState is set with the current producer ID and epoch. + */ + public synchronized void prepareTransaction() { + ensureTransactional(); + throwIfPendingState("prepareTransaction"); + maybeFailWithError(); + transitionTo(State.PREPARED_TRANSACTION); + this.preparedTxnState = new PreparedTxnState( + this.producerIdAndEpoch.producerId + ":" + + this.producerIdAndEpoch.epoch + ); + } + public synchronized TransactionalRequestResult beginCommit() { return handleCachedTransactionRequestResult(() -> { maybeFailWithError(); @@ -487,6 +509,10 @@ public class TransactionManager { return isTransactionV2Enabled; } + public boolean is2PCEnabled() { + return enable2PC; + } + synchronized boolean hasPartitionsToAdd() { return !newPartitionsInTransaction.isEmpty() || !pendingPartitionsInTransaction.isEmpty(); } @@ -1058,6 +1084,15 @@ public class TransactionManager { return isTransactional() && currentState == State.INITIALIZING; } + /** + * Check if the transaction is in the prepared state. + * + * @return true if the current state is PREPARED_TRANSACTION + */ + public synchronized boolean isPrepared() { + return currentState == State.PREPARED_TRANSACTION; + } + void handleCoordinatorReady() { NodeApiVersions nodeApiVersions = transactionCoordinator != null ? apiVersions.get(transactionCoordinator.idString()) : @@ -1453,6 +1488,7 @@ public class TransactionManager { ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.data().producerId(), initProducerIdResponse.data().producerEpoch()); setProducerIdAndEpoch(producerIdAndEpoch); + // TO_DO Add code to handle transition to prepared_txn when keepPrepared = true transitionTo(State.READY); lastError = null; if (this.isEpochBump) { @@ -1904,5 +1940,14 @@ public class TransactionManager { } } - + /** + * Returns a PreparedTxnState object containing the producer ID and epoch + * of the ongoing transaction. + * This is used when preparing a transaction for a two-phase commit. + * + * @return a PreparedTxnState with the current producer ID and epoch + */ + public PreparedTxnState preparedTransactionState() { + return this.preparedTxnState; + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index ca6757d1f6d..2231e0338c7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -46,6 +46,7 @@ import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.InvalidTxnStateException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; @@ -152,6 +153,7 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -162,6 +164,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.notNull; import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; @@ -1438,6 +1441,165 @@ public class KafkaProducerTest { "enable2Pc flag should match producer configuration"); } } + + @Test + public void testPrepareTransactionSuccess() throws Exception { + StringSerializer serializer = new StringSerializer(); + KafkaProducerTestContext ctx = new KafkaProducerTestContext<>(testInfo, serializer); + + when(ctx.transactionManager.isTransactionV2Enabled()).thenReturn(true); + when(ctx.transactionManager.is2PCEnabled()).thenReturn(true); + when(ctx.sender.isRunning()).thenReturn(true); + + doNothing().when(ctx.transactionManager).prepareTransaction(); + + PreparedTxnState expectedState = mock(PreparedTxnState.class); + when(ctx.transactionManager.preparedTransactionState()).thenReturn(expectedState); + + try (KafkaProducer producer = ctx.newKafkaProducer()) { + PreparedTxnState returned = producer.prepareTransaction(); + assertSame(expectedState, returned); + + verify(ctx.transactionManager).prepareTransaction(); + verify(ctx.accumulator).beginFlush(); + verify(ctx.accumulator).awaitFlushCompletion(); + } + } + + @Test + public void testSendNotAllowedInPreparedTransactionState() throws Exception { + StringSerializer serializer = new StringSerializer(); + KafkaProducerTestContext ctx = new KafkaProducerTestContext<>(testInfo, serializer); + + String topic = "foo"; + Cluster cluster = TestUtils.singletonCluster(topic, 1); + + when(ctx.sender.isRunning()).thenReturn(true); + when(ctx.metadata.fetch()).thenReturn(cluster); + + // Mock transaction manager to simulate being in a prepared state + when(ctx.transactionManager.isTransactional()).thenReturn(true); + when(ctx.transactionManager.isPrepared()).thenReturn(true); + + // Create record to send + long timestamp = ctx.time.milliseconds(); + ProducerRecord record = new ProducerRecord<>(topic, 0, timestamp, "key", "value"); + + try (KafkaProducer producer = ctx.newKafkaProducer()) { + // Verify that sending a record throws IllegalStateException with the correct message + IllegalStateException exception = assertThrows( + IllegalStateException.class, + () -> producer.send(record) + ); + + assertTrue(exception.getMessage().contains("Cannot perform operation while the transaction is in a prepared state")); + + // Verify transactionManager methods were called + verify(ctx.transactionManager).isTransactional(); + verify(ctx.transactionManager).isPrepared(); + + // Verify that no message was actually sent (accumulator was not called) + verify(ctx.accumulator, never()).append( + eq(topic), + anyInt(), + anyLong(), + any(), + any(), + any(), + any(), + anyLong(), + anyLong(), + any() + ); + } + } + + @Test + public void testSendOffsetsNotAllowedInPreparedTransactionState() throws Exception { + StringSerializer serializer = new StringSerializer(); + KafkaProducerTestContext ctx = new KafkaProducerTestContext<>(testInfo, serializer); + + String topic = "foo"; + Cluster cluster = TestUtils.singletonCluster(topic, 1); + + when(ctx.sender.isRunning()).thenReturn(true); + when(ctx.metadata.fetch()).thenReturn(cluster); + + // Mock transaction manager to simulate being in a prepared state + when(ctx.transactionManager.isTransactional()).thenReturn(true); + when(ctx.transactionManager.isPrepared()).thenReturn(true); + + // Create consumer group metadata + String groupId = "test-group"; + Map offsets = new HashMap<>(); + offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(100L)); + ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata(groupId); + + try (KafkaProducer producer = ctx.newKafkaProducer()) { + // Verify that sending offsets throws IllegalStateException with the correct message + IllegalStateException exception = assertThrows( + IllegalStateException.class, + () -> producer.sendOffsetsToTransaction(offsets, groupMetadata) + ); + + assertTrue(exception.getMessage().contains("Cannot perform operation while the transaction is in a prepared state")); + + // Verify transactionManager methods were called + verify(ctx.transactionManager).isTransactional(); + verify(ctx.transactionManager).isPrepared(); + + // Verify that no offsets were actually sent + verify(ctx.transactionManager, never()).sendOffsetsToTransaction( + eq(offsets), + eq(groupMetadata) + ); + } + } + + @Test + public void testBeginTransactionNotAllowedInPreparedTransactionState() throws Exception { + StringSerializer serializer = new StringSerializer(); + KafkaProducerTestContext ctx = new KafkaProducerTestContext<>(testInfo, serializer); + + when(ctx.sender.isRunning()).thenReturn(true); + + // Mock transaction manager to simulate being in a prepared state + when(ctx.transactionManager.isTransactional()).thenReturn(true); + when(ctx.transactionManager.isPrepared()).thenReturn(true); + + try (KafkaProducer producer = ctx.newKafkaProducer()) { + // Verify that calling beginTransaction throws IllegalStateException with the correct message + IllegalStateException exception = assertThrows( + IllegalStateException.class, + producer::beginTransaction + ); + + assertTrue(exception.getMessage().contains("Cannot perform operation while the transaction is in a prepared state")); + + // Verify transactionManager methods were called + verify(ctx.transactionManager).isTransactional(); + verify(ctx.transactionManager).isPrepared(); + } + } + + @Test + public void testPrepareTransactionFailsWhen2PCDisabled() { + StringSerializer serializer = new StringSerializer(); + KafkaProducerTestContext ctx = new KafkaProducerTestContext<>(testInfo, serializer); + + // Disable 2PC + when(ctx.transactionManager.isTransactionV2Enabled()).thenReturn(true); + when(ctx.transactionManager.is2PCEnabled()).thenReturn(false); + when(ctx.sender.isRunning()).thenReturn(true); + + try (KafkaProducer producer = ctx.newKafkaProducer()) { + assertThrows( + InvalidTxnStateException.class, + producer::prepareTransaction, + "prepareTransaction() should fail if 2PC is disabled" + ); + } + } @Test public void testClusterAuthorizationFailure() throws Exception { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index e6ac3e56299..118656e47f8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.PreparedTxnState; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; @@ -138,6 +139,8 @@ public class TransactionManagerTest { private final TopicPartition tp1 = new TopicPartition(topic, 1); private final long producerId = 13131L; private final short epoch = 1; + private final long ongoingProducerId = 999L; + private final short bumpedOngoingEpoch = 11; private final String consumerGroupId = "myConsumerGroup"; private final String memberId = "member"; private final int generationId = 5; @@ -4022,6 +4025,56 @@ public class TransactionManagerTest { assertFalse(transactionManager.hasOngoingTransaction()); } + @Test + public void testInitializeTransactionsWithKeepPreparedTxn() { + doInitTransactionsWith2PCEnabled(true); + runUntil(transactionManager::hasProducerId); + + // Expect a bumped epoch in the response. + assertTrue(transactionManager.hasProducerId()); + assertFalse(transactionManager.hasOngoingTransaction()); + assertEquals(ongoingProducerId, transactionManager.producerIdAndEpoch().producerId); + assertEquals(bumpedOngoingEpoch, transactionManager.producerIdAndEpoch().epoch); + } + + @Test + public void testPrepareTransaction() { + doInitTransactionsWith2PCEnabled(false); + runUntil(transactionManager::hasProducerId); + + // Begin a transaction + transactionManager.beginTransaction(); + assertTrue(transactionManager.hasOngoingTransaction()); + + // Add a partition to the transaction + transactionManager.maybeAddPartition(tp0); + + // Capture the current producer ID and epoch before preparing the response + long producerId = transactionManager.producerIdAndEpoch().producerId; + short epoch = transactionManager.producerIdAndEpoch().epoch; + + // Simulate a produce request + try { + // Prepare the response before sending to ensure it's ready + prepareProduceResponse(Errors.NONE, producerId, epoch); + + appendToAccumulator(tp0); + // Wait until the request is processed + runUntil(() -> !client.hasPendingResponses()); + } catch (InterruptedException e) { + fail("Unexpected interruption: " + e); + } + + transactionManager.prepareTransaction(); + assertTrue(transactionManager.isPrepared()); + + PreparedTxnState preparedState = transactionManager.preparedTransactionState(); + // Validate the state contains the correct serialized producer ID and epoch + assertEquals(producerId + ":" + epoch, preparedState.toString()); + assertEquals(producerId, preparedState.producerId()); + assertEquals(epoch, preparedState.epoch()); + } + private void prepareAddPartitionsToTxn(final Map errors) { AddPartitionsToTxnResult result = AddPartitionsToTxnResponse.resultForTransaction(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID, errors); AddPartitionsToTxnResponseData data = new AddPartitionsToTxnResponseData().setResultsByTopicV3AndBelow(result.topicResults()).setThrottleTimeMs(0); @@ -4361,6 +4414,48 @@ public class TransactionManagerTest { assertTrue(result.isAcked()); } + private void doInitTransactionsWith2PCEnabled(boolean keepPrepared) { + initializeTransactionManager(Optional.of(transactionalId), true, true); + TransactionalRequestResult result = transactionManager.initializeTransactions(keepPrepared); + + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null); + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + + if (keepPrepared) { + // Simulate an ongoing prepared transaction (ongoingProducerId != -1). + short ongoingEpoch = bumpedOngoingEpoch - 1; + prepareInitPidResponse( + Errors.NONE, + false, + ongoingProducerId, + bumpedOngoingEpoch, + true, + true, + ongoingProducerId, + ongoingEpoch + ); + } else { + prepareInitPidResponse( + Errors.NONE, + false, + producerId, + epoch, + false, + true, + RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH + ); + } + + runUntil(transactionManager::hasProducerId); + transactionManager.maybeUpdateTransactionV2Enabled(true); + + result.await(); + assertTrue(result.isSuccessful()); + assertTrue(result.isAcked()); + } + private void assertAbortableError(Class cause) { try { transactionManager.beginCommit(); @@ -4411,39 +4506,6 @@ public class TransactionManagerTest { ProducerTestUtils.runUntil(sender, condition); } - @Test - public void testInitializeTransactionsWithKeepPreparedTxn() { - initializeTransactionManager(Optional.of(transactionalId), true, true); - - client.prepareResponse( - FindCoordinatorResponse.prepareResponse(Errors.NONE, transactionalId, brokerNode) - ); - - // Simulate an ongoing prepared transaction (ongoingProducerId != -1). - long ongoingProducerId = 999L; - short ongoingEpoch = 10; - short bumpedEpoch = 11; - - prepareInitPidResponse( - Errors.NONE, - false, - ongoingProducerId, - bumpedEpoch, - true, - true, - ongoingProducerId, - ongoingEpoch - ); - - transactionManager.initializeTransactions(true); - runUntil(transactionManager::hasProducerId); - - assertTrue(transactionManager.hasProducerId()); - assertFalse(transactionManager.hasOngoingTransaction()); - assertEquals(ongoingProducerId, transactionManager.producerIdAndEpoch().producerId); - assertEquals(bumpedEpoch, transactionManager.producerIdAndEpoch().epoch); - } - /** * This subclass exists only to optionally change the default behavior related to poisoning the state * on invalid state transition attempts.