KAFKA-19414: Remove 2PC public APIs from 4.1 until release (KIP-939) (#19985)

We are removing some of the previously added public APIs until KIP-939
is ready to use.

Reviewers: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
Ritika Reddy 2025-06-25 09:06:21 -07:00 committed by GitHub
parent aa0d1f5000
commit c4cac07819
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 13 additions and 507 deletions

View File

@ -50,13 +50,11 @@ import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
@ -622,13 +620,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
/**
* Initialize the transactional state for this producer, similar to {@link #initTransactions()} but
* with additional capabilities to keep a previously prepared transaction.
*
* Needs to be called before any other methods when the {@code transactional.id} is set in the configuration.
*
* When {@code keepPreparedTxn} is {@code false}, this behaves like the standard transactional
* initialization where the method does the following:
* This method does the following:
* <ol>
* <li>Ensures any transactions initiated by previous instances of the producer with the same
* {@code transactional.id} are completed. If the previous instance had failed with a transaction in
@ -637,39 +630,26 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* <li>Gets the internal producer id and epoch, used in all future transactional
* messages issued by the producer.</li>
* </ol>
*
* <p>
* When {@code keepPreparedTxn} is set to {@code true}, the producer does <em>not</em> automatically abort existing
* transactions. Instead, it enters a recovery mode allowing only finalization of those previously
* prepared transactions.
* This behavior is especially crucial for 2PC scenarios, where transactions should remain intact
* until the external transaction manager decides whether to commit or abort.
* <p>
*
* @param keepPreparedTxn true to retain any in-flight prepared transactions (necessary for 2PC
* recovery), false to abort existing transactions and behave like
* the standard initTransactions.
*
* Note that this method will raise {@link TimeoutException} if the transactional state cannot
* be initialized before expiration of {@code max.block.ms}. Additionally, it will raise {@link InterruptException}
* if interrupted. It is safe to retry in either case, but once the transactional state has been successfully
* initialized, this method should no longer be used.
*
* @throws IllegalStateException if no {@code transactional.id} is configured
* @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not
* support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the configured
* {@code transactional.id} is unauthorized either for normal transaction writes or 2PC.
* @throws KafkaException if the producer encounters a fatal error or any other unexpected error
* @throws IllegalStateException if no {@code transactional.id} has been configured
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
* does not support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws org.apache.kafka.common.errors.AuthorizationException error indicating that the configured
* transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for
* more details. User may retry this function call after fixing the permission.
* @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
* @throws TimeoutException if the time taken for initialize the transaction has surpassed <code>max.block.ms</code>.
* @throws InterruptException if the thread is interrupted while blocked
*/
public void initTransactions(boolean keepPreparedTxn) {
public void initTransactions() {
throwIfNoTransactionManager();
throwIfProducerClosed();
throwIfInPreparedState();
long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
TransactionalRequestResult result = transactionManager.initializeTransactions(false);
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
producerMetrics.recordInit(time.nanoseconds() - now);
@ -754,7 +734,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
throwIfInvalidGroupMetadata(groupMetadata);
throwIfNoTransactionManager();
throwIfProducerClosed();
throwIfInPreparedState();
if (!offsets.isEmpty()) {
long start = time.nanoseconds();
@ -765,48 +744,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
}
/**
* Prepares the current transaction for a two-phase commit. This method will flush all pending messages
* and transition the producer into a mode where only {@link #commitTransaction()}, {@link #abortTransaction()},
* or completeTransaction(PreparedTxnState) may be called.
* <p>
* This method is used as part of a two-phase commit protocol:
* <ol>
* <li>Prepare the transaction by calling this method. This returns a {@link PreparedTxnState} if successful.</li>
* <li>Make any external system changes that need to be atomic with this transaction.</li>
* <li>Complete the transaction by calling {@link #commitTransaction()}, {@link #abortTransaction()} or
* completeTransaction(PreparedTxnState).</li>
* </ol>
*
* @return the prepared transaction state to use when completing the transaction
*
* @throws IllegalStateException if no transactional.id has been configured or no transaction has been started yet.
* @throws InvalidTxnStateException if the producer is not in a state where preparing
* a transaction is possible or 2PC is not enabled.
* @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
* @throws UnsupportedVersionException fatal error indicating the broker
* does not support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws AuthorizationException fatal error indicating that the configured
* transactional.id is not authorized. See the exception for more details
* @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
* @throws TimeoutException if the time taken for preparing the transaction has surpassed <code>max.block.ms</code>
* @throws InterruptException if the thread is interrupted while blocked
*/
@Override
public PreparedTxnState prepareTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
throwIfProducerClosed();
throwIfInPreparedState();
if (!transactionManager.is2PCEnabled()) {
throw new InvalidTxnStateException("Cannot prepare a transaction when 2PC is not enabled");
}
long now = time.nanoseconds();
flush();
transactionManager.prepareTransaction();
producerMetrics.recordPrepareTxn(time.nanoseconds() - now);
return transactionManager.preparedTransactionState();
}
/**
* Commits the ongoing transaction. This method will flush any unsent records before actually committing the transaction.
* <p>
@ -884,40 +821,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
}
/**
* Completes a prepared transaction by comparing the provided prepared transaction state with the
* current prepared state on the producer.
* If they match, the transaction is committed; otherwise, it is aborted.
*
* @param preparedTxnState The prepared transaction state to compare against the current state
* @throws IllegalStateException if no transactional.id has been configured or no transaction has been started
* @throws InvalidTxnStateException if the producer is not in prepared state
* @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
* @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
* @throws TimeoutException if the time taken for completing the transaction has surpassed <code>max.block.ms</code>
* @throws InterruptException if the thread is interrupted while blocked
*/
@Override
public void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException {
throwIfNoTransactionManager();
throwIfProducerClosed();
if (!transactionManager.isPrepared()) {
throw new InvalidTxnStateException("Cannot complete transaction because no transaction has been prepared. " +
"Call prepareTransaction() first, or make sure initTransaction(true) was called.");
}
// Get the current prepared transaction state
PreparedTxnState currentPreparedState = transactionManager.preparedTransactionState();
// Compare the prepared transaction state token and commit or abort accordingly
if (currentPreparedState.equals(preparedTxnState)) {
commitTransaction();
} else {
abortTransaction();
}
}
/**
* Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
* See {@link #send(ProducerRecord, Callback)} for details.

View File

@ -142,7 +142,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
}
@Override
public void initTransactions(boolean keepPreparedTxn) {
public void initTransactions() {
verifyNotClosed();
verifyNotFenced();
if (this.transactionInitialized) {
@ -200,18 +200,6 @@ public class MockProducer<K, V> implements Producer<K, V> {
this.sentOffsets = true;
}
@Override
public PreparedTxnState prepareTransaction() throws ProducerFencedException {
verifyNotClosed();
verifyNotFenced();
verifyTransactionsInitialized();
verifyTransactionInFlight();
// Return a new PreparedTxnState with mock values for producerId and epoch
// Using 1000L and (short)1 as arbitrary values for a valid PreparedTxnState
return new PreparedTxnState(1000L, (short) 1);
}
@Override
public void commitTransaction() throws ProducerFencedException {
verifyNotClosed();
@ -257,27 +245,6 @@ public class MockProducer<K, V> implements Producer<K, V> {
this.transactionInFlight = false;
}
@Override
public void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException {
verifyNotClosed();
verifyNotFenced();
verifyTransactionsInitialized();
if (!this.transactionInFlight) {
throw new IllegalStateException("There is no prepared transaction to complete.");
}
// For testing purposes, we'll consider a prepared state with producerId=1000L and epoch=1 as valid
// This should match what's returned in prepareTransaction()
PreparedTxnState currentState = new PreparedTxnState(1000L, (short) 1);
if (currentState.equals(preparedTxnState)) {
commitTransaction();
} else {
abortTransaction();
}
}
private synchronized void verifyNotClosed() {
if (this.closed) {
throw new IllegalStateException("MockProducer is already closed.");

View File

@ -42,14 +42,7 @@ public interface Producer<K, V> extends Closeable {
/**
* See {@link KafkaProducer#initTransactions()}
*/
default void initTransactions() {
initTransactions(false);
}
/**
* See {@link KafkaProducer#initTransactions(boolean)}
*/
void initTransactions(boolean keepPreparedTxn);
void initTransactions();
/**
* See {@link KafkaProducer#beginTransaction()}
@ -62,11 +55,6 @@ public interface Producer<K, V> extends Closeable {
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
ConsumerGroupMetadata groupMetadata) throws ProducerFencedException;
/**
* See {@link KafkaProducer#prepareTransaction()}
*/
PreparedTxnState prepareTransaction() throws ProducerFencedException;
/**
* See {@link KafkaProducer#commitTransaction()}
*/
@ -77,11 +65,6 @@ public interface Producer<K, V> extends Closeable {
*/
void abortTransaction() throws ProducerFencedException;
/**
* See {@link KafkaProducer#completeTransaction(PreparedTxnState)}
*/
void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException;
/**
* @see KafkaProducer#registerMetricForSubscription(KafkaMetric)
*/

View File

@ -320,9 +320,7 @@ public class TransactionManager {
.setTransactionalId(transactionalId)
.setTransactionTimeoutMs(transactionTimeoutMs)
.setProducerId(producerIdAndEpoch.producerId)
.setProducerEpoch(producerIdAndEpoch.epoch)
.setEnable2Pc(enable2PC)
.setKeepPreparedTxn(keepPreparedTxn);
.setProducerEpoch(producerIdAndEpoch.epoch);
InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
isEpochBump);

View File

@ -32,7 +32,6 @@ import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
@ -47,7 +46,6 @@ import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@ -78,7 +76,6 @@ import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.MetadataResponse;
@ -107,7 +104,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
@ -154,7 +150,6 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ -165,7 +160,6 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
@ -1389,294 +1383,6 @@ public class KafkaProducerTest {
}
}
@ParameterizedTest
@CsvSource({
"true, false",
"true, true",
"false, true"
})
public void testInitTransactionsWithKeepPreparedTxnAndTwoPhaseCommit(boolean keepPreparedTxn, boolean enable2PC) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
if (enable2PC) {
configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
}
Time time = new MockTime(1);
MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
MockClient client = new MockClient(time, metadata);
client.updateMetadata(initialUpdateResponse);
// Capture flags from the InitProducerIdRequest
boolean[] requestFlags = new boolean[2]; // [keepPreparedTxn, enable2Pc]
client.prepareResponse(
request -> request instanceof FindCoordinatorRequest &&
((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
FindCoordinatorResponse.prepareResponse(Errors.NONE, "test-txn-id", NODE));
client.prepareResponse(
request -> {
if (request instanceof InitProducerIdRequest) {
InitProducerIdRequest initRequest = (InitProducerIdRequest) request;
requestFlags[0] = initRequest.data().keepPreparedTxn();
requestFlags[1] = initRequest.data().enable2Pc();
return true;
}
return false;
},
initProducerIdResponse(1L, (short) 5, Errors.NONE));
try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
new StringSerializer(), metadata, client, null, time)) {
producer.initTransactions(keepPreparedTxn);
// Verify request flags match expected values
assertEquals(keepPreparedTxn, requestFlags[0],
"keepPreparedTxn flag should match input parameter");
assertEquals(enable2PC, requestFlags[1],
"enable2Pc flag should match producer configuration");
}
}
@Test
public void testPrepareTransactionSuccess() throws Exception {
StringSerializer serializer = new StringSerializer();
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
when(ctx.transactionManager.isTransactionV2Enabled()).thenReturn(true);
when(ctx.transactionManager.is2PCEnabled()).thenReturn(true);
when(ctx.sender.isRunning()).thenReturn(true);
doNothing().when(ctx.transactionManager).prepareTransaction();
PreparedTxnState expectedState = mock(PreparedTxnState.class);
when(ctx.transactionManager.preparedTransactionState()).thenReturn(expectedState);
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
PreparedTxnState returned = producer.prepareTransaction();
assertSame(expectedState, returned);
verify(ctx.transactionManager).prepareTransaction();
verify(ctx.accumulator).beginFlush();
verify(ctx.accumulator).awaitFlushCompletion();
}
}
@Test
public void testSendNotAllowedInPreparedTransactionState() throws Exception {
StringSerializer serializer = new StringSerializer();
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
String topic = "foo";
Cluster cluster = TestUtils.singletonCluster(topic, 1);
when(ctx.sender.isRunning()).thenReturn(true);
when(ctx.metadata.fetch()).thenReturn(cluster);
// Mock transaction manager to simulate being in a prepared state
when(ctx.transactionManager.isTransactional()).thenReturn(true);
when(ctx.transactionManager.isPrepared()).thenReturn(true);
// Create record to send
long timestamp = ctx.time.milliseconds();
ProducerRecord<String, String> record = new ProducerRecord<>(topic, 0, timestamp, "key", "value");
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
// Verify that sending a record throws IllegalStateException with the correct message
IllegalStateException exception = assertThrows(
IllegalStateException.class,
() -> producer.send(record)
);
assertTrue(exception.getMessage().contains("Cannot perform operation while the transaction is in a prepared state"));
// Verify transactionManager methods were called
verify(ctx.transactionManager).isTransactional();
verify(ctx.transactionManager).isPrepared();
// Verify that no message was actually sent (accumulator was not called)
verify(ctx.accumulator, never()).append(
eq(topic),
anyInt(),
anyLong(),
any(),
any(),
any(),
any(),
anyLong(),
anyLong(),
any()
);
}
}
@Test
public void testSendOffsetsNotAllowedInPreparedTransactionState() throws Exception {
StringSerializer serializer = new StringSerializer();
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
String topic = "foo";
Cluster cluster = TestUtils.singletonCluster(topic, 1);
when(ctx.sender.isRunning()).thenReturn(true);
when(ctx.metadata.fetch()).thenReturn(cluster);
// Mock transaction manager to simulate being in a prepared state
when(ctx.transactionManager.isTransactional()).thenReturn(true);
when(ctx.transactionManager.isPrepared()).thenReturn(true);
// Create consumer group metadata
String groupId = "test-group";
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(100L));
ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata(groupId);
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
// Verify that sending offsets throws IllegalStateException with the correct message
IllegalStateException exception = assertThrows(
IllegalStateException.class,
() -> producer.sendOffsetsToTransaction(offsets, groupMetadata)
);
assertTrue(exception.getMessage().contains("Cannot perform operation while the transaction is in a prepared state"));
// Verify transactionManager methods were called
verify(ctx.transactionManager).isTransactional();
verify(ctx.transactionManager).isPrepared();
// Verify that no offsets were actually sent
verify(ctx.transactionManager, never()).sendOffsetsToTransaction(
eq(offsets),
eq(groupMetadata)
);
}
}
@Test
public void testBeginTransactionNotAllowedInPreparedTransactionState() throws Exception {
StringSerializer serializer = new StringSerializer();
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
when(ctx.sender.isRunning()).thenReturn(true);
// Mock transaction manager to simulate being in a prepared state
when(ctx.transactionManager.isTransactional()).thenReturn(true);
when(ctx.transactionManager.isPrepared()).thenReturn(true);
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
// Verify that calling beginTransaction throws IllegalStateException with the correct message
IllegalStateException exception = assertThrows(
IllegalStateException.class,
producer::beginTransaction
);
assertTrue(exception.getMessage().contains("Cannot perform operation while the transaction is in a prepared state"));
// Verify transactionManager methods were called
verify(ctx.transactionManager).isTransactional();
verify(ctx.transactionManager).isPrepared();
}
}
@Test
public void testPrepareTransactionFailsWhen2PCDisabled() {
StringSerializer serializer = new StringSerializer();
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
// Disable 2PC
when(ctx.transactionManager.isTransactionV2Enabled()).thenReturn(true);
when(ctx.transactionManager.is2PCEnabled()).thenReturn(false);
when(ctx.sender.isRunning()).thenReturn(true);
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
assertThrows(
InvalidTxnStateException.class,
producer::prepareTransaction,
"prepareTransaction() should fail if 2PC is disabled"
);
}
}
@Test
public void testCompleteTransactionWithMatchingState() throws Exception {
StringSerializer serializer = new StringSerializer();
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
when(ctx.transactionManager.isPrepared()).thenReturn(true);
when(ctx.sender.isRunning()).thenReturn(true);
// Create prepared states with matching values
long producerId = 12345L;
short epoch = 5;
PreparedTxnState currentState = new PreparedTxnState(producerId, epoch);
PreparedTxnState inputState = new PreparedTxnState(producerId, epoch);
// Set up the transaction manager to return the prepared state
when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState);
// Should trigger commit when states match
TransactionalRequestResult commitResult = mock(TransactionalRequestResult.class);
when(ctx.transactionManager.beginCommit()).thenReturn(commitResult);
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
// Call completeTransaction with the matching state
producer.completeTransaction(inputState);
// Verify methods called in order
verify(ctx.transactionManager).isPrepared();
verify(ctx.transactionManager).preparedTransactionState();
verify(ctx.transactionManager).beginCommit();
// Verify abort was never called
verify(ctx.transactionManager, never()).beginAbort();
// Verify sender was woken up
verify(ctx.sender).wakeup();
}
}
@Test
public void testCompleteTransactionWithNonMatchingState() throws Exception {
StringSerializer serializer = new StringSerializer();
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
when(ctx.transactionManager.isPrepared()).thenReturn(true);
when(ctx.sender.isRunning()).thenReturn(true);
// Create txn prepared states with different values
long producerId = 12345L;
short epoch = 5;
PreparedTxnState currentState = new PreparedTxnState(producerId, epoch);
PreparedTxnState inputState = new PreparedTxnState(producerId + 1, epoch);
// Set up the transaction manager to return the prepared state
when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState);
// Should trigger abort when states don't match
TransactionalRequestResult abortResult = mock(TransactionalRequestResult.class);
when(ctx.transactionManager.beginAbort()).thenReturn(abortResult);
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
// Call completeTransaction with the non-matching state
producer.completeTransaction(inputState);
// Verify methods called in order
verify(ctx.transactionManager).isPrepared();
verify(ctx.transactionManager).preparedTransactionState();
verify(ctx.transactionManager).beginAbort();
// Verify commit was never called
verify(ctx.transactionManager, never()).beginCommit();
// Verify sender was woken up
verify(ctx.sender).wakeup();
}
}
@Test
public void testClusterAuthorizationFailure() throws Exception {
int maxBlockMs = 500;

View File

@ -23,7 +23,6 @@ import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.PreparedTxnState;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
@ -4025,56 +4024,6 @@ public class TransactionManagerTest {
assertFalse(transactionManager.hasOngoingTransaction());
}
@Test
public void testInitializeTransactionsWithKeepPreparedTxn() {
doInitTransactionsWith2PCEnabled(true);
runUntil(transactionManager::hasProducerId);
// Expect a bumped epoch in the response.
assertTrue(transactionManager.hasProducerId());
assertFalse(transactionManager.hasOngoingTransaction());
assertEquals(ongoingProducerId, transactionManager.producerIdAndEpoch().producerId);
assertEquals(bumpedOngoingEpoch, transactionManager.producerIdAndEpoch().epoch);
}
@Test
public void testPrepareTransaction() {
doInitTransactionsWith2PCEnabled(false);
runUntil(transactionManager::hasProducerId);
// Begin a transaction
transactionManager.beginTransaction();
assertTrue(transactionManager.hasOngoingTransaction());
// Add a partition to the transaction
transactionManager.maybeAddPartition(tp0);
// Capture the current producer ID and epoch before preparing the response
long producerId = transactionManager.producerIdAndEpoch().producerId;
short epoch = transactionManager.producerIdAndEpoch().epoch;
// Simulate a produce request
try {
// Prepare the response before sending to ensure it's ready
prepareProduceResponse(Errors.NONE, producerId, epoch);
appendToAccumulator(tp0);
// Wait until the request is processed
runUntil(() -> !client.hasPendingResponses());
} catch (InterruptedException e) {
fail("Unexpected interruption: " + e);
}
transactionManager.prepareTransaction();
assertTrue(transactionManager.isPrepared());
PreparedTxnState preparedState = transactionManager.preparedTransactionState();
// Validate the state contains the correct serialized producer ID and epoch
assertEquals(producerId + ":" + epoch, preparedState.toString());
assertEquals(producerId, preparedState.producerId());
assertEquals(epoch, preparedState.epoch());
}
private void prepareAddPartitionsToTxn(final Map<TopicPartition, Errors> errors) {
AddPartitionsToTxnResult result = AddPartitionsToTxnResponse.resultForTransaction(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID, errors);
AddPartitionsToTxnResponseData data = new AddPartitionsToTxnResponseData().setResultsByTopicV3AndBelow(result.topicResults()).setThrottleTimeMs(0);