diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 71d201b71f9..1b1d7bb7e83 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -50,13 +50,11 @@ import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidTopicException; -import org.apache.kafka.common.errors.InvalidTxnStateException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -622,13 +620,8 @@ public class KafkaProducer implements Producer { } /** - * Initialize the transactional state for this producer, similar to {@link #initTransactions()} but - * with additional capabilities to keep a previously prepared transaction. - * * Needs to be called before any other methods when the {@code transactional.id} is set in the configuration. - * - * When {@code keepPreparedTxn} is {@code false}, this behaves like the standard transactional - * initialization where the method does the following: + * This method does the following: *
    *
  1. Ensures any transactions initiated by previous instances of the producer with the same * {@code transactional.id} are completed. If the previous instance had failed with a transaction in @@ -637,39 +630,26 @@ public class KafkaProducer implements Producer { *
  2. Gets the internal producer id and epoch, used in all future transactional * messages issued by the producer.
  3. *
- * - *

- * When {@code keepPreparedTxn} is set to {@code true}, the producer does not automatically abort existing - * transactions. Instead, it enters a recovery mode allowing only finalization of those previously - * prepared transactions. - * This behavior is especially crucial for 2PC scenarios, where transactions should remain intact - * until the external transaction manager decides whether to commit or abort. - *

- * - * @param keepPreparedTxn true to retain any in-flight prepared transactions (necessary for 2PC - * recovery), false to abort existing transactions and behave like - * the standard initTransactions. - * * Note that this method will raise {@link TimeoutException} if the transactional state cannot * be initialized before expiration of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} * if interrupted. It is safe to retry in either case, but once the transactional state has been successfully * initialized, this method should no longer be used. * - * @throws IllegalStateException if no {@code transactional.id} is configured - * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not - * support transactions (i.e. if its version is lower than 0.11.0.0) - * @throws org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the configured - * {@code transactional.id} is unauthorized either for normal transaction writes or 2PC. - * @throws KafkaException if the producer encounters a fatal error or any other unexpected error + * @throws IllegalStateException if no {@code transactional.id} has been configured + * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker + * does not support transactions (i.e. if its version is lower than 0.11.0.0) + * @throws org.apache.kafka.common.errors.AuthorizationException error indicating that the configured + * transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for + * more details. User may retry this function call after fixing the permission. + * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error * @throws TimeoutException if the time taken for initialize the transaction has surpassed max.block.ms. * @throws InterruptException if the thread is interrupted while blocked */ - public void initTransactions(boolean keepPreparedTxn) { + public void initTransactions() { throwIfNoTransactionManager(); throwIfProducerClosed(); - throwIfInPreparedState(); long now = time.nanoseconds(); - TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn); + TransactionalRequestResult result = transactionManager.initializeTransactions(false); sender.wakeup(); result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS); producerMetrics.recordInit(time.nanoseconds() - now); @@ -754,7 +734,6 @@ public class KafkaProducer implements Producer { throwIfInvalidGroupMetadata(groupMetadata); throwIfNoTransactionManager(); throwIfProducerClosed(); - throwIfInPreparedState(); if (!offsets.isEmpty()) { long start = time.nanoseconds(); @@ -765,48 +744,6 @@ public class KafkaProducer implements Producer { } } - /** - * Prepares the current transaction for a two-phase commit. This method will flush all pending messages - * and transition the producer into a mode where only {@link #commitTransaction()}, {@link #abortTransaction()}, - * or completeTransaction(PreparedTxnState) may be called. - *

- * This method is used as part of a two-phase commit protocol: - *

    - *
  1. Prepare the transaction by calling this method. This returns a {@link PreparedTxnState} if successful.
  2. - *
  3. Make any external system changes that need to be atomic with this transaction.
  4. - *
  5. Complete the transaction by calling {@link #commitTransaction()}, {@link #abortTransaction()} or - * completeTransaction(PreparedTxnState).
  6. - *
- * - * @return the prepared transaction state to use when completing the transaction - * - * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started yet. - * @throws InvalidTxnStateException if the producer is not in a state where preparing - * a transaction is possible or 2PC is not enabled. - * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active - * @throws UnsupportedVersionException fatal error indicating the broker - * does not support transactions (i.e. if its version is lower than 0.11.0.0) - * @throws AuthorizationException fatal error indicating that the configured - * transactional.id is not authorized. See the exception for more details - * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error - * @throws TimeoutException if the time taken for preparing the transaction has surpassed max.block.ms - * @throws InterruptException if the thread is interrupted while blocked - */ - @Override - public PreparedTxnState prepareTransaction() throws ProducerFencedException { - throwIfNoTransactionManager(); - throwIfProducerClosed(); - throwIfInPreparedState(); - if (!transactionManager.is2PCEnabled()) { - throw new InvalidTxnStateException("Cannot prepare a transaction when 2PC is not enabled"); - } - long now = time.nanoseconds(); - flush(); - transactionManager.prepareTransaction(); - producerMetrics.recordPrepareTxn(time.nanoseconds() - now); - return transactionManager.preparedTransactionState(); - } - /** * Commits the ongoing transaction. This method will flush any unsent records before actually committing the transaction. *

@@ -884,40 +821,6 @@ public class KafkaProducer implements Producer { producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart); } - /** - * Completes a prepared transaction by comparing the provided prepared transaction state with the - * current prepared state on the producer. - * If they match, the transaction is committed; otherwise, it is aborted. - * - * @param preparedTxnState The prepared transaction state to compare against the current state - * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started - * @throws InvalidTxnStateException if the producer is not in prepared state - * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active - * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error - * @throws TimeoutException if the time taken for completing the transaction has surpassed max.block.ms - * @throws InterruptException if the thread is interrupted while blocked - */ - @Override - public void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException { - throwIfNoTransactionManager(); - throwIfProducerClosed(); - - if (!transactionManager.isPrepared()) { - throw new InvalidTxnStateException("Cannot complete transaction because no transaction has been prepared. " + - "Call prepareTransaction() first, or make sure initTransaction(true) was called."); - } - - // Get the current prepared transaction state - PreparedTxnState currentPreparedState = transactionManager.preparedTransactionState(); - - // Compare the prepared transaction state token and commit or abort accordingly - if (currentPreparedState.equals(preparedTxnState)) { - commitTransaction(); - } else { - abortTransaction(); - } - } - /** * Asynchronously send a record to a topic. Equivalent to send(record, null). * See {@link #send(ProducerRecord, Callback)} for details. diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 3e5cb9f5d5a..a4aac86df09 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -142,7 +142,7 @@ public class MockProducer implements Producer { } @Override - public void initTransactions(boolean keepPreparedTxn) { + public void initTransactions() { verifyNotClosed(); verifyNotFenced(); if (this.transactionInitialized) { @@ -200,18 +200,6 @@ public class MockProducer implements Producer { this.sentOffsets = true; } - @Override - public PreparedTxnState prepareTransaction() throws ProducerFencedException { - verifyNotClosed(); - verifyNotFenced(); - verifyTransactionsInitialized(); - verifyTransactionInFlight(); - - // Return a new PreparedTxnState with mock values for producerId and epoch - // Using 1000L and (short)1 as arbitrary values for a valid PreparedTxnState - return new PreparedTxnState(1000L, (short) 1); - } - @Override public void commitTransaction() throws ProducerFencedException { verifyNotClosed(); @@ -257,27 +245,6 @@ public class MockProducer implements Producer { this.transactionInFlight = false; } - @Override - public void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException { - verifyNotClosed(); - verifyNotFenced(); - verifyTransactionsInitialized(); - - if (!this.transactionInFlight) { - throw new IllegalStateException("There is no prepared transaction to complete."); - } - - // For testing purposes, we'll consider a prepared state with producerId=1000L and epoch=1 as valid - // This should match what's returned in prepareTransaction() - PreparedTxnState currentState = new PreparedTxnState(1000L, (short) 1); - - if (currentState.equals(preparedTxnState)) { - commitTransaction(); - } else { - abortTransaction(); - } - } - private synchronized void verifyNotClosed() { if (this.closed) { throw new IllegalStateException("MockProducer is already closed."); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index e6e94691e34..798034dda6d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -42,14 +42,7 @@ public interface Producer extends Closeable { /** * See {@link KafkaProducer#initTransactions()} */ - default void initTransactions() { - initTransactions(false); - } - - /** - * See {@link KafkaProducer#initTransactions(boolean)} - */ - void initTransactions(boolean keepPreparedTxn); + void initTransactions(); /** * See {@link KafkaProducer#beginTransaction()} @@ -62,11 +55,6 @@ public interface Producer extends Closeable { void sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata groupMetadata) throws ProducerFencedException; - /** - * See {@link KafkaProducer#prepareTransaction()} - */ - PreparedTxnState prepareTransaction() throws ProducerFencedException; - /** * See {@link KafkaProducer#commitTransaction()} */ @@ -77,11 +65,6 @@ public interface Producer extends Closeable { */ void abortTransaction() throws ProducerFencedException; - /** - * See {@link KafkaProducer#completeTransaction(PreparedTxnState)} - */ - void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException; - /** * @see KafkaProducer#registerMetricForSubscription(KafkaMetric) */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 5d83cbc0b1b..20804c505dd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -320,9 +320,7 @@ public class TransactionManager { .setTransactionalId(transactionalId) .setTransactionTimeoutMs(transactionTimeoutMs) .setProducerId(producerIdAndEpoch.producerId) - .setProducerEpoch(producerIdAndEpoch.epoch) - .setEnable2Pc(enable2PC) - .setKeepPreparedTxn(keepPreparedTxn); + .setProducerEpoch(producerIdAndEpoch.epoch); InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData), isEpochBump); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 8460d0f4c5f..ac63fed3c04 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -32,7 +32,6 @@ import org.apache.kafka.clients.producer.internals.ProducerMetadata; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.Sender; import org.apache.kafka.clients.producer.internals.TransactionManager; -import org.apache.kafka.clients.producer.internals.TransactionalRequestResult; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; @@ -47,7 +46,6 @@ import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidTopicException; -import org.apache.kafka.common.errors.InvalidTxnStateException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; @@ -78,7 +76,6 @@ import org.apache.kafka.common.requests.AddOffsetsToTxnResponse; import org.apache.kafka.common.requests.EndTxnResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; -import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.InitProducerIdResponse; import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.MetadataResponse; @@ -107,7 +104,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.MockedStatic; import org.mockito.Mockito; @@ -154,7 +150,6 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -165,7 +160,6 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.notNull; import static org.mockito.Mockito.atMostOnce; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; @@ -1389,294 +1383,6 @@ public class KafkaProducerTest { } } - @ParameterizedTest - @CsvSource({ - "true, false", - "true, true", - "false, true" - }) - public void testInitTransactionsWithKeepPreparedTxnAndTwoPhaseCommit(boolean keepPreparedTxn, boolean enable2PC) { - Map configs = new HashMap<>(); - configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id"); - configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000); - configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); - if (enable2PC) { - configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true); - } - - Time time = new MockTime(1); - MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1)); - ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE); - MockClient client = new MockClient(time, metadata); - client.updateMetadata(initialUpdateResponse); - - // Capture flags from the InitProducerIdRequest - boolean[] requestFlags = new boolean[2]; // [keepPreparedTxn, enable2Pc] - - client.prepareResponse( - request -> request instanceof FindCoordinatorRequest && - ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(), - FindCoordinatorResponse.prepareResponse(Errors.NONE, "test-txn-id", NODE)); - - client.prepareResponse( - request -> { - if (request instanceof InitProducerIdRequest) { - InitProducerIdRequest initRequest = (InitProducerIdRequest) request; - requestFlags[0] = initRequest.data().keepPreparedTxn(); - requestFlags[1] = initRequest.data().enable2Pc(); - return true; - } - return false; - }, - initProducerIdResponse(1L, (short) 5, Errors.NONE)); - - try (Producer producer = kafkaProducer(configs, new StringSerializer(), - new StringSerializer(), metadata, client, null, time)) { - producer.initTransactions(keepPreparedTxn); - - // Verify request flags match expected values - assertEquals(keepPreparedTxn, requestFlags[0], - "keepPreparedTxn flag should match input parameter"); - assertEquals(enable2PC, requestFlags[1], - "enable2Pc flag should match producer configuration"); - } - } - - @Test - public void testPrepareTransactionSuccess() throws Exception { - StringSerializer serializer = new StringSerializer(); - KafkaProducerTestContext ctx = new KafkaProducerTestContext<>(testInfo, serializer); - - when(ctx.transactionManager.isTransactionV2Enabled()).thenReturn(true); - when(ctx.transactionManager.is2PCEnabled()).thenReturn(true); - when(ctx.sender.isRunning()).thenReturn(true); - - doNothing().when(ctx.transactionManager).prepareTransaction(); - - PreparedTxnState expectedState = mock(PreparedTxnState.class); - when(ctx.transactionManager.preparedTransactionState()).thenReturn(expectedState); - - try (KafkaProducer producer = ctx.newKafkaProducer()) { - PreparedTxnState returned = producer.prepareTransaction(); - assertSame(expectedState, returned); - - verify(ctx.transactionManager).prepareTransaction(); - verify(ctx.accumulator).beginFlush(); - verify(ctx.accumulator).awaitFlushCompletion(); - } - } - - @Test - public void testSendNotAllowedInPreparedTransactionState() throws Exception { - StringSerializer serializer = new StringSerializer(); - KafkaProducerTestContext ctx = new KafkaProducerTestContext<>(testInfo, serializer); - - String topic = "foo"; - Cluster cluster = TestUtils.singletonCluster(topic, 1); - - when(ctx.sender.isRunning()).thenReturn(true); - when(ctx.metadata.fetch()).thenReturn(cluster); - - // Mock transaction manager to simulate being in a prepared state - when(ctx.transactionManager.isTransactional()).thenReturn(true); - when(ctx.transactionManager.isPrepared()).thenReturn(true); - - // Create record to send - long timestamp = ctx.time.milliseconds(); - ProducerRecord record = new ProducerRecord<>(topic, 0, timestamp, "key", "value"); - - try (KafkaProducer producer = ctx.newKafkaProducer()) { - // Verify that sending a record throws IllegalStateException with the correct message - IllegalStateException exception = assertThrows( - IllegalStateException.class, - () -> producer.send(record) - ); - - assertTrue(exception.getMessage().contains("Cannot perform operation while the transaction is in a prepared state")); - - // Verify transactionManager methods were called - verify(ctx.transactionManager).isTransactional(); - verify(ctx.transactionManager).isPrepared(); - - // Verify that no message was actually sent (accumulator was not called) - verify(ctx.accumulator, never()).append( - eq(topic), - anyInt(), - anyLong(), - any(), - any(), - any(), - any(), - anyLong(), - anyLong(), - any() - ); - } - } - - @Test - public void testSendOffsetsNotAllowedInPreparedTransactionState() throws Exception { - StringSerializer serializer = new StringSerializer(); - KafkaProducerTestContext ctx = new KafkaProducerTestContext<>(testInfo, serializer); - - String topic = "foo"; - Cluster cluster = TestUtils.singletonCluster(topic, 1); - - when(ctx.sender.isRunning()).thenReturn(true); - when(ctx.metadata.fetch()).thenReturn(cluster); - - // Mock transaction manager to simulate being in a prepared state - when(ctx.transactionManager.isTransactional()).thenReturn(true); - when(ctx.transactionManager.isPrepared()).thenReturn(true); - - // Create consumer group metadata - String groupId = "test-group"; - Map offsets = new HashMap<>(); - offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(100L)); - ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata(groupId); - - try (KafkaProducer producer = ctx.newKafkaProducer()) { - // Verify that sending offsets throws IllegalStateException with the correct message - IllegalStateException exception = assertThrows( - IllegalStateException.class, - () -> producer.sendOffsetsToTransaction(offsets, groupMetadata) - ); - - assertTrue(exception.getMessage().contains("Cannot perform operation while the transaction is in a prepared state")); - - // Verify transactionManager methods were called - verify(ctx.transactionManager).isTransactional(); - verify(ctx.transactionManager).isPrepared(); - - // Verify that no offsets were actually sent - verify(ctx.transactionManager, never()).sendOffsetsToTransaction( - eq(offsets), - eq(groupMetadata) - ); - } - } - - @Test - public void testBeginTransactionNotAllowedInPreparedTransactionState() throws Exception { - StringSerializer serializer = new StringSerializer(); - KafkaProducerTestContext ctx = new KafkaProducerTestContext<>(testInfo, serializer); - - when(ctx.sender.isRunning()).thenReturn(true); - - // Mock transaction manager to simulate being in a prepared state - when(ctx.transactionManager.isTransactional()).thenReturn(true); - when(ctx.transactionManager.isPrepared()).thenReturn(true); - - try (KafkaProducer producer = ctx.newKafkaProducer()) { - // Verify that calling beginTransaction throws IllegalStateException with the correct message - IllegalStateException exception = assertThrows( - IllegalStateException.class, - producer::beginTransaction - ); - - assertTrue(exception.getMessage().contains("Cannot perform operation while the transaction is in a prepared state")); - - // Verify transactionManager methods were called - verify(ctx.transactionManager).isTransactional(); - verify(ctx.transactionManager).isPrepared(); - } - } - - @Test - public void testPrepareTransactionFailsWhen2PCDisabled() { - StringSerializer serializer = new StringSerializer(); - KafkaProducerTestContext ctx = new KafkaProducerTestContext<>(testInfo, serializer); - - // Disable 2PC - when(ctx.transactionManager.isTransactionV2Enabled()).thenReturn(true); - when(ctx.transactionManager.is2PCEnabled()).thenReturn(false); - when(ctx.sender.isRunning()).thenReturn(true); - - try (KafkaProducer producer = ctx.newKafkaProducer()) { - assertThrows( - InvalidTxnStateException.class, - producer::prepareTransaction, - "prepareTransaction() should fail if 2PC is disabled" - ); - } - } - - @Test - public void testCompleteTransactionWithMatchingState() throws Exception { - StringSerializer serializer = new StringSerializer(); - KafkaProducerTestContext ctx = new KafkaProducerTestContext<>(testInfo, serializer); - - when(ctx.transactionManager.isPrepared()).thenReturn(true); - when(ctx.sender.isRunning()).thenReturn(true); - - // Create prepared states with matching values - long producerId = 12345L; - short epoch = 5; - PreparedTxnState currentState = new PreparedTxnState(producerId, epoch); - PreparedTxnState inputState = new PreparedTxnState(producerId, epoch); - - // Set up the transaction manager to return the prepared state - when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState); - - // Should trigger commit when states match - TransactionalRequestResult commitResult = mock(TransactionalRequestResult.class); - when(ctx.transactionManager.beginCommit()).thenReturn(commitResult); - - try (KafkaProducer producer = ctx.newKafkaProducer()) { - // Call completeTransaction with the matching state - producer.completeTransaction(inputState); - - // Verify methods called in order - verify(ctx.transactionManager).isPrepared(); - verify(ctx.transactionManager).preparedTransactionState(); - verify(ctx.transactionManager).beginCommit(); - - // Verify abort was never called - verify(ctx.transactionManager, never()).beginAbort(); - - // Verify sender was woken up - verify(ctx.sender).wakeup(); - } - } - - @Test - public void testCompleteTransactionWithNonMatchingState() throws Exception { - StringSerializer serializer = new StringSerializer(); - KafkaProducerTestContext ctx = new KafkaProducerTestContext<>(testInfo, serializer); - - when(ctx.transactionManager.isPrepared()).thenReturn(true); - when(ctx.sender.isRunning()).thenReturn(true); - - // Create txn prepared states with different values - long producerId = 12345L; - short epoch = 5; - PreparedTxnState currentState = new PreparedTxnState(producerId, epoch); - PreparedTxnState inputState = new PreparedTxnState(producerId + 1, epoch); - - // Set up the transaction manager to return the prepared state - when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState); - - // Should trigger abort when states don't match - TransactionalRequestResult abortResult = mock(TransactionalRequestResult.class); - when(ctx.transactionManager.beginAbort()).thenReturn(abortResult); - - try (KafkaProducer producer = ctx.newKafkaProducer()) { - // Call completeTransaction with the non-matching state - producer.completeTransaction(inputState); - - // Verify methods called in order - verify(ctx.transactionManager).isPrepared(); - verify(ctx.transactionManager).preparedTransactionState(); - verify(ctx.transactionManager).beginAbort(); - - // Verify commit was never called - verify(ctx.transactionManager, never()).beginCommit(); - - // Verify sender was woken up - verify(ctx.sender).wakeup(); - } - } - @Test public void testClusterAuthorizationFailure() throws Exception { int maxBlockMs = 500; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 4668a91ed04..278e6c3e381 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.producer.PreparedTxnState; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; @@ -4025,56 +4024,6 @@ public class TransactionManagerTest { assertFalse(transactionManager.hasOngoingTransaction()); } - @Test - public void testInitializeTransactionsWithKeepPreparedTxn() { - doInitTransactionsWith2PCEnabled(true); - runUntil(transactionManager::hasProducerId); - - // Expect a bumped epoch in the response. - assertTrue(transactionManager.hasProducerId()); - assertFalse(transactionManager.hasOngoingTransaction()); - assertEquals(ongoingProducerId, transactionManager.producerIdAndEpoch().producerId); - assertEquals(bumpedOngoingEpoch, transactionManager.producerIdAndEpoch().epoch); - } - - @Test - public void testPrepareTransaction() { - doInitTransactionsWith2PCEnabled(false); - runUntil(transactionManager::hasProducerId); - - // Begin a transaction - transactionManager.beginTransaction(); - assertTrue(transactionManager.hasOngoingTransaction()); - - // Add a partition to the transaction - transactionManager.maybeAddPartition(tp0); - - // Capture the current producer ID and epoch before preparing the response - long producerId = transactionManager.producerIdAndEpoch().producerId; - short epoch = transactionManager.producerIdAndEpoch().epoch; - - // Simulate a produce request - try { - // Prepare the response before sending to ensure it's ready - prepareProduceResponse(Errors.NONE, producerId, epoch); - - appendToAccumulator(tp0); - // Wait until the request is processed - runUntil(() -> !client.hasPendingResponses()); - } catch (InterruptedException e) { - fail("Unexpected interruption: " + e); - } - - transactionManager.prepareTransaction(); - assertTrue(transactionManager.isPrepared()); - - PreparedTxnState preparedState = transactionManager.preparedTransactionState(); - // Validate the state contains the correct serialized producer ID and epoch - assertEquals(producerId + ":" + epoch, preparedState.toString()); - assertEquals(producerId, preparedState.producerId()); - assertEquals(epoch, preparedState.epoch()); - } - private void prepareAddPartitionsToTxn(final Map errors) { AddPartitionsToTxnResult result = AddPartitionsToTxnResponse.resultForTransaction(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID, errors); AddPartitionsToTxnResponseData data = new AddPartitionsToTxnResponseData().setResultsByTopicV3AndBelow(result.topicResults()).setThrottleTimeMs(0);