mirror of https://github.com/apache/kafka.git
KAFKA-17898: Refine Epoch Bumping Logic (#17849)
With KAFKA-14562, we implemented epoch bumping on both the client and the server. The epoch bump scenarios we have after enabling transaction version 2 (TV2) are listed below.
Non-Transactional Producers
• Epoch bumping is always allowed.
• Different code paths are used to handle epoch bumping.
Transactional Producers
No Epoch Bump Allowed
• coordinatorSupportsBumpingEpoch = false when initPIDVersion < 3 or initPIDVersion = null.
Client-Triggered Epoch Bump Allowed
• coordinatorSupportsBumpingEpoch = true when initPIDVersion >= 3.
• TransactionVersion2Enabled = false when endTxnVersion < 5.
Only Server-Triggered Epoch Bump Allowed
• TransactionVersion2Enabled = true and endTxnVersion >= 5.
We want to refine the code and make it more structured so that it correctly handles epoch bumping in the cases above. The changes made in this patch are:
• Rename epochBumpRequired to clientSideEpochBumpRequired to signify a manual epoch bump request from the client.
• Replace the canBumpEpoch method with needToTriggerEpochBumpFromClient and canHandleAbortableError, according to the scenarios above.
Reviewers: Artem Livshits <alivshits@confluent.io>, Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
This commit is contained in:
parent
7f8a592ad1
commit
4fc9e442c3
|
@ -191,7 +191,7 @@ public class TransactionManager {
|
|||
private volatile RuntimeException lastError = null;
|
||||
private volatile ProducerIdAndEpoch producerIdAndEpoch;
|
||||
private volatile boolean transactionStarted = false;
|
||||
private volatile boolean epochBumpRequired = false;
|
||||
private volatile boolean clientSideEpochBumpRequired = false;
|
||||
private volatile long latestFinalizedFeaturesEpoch = -1;
|
||||
private volatile boolean isTransactionV2Enabled = false;
|
||||
|
||||
|
@ -351,7 +351,7 @@ public class TransactionManager {
|
|||
enqueueRequest(handler);
|
||||
|
||||
// If an epoch bump is required for recovery, initialize the transaction after completing the EndTxn request.
|
||||
if (epochBumpRequired) {
|
||||
if (clientSideEpochBumpRequired) {
|
||||
return initializeTransactions(this.producerIdAndEpoch);
|
||||
}
|
||||
|
||||
|
@ -477,6 +477,26 @@ public class TransactionManager {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transitions to an abortable error state if the coordinator can handle an abortable error or
|
||||
* to a fatal error if not.
|
||||
*
|
||||
* @param abortableException The exception in case of an abortable error.
|
||||
* @param fatalException The exception in case of a fatal error.
|
||||
*/
|
||||
private void transitionToAbortableErrorOrFatalError(
|
||||
RuntimeException abortableException,
|
||||
RuntimeException fatalException
|
||||
) {
|
||||
if (canHandleAbortableError()) {
|
||||
if (needToTriggerEpochBumpFromClient())
|
||||
clientSideEpochBumpRequired = true;
|
||||
transitionToAbortableError(abortableException);
|
||||
} else {
|
||||
transitionToFatalError(fatalException);
|
||||
}
|
||||
}
|
||||
|
||||
// visible for testing
|
||||
synchronized boolean isPartitionAdded(TopicPartition partition) {
|
||||
return partitionsInTransaction.contains(partition);
|
||||
|
@ -544,8 +564,11 @@ public class TransactionManager {
|
|||
this.partitionsWithUnresolvedSequences.clear();
|
||||
}
|
||||
|
||||
synchronized void requestEpochBumpForPartition(TopicPartition tp) {
|
||||
epochBumpRequired = true;
|
||||
/**
|
||||
* This method is used to trigger an epoch bump for non-transactional idempotent producers.
|
||||
*/
|
||||
synchronized void requestIdempotentEpochBumpForPartition(TopicPartition tp) {
|
||||
clientSideEpochBumpRequired = true;
|
||||
this.partitionsToRewriteSequences.add(tp);
|
||||
}
|
||||
|
||||
|
@ -564,12 +587,12 @@ public class TransactionManager {
|
|||
}
|
||||
this.partitionsToRewriteSequences.clear();
|
||||
|
||||
epochBumpRequired = false;
|
||||
clientSideEpochBumpRequired = false;
|
||||
}
|
||||
|
||||
synchronized void bumpIdempotentEpochAndResetIdIfNeeded() {
|
||||
if (!isTransactional()) {
|
||||
if (epochBumpRequired) {
|
||||
if (clientSideEpochBumpRequired) {
|
||||
bumpIdempotentProducerEpoch();
|
||||
}
|
||||
if (currentState != State.INITIALIZING && !hasProducerId()) {
|
||||
|
@ -675,8 +698,8 @@ public class TransactionManager {
|
|||
|| exception instanceof UnsupportedVersionException) {
|
||||
transitionToFatalError(exception);
|
||||
} else if (isTransactional()) {
|
||||
if (canBumpEpoch() && !isCompleting()) {
|
||||
epochBumpRequired = true;
|
||||
if (needToTriggerEpochBumpFromClient() && !isCompleting()) {
|
||||
clientSideEpochBumpRequired = true;
|
||||
}
|
||||
transitionToAbortableError(exception);
|
||||
}
|
||||
|
@ -699,7 +722,7 @@ public class TransactionManager {
|
|||
|
||||
// If we fail with an OutOfOrderSequenceException, we have a gap in the log. Bump the epoch for this
|
||||
// partition, which will reset the sequence number to 0 and allow us to continue
|
||||
requestEpochBumpForPartition(batch.topicPartition);
|
||||
requestIdempotentEpochBumpForPartition(batch.topicPartition);
|
||||
} else if (exception instanceof UnknownProducerIdException) {
|
||||
// If we get an UnknownProducerId for a partition, then the broker has no state for that producer. It will
|
||||
// therefore accept a write with sequence number 0. We reset the sequence number for the partition here so
|
||||
|
@ -710,7 +733,7 @@ public class TransactionManager {
|
|||
} else {
|
||||
if (adjustSequenceNumbers) {
|
||||
if (!isTransactional()) {
|
||||
requestEpochBumpForPartition(batch.topicPartition);
|
||||
requestIdempotentEpochBumpForPartition(batch.topicPartition);
|
||||
} else {
|
||||
txnPartitionMap.adjustSequencesDueToFailedBatch(batch);
|
||||
}
|
||||
|
@ -760,21 +783,17 @@ public class TransactionManager {
|
|||
// For the transactional producer, we bump the epoch if possible, otherwise we transition to a fatal error
|
||||
String unackedMessagesErr = "The client hasn't received acknowledgment for some previously " +
|
||||
"sent messages and can no longer retry them. ";
|
||||
if (canBumpEpoch()) {
|
||||
epochBumpRequired = true;
|
||||
KafkaException exception = new KafkaException(unackedMessagesErr + "It is safe to abort " +
|
||||
KafkaException abortableException = new KafkaException(unackedMessagesErr + "It is safe to abort " +
|
||||
"the transaction and continue.");
|
||||
transitionToAbortableError(exception);
|
||||
} else {
|
||||
KafkaException exception = new KafkaException(unackedMessagesErr + "It isn't safe to continue.");
|
||||
transitionToFatalError(exception);
|
||||
}
|
||||
KafkaException fatalException = new KafkaException(unackedMessagesErr + "It isn't safe to continue.");
|
||||
|
||||
transitionToAbortableErrorOrFatalError(abortableException, fatalException);
|
||||
} else {
|
||||
// For the idempotent producer, bump the epoch
|
||||
log.info("No inflight batches remaining for {}, last ack'd sequence for partition is {}, next sequence is {}. " +
|
||||
"Going to bump epoch and reset sequence numbers.", topicPartition,
|
||||
lastAckedSequence(topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER), sequenceNumber(topicPartition));
|
||||
requestEpochBumpForPartition(topicPartition);
|
||||
requestIdempotentEpochBumpForPartition(topicPartition);
|
||||
}
|
||||
|
||||
iter.remove();
|
||||
|
@ -943,7 +962,7 @@ public class TransactionManager {
|
|||
if (isTransactional()) {
|
||||
txnPartitionMap.startSequencesAtBeginning(batch.topicPartition, this.producerIdAndEpoch);
|
||||
} else {
|
||||
requestEpochBumpForPartition(batch.topicPartition);
|
||||
requestIdempotentEpochBumpForPartition(batch.topicPartition);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -951,7 +970,7 @@ public class TransactionManager {
|
|||
if (!isTransactional()) {
|
||||
// For the idempotent producer, always retry UNKNOWN_PRODUCER_ID errors. If the batch has the current
|
||||
// producer ID and epoch, request a bump of the epoch. Otherwise just retry the produce.
|
||||
requestEpochBumpForPartition(batch.topicPartition);
|
||||
requestIdempotentEpochBumpForPartition(batch.topicPartition);
|
||||
return true;
|
||||
}
|
||||
} else if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER) {
|
||||
|
@ -967,7 +986,7 @@ public class TransactionManager {
|
|||
// and wait to see if the sequence resolves
|
||||
if (!hasUnresolvedSequence(batch.topicPartition) ||
|
||||
isNextSequenceForUnresolvedPartition(batch.topicPartition, batch.baseSequence())) {
|
||||
requestEpochBumpForPartition(batch.topicPartition);
|
||||
requestIdempotentEpochBumpForPartition(batch.topicPartition);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -1164,23 +1183,59 @@ public class TransactionManager {
|
|||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines if an epoch bump can be triggered manually based on the api versions.
|
||||
*
|
||||
* <b>NOTE:</b>
|
||||
* This method should only be used for transactional producers.
|
||||
* For non-transactional producers epoch bumping is always allowed.
|
||||
*
|
||||
* <ol>
|
||||
* <li><b>Client-Triggered Epoch Bump</b>:
|
||||
* If the coordinator supports epoch bumping (initProducerIdVersion.maxVersion() >= 3),
|
||||
* client-triggered epoch bumping is allowed, returns true.
|
||||
* <code>clientSideEpochBumpRequired</code> must be set to true in this case.</li>
|
||||
*
|
||||
* <li><b>No Epoch Bump Allowed</b>:
|
||||
* If the coordinator does not support epoch bumping, returns false.</li>
|
||||
*
|
||||
* <li><b>Server-Triggered Only</b>:
|
||||
* When TransactionV2 is enabled, epoch bumping is handled automatically
|
||||
* by the server in EndTxn, so manual epoch bumping is not required, returns false.</li>
|
||||
* </ol>
|
||||
*
|
||||
* @return true if a client-triggered epoch bump is allowed, otherwise false.
|
||||
*/
|
||||
// package-private for testing
|
||||
boolean canBumpEpoch() {
|
||||
if (!isTransactional()) {
|
||||
return true;
|
||||
boolean needToTriggerEpochBumpFromClient() {
|
||||
return coordinatorSupportsBumpingEpoch && !isTransactionV2Enabled;
|
||||
}
|
||||
|
||||
return coordinatorSupportsBumpingEpoch;
|
||||
/**
|
||||
* Determines if the coordinator can handle an abortable error.
|
||||
* Recovering from an abortable error requires an epoch bump which can be triggered by the client
|
||||
* or automatically taken care of at the end of every transaction (Transaction V2).
|
||||
* Use <code>needToTriggerEpochBumpFromClient</code> to check whether the epoch bump needs to be triggered
|
||||
* manually.
|
||||
*
|
||||
* <b>NOTE:</b>
|
||||
* This method should only be used for transactional producers.
|
||||
* There is no concept of abortable errors for idempotent producers.
|
||||
*
|
||||
* @return true if an abortable error can be handled, otherwise false.
|
||||
*/
|
||||
boolean canHandleAbortableError() {
|
||||
return coordinatorSupportsBumpingEpoch || isTransactionV2Enabled;
|
||||
}
|
||||
|
||||
private void completeTransaction() {
|
||||
if (epochBumpRequired) {
|
||||
if (clientSideEpochBumpRequired) {
|
||||
transitionTo(State.INITIALIZING);
|
||||
} else {
|
||||
transitionTo(State.READY);
|
||||
}
|
||||
lastError = null;
|
||||
epochBumpRequired = false;
|
||||
clientSideEpochBumpRequired = false;
|
||||
transactionStarted = false;
|
||||
newPartitionsInTransaction.clear();
|
||||
pendingPartitionsInTransaction.clear();
|
||||
|
@ -1209,9 +1264,23 @@ public class TransactionManager {
|
|||
transitionToAbortableError(e);
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines if an error should be treated as abortable or fatal, based on transaction state and configuration.
|
||||
* <b>NOTE:</b> Only use this method for transactional producers.
|
||||
*
|
||||
* - <b>Abortable Error</b>:
|
||||
* An abortable error can be handled effectively, if epoch bumping is supported.
|
||||
* 1) If transactionV2 is enabled, automatic epoch bumping happens at the end of every transaction.
|
||||
* 2) If the client can trigger an epoch bump, the abortable error can be handled.
|
||||
*
|
||||
* - <b>Fatal Error</b>:
|
||||
* If epoch bumping is not supported, the system cannot recover and the error must be treated as fatal.
|
||||
* @param e the error to determine as either abortable or fatal.
|
||||
*/
|
||||
void abortableErrorIfPossible(RuntimeException e) {
|
||||
if (canBumpEpoch()) {
|
||||
epochBumpRequired = true;
|
||||
if (canHandleAbortableError()) {
|
||||
if (needToTriggerEpochBumpFromClient())
|
||||
clientSideEpochBumpRequired = true;
|
||||
abortableError(e);
|
||||
} else {
|
||||
fatalError(e);
|
||||
|
|
|
@ -678,7 +678,7 @@ public class TransactionManagerTest {
|
|||
assertEquals(2, transactionManager.sequenceNumber(tp0));
|
||||
|
||||
// The producerId might be reset due to a failure on another partition
|
||||
transactionManager.requestEpochBumpForPartition(tp1);
|
||||
transactionManager.requestIdempotentEpochBumpForPartition(tp1);
|
||||
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
|
||||
initializeIdempotentProducerId(producerId + 1, (short) 0);
|
||||
|
||||
|
@ -780,6 +780,21 @@ public class TransactionManagerTest {
|
|||
return batch;
|
||||
}
|
||||
|
||||
private ProducerBatch writeTransactionalBatchWithValue(
|
||||
TransactionManager manager,
|
||||
TopicPartition tp,
|
||||
String value
|
||||
) {
|
||||
manager.maybeUpdateProducerIdAndEpoch(tp);
|
||||
int seq = manager.sequenceNumber(tp);
|
||||
manager.incrementSequenceNumber(tp, 1);
|
||||
ProducerBatch batch = batchWithValue(tp, value);
|
||||
batch.setProducerState(manager.producerIdAndEpoch(), seq, true);
|
||||
manager.addInFlightBatch(batch);
|
||||
batch.close();
|
||||
return batch;
|
||||
}
|
||||
|
||||
private ProducerBatch batchWithValue(TopicPartition tp, String value) {
|
||||
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(64),
|
||||
Compression.NONE, TimestampType.CREATE_TIME, 0L);
|
||||
|
@ -814,7 +829,7 @@ public class TransactionManagerTest {
|
|||
transactionManager.incrementSequenceNumber(tp1, 3);
|
||||
assertEquals(transactionManager.sequenceNumber(tp1), 3);
|
||||
|
||||
transactionManager.requestEpochBumpForPartition(tp0);
|
||||
transactionManager.requestIdempotentEpochBumpForPartition(tp0);
|
||||
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
|
||||
assertEquals(transactionManager.sequenceNumber(tp0), 0);
|
||||
assertEquals(transactionManager.sequenceNumber(tp1), 3);
|
||||
|
@ -2948,7 +2963,7 @@ public class TransactionManagerTest {
|
|||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testEpochBumpAfterLastInflightBatchFails(boolean transactionV2Enabled) {
|
||||
public void testEpochBumpAfterLastInFlightBatchFailsIdempotentProducer(boolean transactionV2Enabled) {
|
||||
initializeTransactionManager(Optional.empty(), transactionV2Enabled);
|
||||
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
|
||||
initializeIdempotentProducerId(producerId, epoch);
|
||||
|
@ -2980,6 +2995,39 @@ public class TransactionManagerTest {
|
|||
assertEquals(0, transactionManager.sequenceNumber(tp0));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testMaybeResolveSequencesTransactionalProducer(boolean transactionV2Enabled) throws Exception {
|
||||
initializeTransactionManager(Optional.of(transactionalId), transactionV2Enabled);
|
||||
|
||||
// Initialize transaction with initial producer ID and epoch.
|
||||
doInitTransactions(producerId, epoch);
|
||||
|
||||
transactionManager.beginTransaction();
|
||||
transactionManager.maybeAddPartition(tp0);
|
||||
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
|
||||
runUntil(() -> transactionManager.isPartitionAdded(tp0));
|
||||
|
||||
ProducerBatch b1 = writeTransactionalBatchWithValue(transactionManager, tp0, "1");
|
||||
assertEquals(Integer.valueOf(1), transactionManager.sequenceNumber(tp0));
|
||||
|
||||
transactionManager.markSequenceUnresolved(b1);
|
||||
assertTrue(transactionManager.hasUnresolvedSequences());
|
||||
|
||||
transactionManager.handleFailedBatch(b1, new TimeoutException(), false);
|
||||
// Call maybeResolveSequences to trigger resolution logic
|
||||
transactionManager.maybeResolveSequences();
|
||||
|
||||
// Verify the type of error state the transaction is in.
|
||||
if (transactionManager.isTransactionV2Enabled() || transactionManager.needToTriggerEpochBumpFromClient()) {
|
||||
// Expected to be in an abortable error state when an epoch bump is possible (server-side with TV2, or client-triggered)
|
||||
assertTrue(transactionManager.hasAbortableError());
|
||||
} else {
|
||||
// Expected to be in a fatal error state when epoch bumping is not possible
|
||||
assertTrue(transactionManager.hasFatalError());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEpochUpdateAfterBumpFromEndTxnResponseInV2() throws InterruptedException {
|
||||
initializeTransactionManager(Optional.of(transactionalId), true);
|
||||
|
@ -3506,13 +3554,13 @@ public class TransactionManagerTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testCanBumpEpochDuringCoordinatorDisconnect() {
|
||||
public void testNeedToTriggerEpochBumpFromClientDuringCoordinatorDisconnect() {
|
||||
doInitTransactions(0, (short) 0);
|
||||
runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
|
||||
assertTrue(transactionManager.canBumpEpoch());
|
||||
assertTrue(transactionManager.needToTriggerEpochBumpFromClient());
|
||||
|
||||
apiVersions.remove(transactionManager.coordinator(CoordinatorType.TRANSACTION).idString());
|
||||
assertTrue(transactionManager.canBumpEpoch());
|
||||
assertTrue(transactionManager.needToTriggerEpochBumpFromClient());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
|
Loading…
Reference in New Issue