KAFKA-19082:[3/4] Add prepare txn method (KIP-939) (#19539)

This patch belongs to the client-side changes required to enable 2PC as
a part of KIP-939.

New method is added to KafkaProducer:  public PreparedTxnState
prepareTransaction()

This would flush all the pending messages and transition the producer
into a mode where only .commitTransaction, .abortTransaction, or
.completeTransaction could be called (calling other methods,  e.g. .send
, in that mode would result in IllegalStateException being thrown).  If
the call is successful (all messages successfully got flushed to all
partitions) the transaction is prepared.  If the 2PC is not enabled, we
return the INVALID_TXN_STATE error.

A new state is added to the TransactionManager called
PREPARING_TRANSACTION. There are two situations where we would move into
this state:
1) When prepareTransaction() is called during an ongoing transaction
with 2PC enabled
2) When initTransaction(true) is called after a client failure
(keepPrepared = true)

Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan
 <jolshan@confluent.io>
This commit is contained in:
Ritika Reddy 2025-05-14 15:11:39 -07:00 committed by GitHub
parent 37963256d1
commit 7f02c263a6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 400 additions and 37 deletions

View File

@ -50,11 +50,13 @@ import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
@ -665,6 +667,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
public void initTransactions(boolean keepPreparedTxn) {
throwIfNoTransactionManager();
throwIfProducerClosed();
throwIfInPreparedState();
long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
sender.wakeup();
@ -691,6 +694,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
public void beginTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
throwIfProducerClosed();
throwIfInPreparedState();
long now = time.nanoseconds();
transactionManager.beginTransaction();
producerMetrics.recordBeginTxn(time.nanoseconds() - now);
@ -750,6 +754,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
throwIfInvalidGroupMetadata(groupMetadata);
throwIfNoTransactionManager();
throwIfProducerClosed();
throwIfInPreparedState();
if (!offsets.isEmpty()) {
long start = time.nanoseconds();
@ -760,6 +765,48 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
}
/**
* Prepares the current transaction for a two-phase commit. This method will flush all pending messages
* and transition the producer into a mode where only {@link #commitTransaction()}, {@link #abortTransaction()},
* or completeTransaction(PreparedTxnState) may be called.
* <p>
* This method is used as part of a two-phase commit protocol:
* <ol>
* <li>Prepare the transaction by calling this method. This returns a {@link PreparedTxnState} if successful.</li>
* <li>Make any external system changes that need to be atomic with this transaction.</li>
* <li>Complete the transaction by calling {@link #commitTransaction()}, {@link #abortTransaction()} or
* completeTransaction(PreparedTxnState).</li>
* </ol>
*
* @return the prepared transaction state to use when completing the transaction
*
* @throws IllegalStateException if no transactional.id has been configured or no transaction has been started yet.
* @throws InvalidTxnStateException if the producer is not in a state where preparing
* a transaction is possible or 2PC is not enabled.
* @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
* @throws UnsupportedVersionException fatal error indicating the broker
* does not support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws AuthorizationException fatal error indicating that the configured
* transactional.id is not authorized. See the exception for more details
* @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
* @throws TimeoutException if the time taken for preparing the transaction has surpassed <code>max.block.ms</code>
* @throws InterruptException if the thread is interrupted while blocked
*/
@Override
public PreparedTxnState prepareTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
throwIfProducerClosed();
throwIfInPreparedState();
if (!transactionManager.is2PCEnabled()) {
throw new InvalidTxnStateException("Cannot prepare a transaction when 2PC is not enabled");
}
long now = time.nanoseconds();
flush();
transactionManager.prepareTransaction();
producerMetrics.recordPrepareTxn(time.nanoseconds() - now);
return transactionManager.preparedTransactionState();
}
/**
* Commits the ongoing transaction. This method will flush any unsent records before actually committing the transaction.
* <p>
@ -967,6 +1014,23 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
throw new IllegalStateException("Cannot perform operation after producer has been closed");
}
/**
* Throws an exception if the transaction is in a prepared state.
* In a two-phase commit (2PC) flow, once a transaction enters the prepared state,
* only commit, abort, or complete operations are allowed.
*
* @throws IllegalStateException if any other operation is attempted in the prepared state.
*/
private void throwIfInPreparedState() {
if (transactionManager != null &&
transactionManager.isTransactional() &&
transactionManager.isPrepared()
) {
throw new IllegalStateException("Cannot perform operation while the transaction is in a prepared state. " +
"Only commitTransaction(), abortTransaction(), or completeTransaction() are permitted.");
}
}
/**
* Implementation of asynchronously send a record to a topic.
*/
@ -978,6 +1042,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
try {
throwIfProducerClosed();
throwIfInPreparedState();
// first make sure the metadata for the topic is available
long nowMs = time.milliseconds();
ClusterAndWaitTime clusterAndWaitTime;

View File

@ -200,6 +200,18 @@ public class MockProducer<K, V> implements Producer<K, V> {
this.sentOffsets = true;
}
@Override
public PreparedTxnState prepareTransaction() throws ProducerFencedException {
verifyNotClosed();
verifyNotFenced();
verifyTransactionsInitialized();
verifyTransactionInFlight();
// Return a new PreparedTxnState with mock values for producerId and epoch
// Using 1000L and (short)1 as arbitrary values for a valid PreparedTxnState
return new PreparedTxnState(1000L, (short) 1);
}
@Override
public void commitTransaction() throws ProducerFencedException {
verifyNotClosed();

View File

@ -62,6 +62,11 @@ public interface Producer<K, V> extends Closeable {
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
ConsumerGroupMetadata groupMetadata) throws ProducerFencedException;
/**
* See {@link KafkaProducer#prepareTransaction()}
*/
PreparedTxnState prepareTransaction() throws ProducerFencedException;
/**
* See {@link KafkaProducer#commitTransaction()}
*/

View File

@ -33,6 +33,7 @@ public class KafkaProducerMetrics implements AutoCloseable {
private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
private static final String TXN_COMMIT = "txn-commit";
private static final String TXN_ABORT = "txn-abort";
private static final String TXN_PREPARE = "txn-prepare";
private static final String TOTAL_TIME_SUFFIX = "-time-ns-total";
private static final String METADATA_WAIT = "metadata-wait";
@ -44,6 +45,7 @@ public class KafkaProducerMetrics implements AutoCloseable {
private final Sensor sendOffsetsSensor;
private final Sensor commitTxnSensor;
private final Sensor abortTxnSensor;
private final Sensor prepareTxnSensor;
private final Sensor metadataWaitSensor;
public KafkaProducerMetrics(Metrics metrics) {
@ -73,6 +75,10 @@ public class KafkaProducerMetrics implements AutoCloseable {
TXN_ABORT,
"Total time producer has spent in abortTransaction in nanoseconds."
);
prepareTxnSensor = newLatencySensor(
TXN_PREPARE,
"Total time producer has spent in prepareTransaction in nanoseconds."
);
metadataWaitSensor = newLatencySensor(
METADATA_WAIT,
"Total time producer has spent waiting on topic metadata in nanoseconds."
@ -87,6 +93,7 @@ public class KafkaProducerMetrics implements AutoCloseable {
removeMetric(TXN_SEND_OFFSETS);
removeMetric(TXN_COMMIT);
removeMetric(TXN_ABORT);
removeMetric(TXN_PREPARE);
removeMetric(METADATA_WAIT);
}
@ -114,6 +121,10 @@ public class KafkaProducerMetrics implements AutoCloseable {
abortTxnSensor.record(duration);
}
public void recordPrepareTxn(long duration) {
prepareTxnSensor.record(duration);
}
public void recordMetadataWait(long duration) {
metadataWaitSensor.record(duration);
}

View File

@ -23,6 +23,7 @@ import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.PreparedTxnState;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
@ -144,12 +145,14 @@ public class TransactionManager {
private volatile long latestFinalizedFeaturesEpoch = -1;
private volatile boolean isTransactionV2Enabled = false;
private final boolean enable2PC;
private volatile PreparedTxnState preparedTxnState;
private enum State {
UNINITIALIZED,
INITIALIZING,
READY,
IN_TRANSACTION,
PREPARED_TRANSACTION,
COMMITTING_TRANSACTION,
ABORTING_TRANSACTION,
ABORTABLE_ERROR,
@ -165,10 +168,12 @@ public class TransactionManager {
return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
case IN_TRANSACTION:
return source == READY;
case PREPARED_TRANSACTION:
return source == IN_TRANSACTION || source == INITIALIZING;
case COMMITTING_TRANSACTION:
return source == IN_TRANSACTION;
return source == IN_TRANSACTION || source == PREPARED_TRANSACTION;
case ABORTING_TRANSACTION:
return source == IN_TRANSACTION || source == ABORTABLE_ERROR;
return source == IN_TRANSACTION || source == PREPARED_TRANSACTION || source == ABORTABLE_ERROR;
case ABORTABLE_ERROR:
return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTABLE_ERROR
|| source == INITIALIZING;
@ -223,6 +228,7 @@ public class TransactionManager {
this.txnPartitionMap = new TxnPartitionMap(logContext);
this.apiVersions = apiVersions;
this.enable2PC = enable2PC;
this.preparedTxnState = new PreparedTxnState();
}
/**
@ -241,7 +247,7 @@ public class TransactionManager {
* transaction manager. The {@link Producer} API calls that perform a state transition include:
*
* <ul>
* <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions()}</li>
* <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions(boolean)}</li>
* <li>{@link Producer#beginTransaction()} calls {@link #beginTransaction()}</li>
* <li>{@link Producer#commitTransaction()}} calls {@link #beginCommit()}</li>
* <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}
@ -330,6 +336,22 @@ public class TransactionManager {
transitionTo(State.IN_TRANSACTION);
}
/**
* Prepare a transaction for a two-phase commit.
* This transitions the transaction to the PREPARED_TRANSACTION state.
* The preparedTxnState is set with the current producer ID and epoch.
*/
public synchronized void prepareTransaction() {
ensureTransactional();
throwIfPendingState("prepareTransaction");
maybeFailWithError();
transitionTo(State.PREPARED_TRANSACTION);
this.preparedTxnState = new PreparedTxnState(
this.producerIdAndEpoch.producerId + ":" +
this.producerIdAndEpoch.epoch
);
}
public synchronized TransactionalRequestResult beginCommit() {
return handleCachedTransactionRequestResult(() -> {
maybeFailWithError();
@ -487,6 +509,10 @@ public class TransactionManager {
return isTransactionV2Enabled;
}
public boolean is2PCEnabled() {
return enable2PC;
}
synchronized boolean hasPartitionsToAdd() {
return !newPartitionsInTransaction.isEmpty() || !pendingPartitionsInTransaction.isEmpty();
}
@ -1058,6 +1084,15 @@ public class TransactionManager {
return isTransactional() && currentState == State.INITIALIZING;
}
/**
* Check if the transaction is in the prepared state.
*
* @return true if the current state is PREPARED_TRANSACTION
*/
public synchronized boolean isPrepared() {
return currentState == State.PREPARED_TRANSACTION;
}
void handleCoordinatorReady() {
NodeApiVersions nodeApiVersions = transactionCoordinator != null ?
apiVersions.get(transactionCoordinator.idString()) :
@ -1453,6 +1488,7 @@ public class TransactionManager {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.data().producerId(),
initProducerIdResponse.data().producerEpoch());
setProducerIdAndEpoch(producerIdAndEpoch);
// TO_DO Add code to handle transition to prepared_txn when keepPrepared = true
transitionTo(State.READY);
lastError = null;
if (this.isEpochBump) {
@ -1904,5 +1940,14 @@ public class TransactionManager {
}
}
/**
* Returns a PreparedTxnState object containing the producer ID and epoch
* of the ongoing transaction.
* This is used when preparing a transaction for a two-phase commit.
*
* @return a PreparedTxnState with the current producer ID and epoch
*/
public PreparedTxnState preparedTransactionState() {
return this.preparedTxnState;
}
}

View File

@ -46,6 +46,7 @@ import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@ -152,6 +153,7 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ -162,6 +164,7 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
@ -1439,6 +1442,165 @@ public class KafkaProducerTest {
}
}
@Test
public void testPrepareTransactionSuccess() throws Exception {
StringSerializer serializer = new StringSerializer();
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
when(ctx.transactionManager.isTransactionV2Enabled()).thenReturn(true);
when(ctx.transactionManager.is2PCEnabled()).thenReturn(true);
when(ctx.sender.isRunning()).thenReturn(true);
doNothing().when(ctx.transactionManager).prepareTransaction();
PreparedTxnState expectedState = mock(PreparedTxnState.class);
when(ctx.transactionManager.preparedTransactionState()).thenReturn(expectedState);
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
PreparedTxnState returned = producer.prepareTransaction();
assertSame(expectedState, returned);
verify(ctx.transactionManager).prepareTransaction();
verify(ctx.accumulator).beginFlush();
verify(ctx.accumulator).awaitFlushCompletion();
}
}
@Test
public void testSendNotAllowedInPreparedTransactionState() throws Exception {
StringSerializer serializer = new StringSerializer();
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
String topic = "foo";
Cluster cluster = TestUtils.singletonCluster(topic, 1);
when(ctx.sender.isRunning()).thenReturn(true);
when(ctx.metadata.fetch()).thenReturn(cluster);
// Mock transaction manager to simulate being in a prepared state
when(ctx.transactionManager.isTransactional()).thenReturn(true);
when(ctx.transactionManager.isPrepared()).thenReturn(true);
// Create record to send
long timestamp = ctx.time.milliseconds();
ProducerRecord<String, String> record = new ProducerRecord<>(topic, 0, timestamp, "key", "value");
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
// Verify that sending a record throws IllegalStateException with the correct message
IllegalStateException exception = assertThrows(
IllegalStateException.class,
() -> producer.send(record)
);
assertTrue(exception.getMessage().contains("Cannot perform operation while the transaction is in a prepared state"));
// Verify transactionManager methods were called
verify(ctx.transactionManager).isTransactional();
verify(ctx.transactionManager).isPrepared();
// Verify that no message was actually sent (accumulator was not called)
verify(ctx.accumulator, never()).append(
eq(topic),
anyInt(),
anyLong(),
any(),
any(),
any(),
any(),
anyLong(),
anyLong(),
any()
);
}
}
@Test
public void testSendOffsetsNotAllowedInPreparedTransactionState() throws Exception {
StringSerializer serializer = new StringSerializer();
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
String topic = "foo";
Cluster cluster = TestUtils.singletonCluster(topic, 1);
when(ctx.sender.isRunning()).thenReturn(true);
when(ctx.metadata.fetch()).thenReturn(cluster);
// Mock transaction manager to simulate being in a prepared state
when(ctx.transactionManager.isTransactional()).thenReturn(true);
when(ctx.transactionManager.isPrepared()).thenReturn(true);
// Create consumer group metadata
String groupId = "test-group";
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(100L));
ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata(groupId);
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
// Verify that sending offsets throws IllegalStateException with the correct message
IllegalStateException exception = assertThrows(
IllegalStateException.class,
() -> producer.sendOffsetsToTransaction(offsets, groupMetadata)
);
assertTrue(exception.getMessage().contains("Cannot perform operation while the transaction is in a prepared state"));
// Verify transactionManager methods were called
verify(ctx.transactionManager).isTransactional();
verify(ctx.transactionManager).isPrepared();
// Verify that no offsets were actually sent
verify(ctx.transactionManager, never()).sendOffsetsToTransaction(
eq(offsets),
eq(groupMetadata)
);
}
}
@Test
public void testBeginTransactionNotAllowedInPreparedTransactionState() throws Exception {
StringSerializer serializer = new StringSerializer();
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
when(ctx.sender.isRunning()).thenReturn(true);
// Mock transaction manager to simulate being in a prepared state
when(ctx.transactionManager.isTransactional()).thenReturn(true);
when(ctx.transactionManager.isPrepared()).thenReturn(true);
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
// Verify that calling beginTransaction throws IllegalStateException with the correct message
IllegalStateException exception = assertThrows(
IllegalStateException.class,
producer::beginTransaction
);
assertTrue(exception.getMessage().contains("Cannot perform operation while the transaction is in a prepared state"));
// Verify transactionManager methods were called
verify(ctx.transactionManager).isTransactional();
verify(ctx.transactionManager).isPrepared();
}
}
@Test
public void testPrepareTransactionFailsWhen2PCDisabled() {
StringSerializer serializer = new StringSerializer();
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
// Disable 2PC
when(ctx.transactionManager.isTransactionV2Enabled()).thenReturn(true);
when(ctx.transactionManager.is2PCEnabled()).thenReturn(false);
when(ctx.sender.isRunning()).thenReturn(true);
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
assertThrows(
InvalidTxnStateException.class,
producer::prepareTransaction,
"prepareTransaction() should fail if 2PC is disabled"
);
}
}
@Test
public void testClusterAuthorizationFailure() throws Exception {
int maxBlockMs = 500;

View File

@ -23,6 +23,7 @@ import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.PreparedTxnState;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
@ -138,6 +139,8 @@ public class TransactionManagerTest {
private final TopicPartition tp1 = new TopicPartition(topic, 1);
private final long producerId = 13131L;
private final short epoch = 1;
private final long ongoingProducerId = 999L;
private final short bumpedOngoingEpoch = 11;
private final String consumerGroupId = "myConsumerGroup";
private final String memberId = "member";
private final int generationId = 5;
@ -4022,6 +4025,56 @@ public class TransactionManagerTest {
assertFalse(transactionManager.hasOngoingTransaction());
}
@Test
public void testInitializeTransactionsWithKeepPreparedTxn() {
doInitTransactionsWith2PCEnabled(true);
runUntil(transactionManager::hasProducerId);
// Expect a bumped epoch in the response.
assertTrue(transactionManager.hasProducerId());
assertFalse(transactionManager.hasOngoingTransaction());
assertEquals(ongoingProducerId, transactionManager.producerIdAndEpoch().producerId);
assertEquals(bumpedOngoingEpoch, transactionManager.producerIdAndEpoch().epoch);
}
@Test
public void testPrepareTransaction() {
doInitTransactionsWith2PCEnabled(false);
runUntil(transactionManager::hasProducerId);
// Begin a transaction
transactionManager.beginTransaction();
assertTrue(transactionManager.hasOngoingTransaction());
// Add a partition to the transaction
transactionManager.maybeAddPartition(tp0);
// Capture the current producer ID and epoch before preparing the response
long producerId = transactionManager.producerIdAndEpoch().producerId;
short epoch = transactionManager.producerIdAndEpoch().epoch;
// Simulate a produce request
try {
// Prepare the response before sending to ensure it's ready
prepareProduceResponse(Errors.NONE, producerId, epoch);
appendToAccumulator(tp0);
// Wait until the request is processed
runUntil(() -> !client.hasPendingResponses());
} catch (InterruptedException e) {
fail("Unexpected interruption: " + e);
}
transactionManager.prepareTransaction();
assertTrue(transactionManager.isPrepared());
PreparedTxnState preparedState = transactionManager.preparedTransactionState();
// Validate the state contains the correct serialized producer ID and epoch
assertEquals(producerId + ":" + epoch, preparedState.toString());
assertEquals(producerId, preparedState.producerId());
assertEquals(epoch, preparedState.epoch());
}
private void prepareAddPartitionsToTxn(final Map<TopicPartition, Errors> errors) {
AddPartitionsToTxnResult result = AddPartitionsToTxnResponse.resultForTransaction(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID, errors);
AddPartitionsToTxnResponseData data = new AddPartitionsToTxnResponseData().setResultsByTopicV3AndBelow(result.topicResults()).setThrottleTimeMs(0);
@ -4361,6 +4414,48 @@ public class TransactionManagerTest {
assertTrue(result.isAcked());
}
private void doInitTransactionsWith2PCEnabled(boolean keepPrepared) {
initializeTransactionManager(Optional.of(transactionalId), true, true);
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPrepared);
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
if (keepPrepared) {
// Simulate an ongoing prepared transaction (ongoingProducerId != -1).
short ongoingEpoch = bumpedOngoingEpoch - 1;
prepareInitPidResponse(
Errors.NONE,
false,
ongoingProducerId,
bumpedOngoingEpoch,
true,
true,
ongoingProducerId,
ongoingEpoch
);
} else {
prepareInitPidResponse(
Errors.NONE,
false,
producerId,
epoch,
false,
true,
RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH
);
}
runUntil(transactionManager::hasProducerId);
transactionManager.maybeUpdateTransactionV2Enabled(true);
result.await();
assertTrue(result.isSuccessful());
assertTrue(result.isAcked());
}
private void assertAbortableError(Class<? extends RuntimeException> cause) {
try {
transactionManager.beginCommit();
@ -4411,39 +4506,6 @@ public class TransactionManagerTest {
ProducerTestUtils.runUntil(sender, condition);
}
@Test
public void testInitializeTransactionsWithKeepPreparedTxn() {
initializeTransactionManager(Optional.of(transactionalId), true, true);
client.prepareResponse(
FindCoordinatorResponse.prepareResponse(Errors.NONE, transactionalId, brokerNode)
);
// Simulate an ongoing prepared transaction (ongoingProducerId != -1).
long ongoingProducerId = 999L;
short ongoingEpoch = 10;
short bumpedEpoch = 11;
prepareInitPidResponse(
Errors.NONE,
false,
ongoingProducerId,
bumpedEpoch,
true,
true,
ongoingProducerId,
ongoingEpoch
);
transactionManager.initializeTransactions(true);
runUntil(transactionManager::hasProducerId);
assertTrue(transactionManager.hasProducerId());
assertFalse(transactionManager.hasOngoingTransaction());
assertEquals(ongoingProducerId, transactionManager.producerIdAndEpoch().producerId);
assertEquals(bumpedEpoch, transactionManager.producerIdAndEpoch().epoch);
}
/**
* This subclass exists only to optionally change the default behavior related to poisoning the state
* on invalid state transition attempts.