From a10c1f3ea1454dbd644ea65a885c965529e1d37d Mon Sep 17 00:00:00 2001 From: Ritika Reddy <98577846+rreddy-22@users.noreply.github.com> Date: Thu, 2 Oct 2025 06:18:34 -0700 Subject: [PATCH] KAFKA-19690 Add epoch check before verification guard check to prevent unexpected fatal error (#20618) Cherry-pick changes (#20534) to 4.1 Reviewers: Chia-Ping Tsai --- .../scala/unit/kafka/log/UnifiedLogTest.scala | 90 +++++++++++++++++++ .../storage/internals/log/UnifiedLog.java | 17 +++- 2 files changed, 103 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index d30d5a1040e..6d312f2c53e 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -4786,6 +4786,96 @@ class UnifiedLogTest { (log, segmentWithOverflow) } + + @Test + def testStaleProducerEpochReturnsRecoverableErrorForTV1Clients(): Unit = { + // Producer epoch gets incremented (coordinator fail over, completed transaction, etc.) + // and client has stale cached epoch. Fix prevents fatal InvalidTxnStateException. + + val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true) + val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5) + val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig) + + val producerId = 123L + val oldEpoch = 5.toShort + val newEpoch = 6.toShort + + // Step 1: Simulate a scenario where producer epoch was incremented to fence the producer + val previousRecords = MemoryRecords.withTransactionalRecords( + Compression.NONE, producerId, newEpoch, 0, + new SimpleRecord("previous-key".getBytes, "previous-value".getBytes) + ) + val previousGuard = log.maybeStartTransactionVerification(producerId, 0, newEpoch, false) // TV1 = supportsEpochBump = false + log.appendAsLeader(previousRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, previousGuard) + + // Complete the transaction normally (commits do update producer state with current epoch) + val commitMarker = MemoryRecords.withEndTransactionMarker( + producerId, newEpoch, new EndTransactionMarker(ControlRecordType.COMMIT, 0) + ) + log.appendAsLeader(commitMarker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching, VerificationGuard.SENTINEL) + + // Step 2: TV1 client tries to write with stale cached epoch (before learning about epoch increment) + val staleEpochRecords = MemoryRecords.withTransactionalRecords( + Compression.NONE, producerId, oldEpoch, 0, + new SimpleRecord("stale-epoch-key".getBytes, "stale-epoch-value".getBytes) + ) + + // Step 3: Verify our fix - should get InvalidProducerEpochException (recoverable), not InvalidTxnStateException (fatal) + val exception = assertThrows(classOf[InvalidProducerEpochException], () => { + val staleGuard = log.maybeStartTransactionVerification(producerId, 0, oldEpoch, false) + log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, staleGuard) + }) + + // Verify the error message indicates epoch mismatch + assertTrue(exception.getMessage.contains("smaller than the last seen epoch")) + assertTrue(exception.getMessage.contains(s"$oldEpoch")) + assertTrue(exception.getMessage.contains(s"$newEpoch")) + } + + @Test + def testStaleProducerEpochReturnsRecoverableErrorForTV2Clients(): Unit = { + // Check producer epoch FIRST - if stale, return recoverable error before verification checks. + + val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true) + val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5) + val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig) + + val producerId = 456L + val originalEpoch = 3.toShort + val bumpedEpoch = 4.toShort + + // Step 1: Start transaction with epoch 3 (before timeout) + val initialRecords = MemoryRecords.withTransactionalRecords( + Compression.NONE, producerId, originalEpoch, 0, + new SimpleRecord("ks-initial-key".getBytes, "ks-initial-value".getBytes) + ) + val initialGuard = log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true) // TV2 = supportsEpochBump = true + log.appendAsLeader(initialRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, initialGuard) + + // Step 2: Coordinator times out and aborts transaction + // TV2 (KIP-890): Coordinator bumps epoch from 3 → 4 and sends abort marker with epoch 4 + val abortMarker = MemoryRecords.withEndTransactionMarker( + producerId, bumpedEpoch, new EndTransactionMarker(ControlRecordType.ABORT, 0) + ) + log.appendAsLeader(abortMarker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching, VerificationGuard.SENTINEL) + + // Step 3: TV2 transactional producer tries to append with stale epoch (timeout recovery scenario) + val staleEpochRecords = MemoryRecords.withTransactionalRecords( + Compression.NONE, producerId, originalEpoch, 0, + new SimpleRecord("ks-resume-key".getBytes, "ks-resume-value".getBytes) + ) + + // Step 4: Verify our fix works for TV2 - should get InvalidProducerEpochException (recoverable), not InvalidTxnStateException (fatal) + val exception = assertThrows(classOf[InvalidProducerEpochException], () => { + val staleGuard = log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true) // TV2 = supportsEpochBump = true + log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, staleGuard) + }) + + // Verify the error message indicates epoch mismatch (3 < 4) + assertTrue(exception.getMessage.contains("smaller than the last seen epoch")) + assertTrue(exception.getMessage.contains(s"$originalEpoch")) + assertTrue(exception.getMessage.contains(s"$bumpedEpoch")) + } } object UnifiedLogTest { diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java index 2ca67ad47bc..b5b16e42232 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java @@ -1385,10 +1385,19 @@ public class UnifiedLog implements AutoCloseable { // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still // ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and // requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1. - if (batch.isTransactional() - && !hasOngoingTransaction(batch.producerId(), batch.producerEpoch()) - && batchMissingRequiredVerification(batch, requestVerificationGuard)) { - throw new InvalidTxnStateException("Record was not part of an ongoing transaction"); + if (batch.isTransactional() && !hasOngoingTransaction(batch.producerId(), batch.producerEpoch())) { + // Check epoch first: if producer epoch is stale, throw recoverable InvalidProducerEpochException. + ProducerStateEntry entry = producerStateManager.activeProducers().get(batch.producerId()); + if (entry != null && batch.producerEpoch() < entry.producerEpoch()) { + String message = "Epoch of producer " + batch.producerId() + " is " + batch.producerEpoch() + + ", which is smaller than the last seen epoch " + entry.producerEpoch(); + throw new InvalidProducerEpochException(message); + } + + // Only check verification if epoch is current + if (batchMissingRequiredVerification(batch, requestVerificationGuard)) { + throw new InvalidTxnStateException("Record was not part of an ongoing transaction"); + } } }