mirror of https://github.com/apache/kafka.git
KAFKA-19690 Add epoch check before verification guard check to prevent unexpected fatal error (#20618)
CI / build (push) Has been cancelled
Details
CI / build (push) Has been cancelled
Details
Cherry-pick changes (#20534) to 4.1 Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
ea3d231185
commit
a10c1f3ea1
|
@ -4786,6 +4786,96 @@ class UnifiedLogTest {
|
||||||
|
|
||||||
(log, segmentWithOverflow)
|
(log, segmentWithOverflow)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
def testStaleProducerEpochReturnsRecoverableErrorForTV1Clients(): Unit = {
  // Scenario: the log has already seen a newer epoch for this producer, and a client
  // that still caches the older epoch tries to append. The append must be rejected
  // with InvalidProducerEpochException rather than InvalidTxnStateException.

  val pid = 123L
  val staleEpoch: Short = 5
  val currentEpoch: Short = 6

  val stateManagerConfig = new ProducerStateManagerConfig(86400000, true)
  val log = createLog(
    logDir,
    LogTestUtils.createLogConfig(segmentBytes = 5 * 2048),
    producerStateManagerConfig = stateManagerConfig
  )

  // Write one transactional batch at the current epoch so the log records it.
  // The last argument (supportsEpochBump) is false for this test.
  val currentEpochBatch = MemoryRecords.withTransactionalRecords(
    Compression.NONE, pid, currentEpoch, 0,
    new SimpleRecord("previous-key".getBytes, "previous-value".getBytes)
  )
  val currentEpochGuard = log.maybeStartTransactionVerification(pid, 0, currentEpoch, false)
  log.appendAsLeader(currentEpochBatch, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, currentEpochGuard)

  // Close that transaction with a COMMIT marker written on behalf of the coordinator.
  val commit = MemoryRecords.withEndTransactionMarker(
    pid, currentEpoch, new EndTransactionMarker(ControlRecordType.COMMIT, 0)
  )
  log.appendAsLeader(commit, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching, VerificationGuard.SENTINEL)

  // Batch stamped with the epoch the client still has cached.
  val staleBatch = MemoryRecords.withTransactionalRecords(
    Compression.NONE, pid, staleEpoch, 0,
    new SimpleRecord("stale-epoch-key".getBytes, "stale-epoch-value".getBytes)
  )

  // Starting verification and appending with the stale epoch must raise the epoch error.
  val thrown = assertThrows(classOf[InvalidProducerEpochException], () => {
    val guardForStaleEpoch = log.maybeStartTransactionVerification(pid, 0, staleEpoch, false)
    log.appendAsLeader(staleBatch, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, guardForStaleEpoch)
  })

  // The message must describe the epoch mismatch and mention both epochs.
  val message = thrown.getMessage
  Seq("smaller than the last seen epoch", s"$staleEpoch", s"$currentEpoch").foreach { fragment =>
    assertTrue(message.contains(fragment))
  }
}
|
||||||
|
|
||||||
|
@Test
def testStaleProducerEpochReturnsRecoverableErrorForTV2Clients(): Unit = {
  // The producer epoch is checked FIRST: a stale epoch must yield the recoverable
  // InvalidProducerEpochException before any verification-guard check can raise
  // InvalidTxnStateException.

  val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
  val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
  val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)

  val producerId = 456L
  val originalEpoch = 3.toShort
  val bumpedEpoch = 4.toShort

  // Step 1: Start a transaction with epoch 3 (before timeout).
  val initialRecords = MemoryRecords.withTransactionalRecords(
    Compression.NONE, producerId, originalEpoch, 0,
    new SimpleRecord("ks-initial-key".getBytes, "ks-initial-value".getBytes)
  )
  val initialGuard = log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true) // TV2 = supportsEpochBump = true
  log.appendAsLeader(initialRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, initialGuard)

  // Step 2: Coordinator times out and aborts the transaction.
  // TV2 (KIP-890): the abort marker carries the bumped epoch (3 -> 4).
  val abortMarker = MemoryRecords.withEndTransactionMarker(
    producerId, bumpedEpoch, new EndTransactionMarker(ControlRecordType.ABORT, 0)
  )
  log.appendAsLeader(abortMarker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching, VerificationGuard.SENTINEL)

  // Step 3: The producer tries to append again with the stale epoch 3.
  val staleEpochRecords = MemoryRecords.withTransactionalRecords(
    Compression.NONE, producerId, originalEpoch, 0,
    new SimpleRecord("ks-resume-key".getBytes, "ks-resume-value".getBytes)
  )

  // Step 4: Expect InvalidProducerEpochException (recoverable), not InvalidTxnStateException (fatal).
  val exception = assertThrows(classOf[InvalidProducerEpochException], () => {
    val staleGuard = log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true) // TV2 = supportsEpochBump = true
    log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, staleGuard)
  })

  // Verify the error message reports the epoch mismatch (3 < 4).
  // The message also prints the producer id (456), which contains the digit "4", so a bare
  // contains(s"$bumpedEpoch") would pass even if the bumped epoch were missing. Pin the
  // contiguous "<stale epoch>, which is smaller than the last seen epoch <current epoch>"
  // fragment so both epochs are checked in their actual positions.
  val message = exception.getMessage
  val expectedFragment = s"is $originalEpoch, which is smaller than the last seen epoch $bumpedEpoch"
  assertTrue(message.contains(expectedFragment), s"Unexpected error message: $message")
}
|
||||||
}
|
}
|
||||||
|
|
||||||
object UnifiedLogTest {
|
object UnifiedLogTest {
|
||||||
|
|
|
@ -1385,12 +1385,21 @@ public class UnifiedLog implements AutoCloseable {
|
||||||
// transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
|
// transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
|
||||||
// ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
|
// ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
|
||||||
// requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
|
// requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
|
||||||
if (batch.isTransactional()
|
if (batch.isTransactional() && !hasOngoingTransaction(batch.producerId(), batch.producerEpoch())) {
|
||||||
&& !hasOngoingTransaction(batch.producerId(), batch.producerEpoch())
|
// Check epoch first: if producer epoch is stale, throw recoverable InvalidProducerEpochException.
|
||||||
&& batchMissingRequiredVerification(batch, requestVerificationGuard)) {
|
ProducerStateEntry entry = producerStateManager.activeProducers().get(batch.producerId());
|
||||||
|
if (entry != null && batch.producerEpoch() < entry.producerEpoch()) {
|
||||||
|
String message = "Epoch of producer " + batch.producerId() + " is " + batch.producerEpoch() +
|
||||||
|
", which is smaller than the last seen epoch " + entry.producerEpoch();
|
||||||
|
throw new InvalidProducerEpochException(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only check verification if epoch is current
|
||||||
|
if (batchMissingRequiredVerification(batch, requestVerificationGuard)) {
|
||||||
throw new InvalidTxnStateException("Record was not part of an ongoing transaction");
|
throw new InvalidTxnStateException("Record was not part of an ongoing transaction");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// We cache offset metadata for the start of each transaction. This allows us to
|
// We cache offset metadata for the start of each transaction. This allows us to
|
||||||
// compute the last stable offset without relying on additional index lookups.
|
// compute the last stable offset without relying on additional index lookups.
|
||||||
|
|
Loading…
Reference in New Issue