KAFKA-19690 Add epoch check before verification guard check to prevent unexpected fatal error (#20607)

Cherry-pick changes (https://github.com/apache/kafka/pull/20534) to 4.0

Conflicts:
-> storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
- left this file unchanged; in 4.0 the corresponding code lives in
UnifiedLog.scala, so the change was applied there instead
-> core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala - added only
the required tests and kept everything else the same

Reviewers: Justine Olshan
[jolshan@confluent.io](mailto:jolshan@confluent.io), Chia-Ping Tsai
[chia7712@gmail.com](mailto:chia7712@gmail.com)
This commit is contained in:
Ritika Reddy 2025-10-06 07:20:57 -07:00 committed by GitHub
parent f181048e42
commit 4b0ba42483
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 102 additions and 2 deletions

View File

@ -1057,9 +1057,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
// ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
// requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
if (batch.isTransactional && !hasOngoingTransaction(batch.producerId, batch.producerEpoch()) && batchMissingRequiredVerification(batch, requestVerificationGuard))
if (batch.isTransactional && !hasOngoingTransaction(batch.producerId, batch.producerEpoch)) {
// Check epoch first: if producer epoch is stale, throw recoverable InvalidProducerEpochException.
val entry = producerStateManager.activeProducers.get(batch.producerId)
if (entry != null && batch.producerEpoch < entry.producerEpoch) {
val message = "Epoch of producer " + batch.producerId + " is " + batch.producerEpoch + ", which is smaller than the last seen epoch " + entry.producerEpoch
throw new InvalidProducerEpochException(message)
}
// Only check verification if epoch is current
if (batchMissingRequiredVerification(batch, requestVerificationGuard))
throw new InvalidTxnStateException("Record was not part of an ongoing transaction")
}
}
// We cache offset metadata for the start of each transaction. This allows us to
// compute the last stable offset without relying on additional index lookups.

View File

@ -57,6 +57,7 @@ import org.mockito.Mockito.{doAnswer, doThrow, spy}
import net.jqwik.api.AfterFailureMode
import net.jqwik.api.ForAll
import net.jqwik.api.Property
import org.apache.kafka.server.common.RequestLocal
import java.io._
import java.nio.ByteBuffer
@ -4660,6 +4661,96 @@ class UnifiedLogTest {
assertEquals(new OffsetResultHolder(Optional.empty(), Optional.empty()), result)
}
@Test
def testStaleProducerEpochReturnsRecoverableErrorForTV1Clients(): Unit = {
  // Scenario: the log has already seen a newer epoch for this producer (coordinator fail over,
  // completed transaction, etc.) while the client still produces with its stale cached epoch.
  // The append must be rejected with InvalidProducerEpochException rather than with
  // InvalidTxnStateException.
  val pid = 123L
  val staleEpoch: Short = 5
  val currentEpoch: Short = 6

  val log = createLog(
    logDir,
    LogTestUtils.createLogConfig(segmentBytes = 2048 * 5),
    producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
  )

  // Builds a single-record transactional batch for `pid` at the given epoch, base sequence 0.
  def transactionalBatch(epoch: Short, key: String, value: String): MemoryRecords =
    MemoryRecords.withTransactionalRecords(
      Compression.NONE, pid, epoch, 0,
      new SimpleRecord(key.getBytes, value.getBytes)
    )

  // Phase 1: write and commit a transaction at the newer epoch, so that the newer epoch is the
  // last one the log has seen for this producer.
  // TV1 = supportsEpochBump = false
  val currentGuard = log.maybeStartTransactionVerification(pid, 0, currentEpoch, false)
  log.appendAsLeader(
    transactionalBatch(currentEpoch, "previous-key", "previous-value"),
    0, AppendOrigin.CLIENT, RequestLocal.noCaching, currentGuard
  )
  val commit = MemoryRecords.withEndTransactionMarker(
    pid, currentEpoch, new EndTransactionMarker(ControlRecordType.COMMIT, 0)
  )
  log.appendAsLeader(commit, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching, VerificationGuard.SENTINEL)

  // Phase 2: the TV1 client, unaware of the newer epoch, produces with the old one.
  val staleBatch = transactionalBatch(staleEpoch, "stale-epoch-key", "stale-epoch-value")

  // Phase 3: both the verification request and the append run with the stale epoch; the
  // recoverable epoch error is expected to surface.
  val thrown = assertThrows(classOf[InvalidProducerEpochException], () => {
    val guardForStaleEpoch = log.maybeStartTransactionVerification(pid, 0, staleEpoch, false)
    log.appendAsLeader(staleBatch, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, guardForStaleEpoch)
  })

  // The message has to describe the epoch mismatch and mention both epochs involved.
  Seq("smaller than the last seen epoch", staleEpoch.toString, currentEpoch.toString).foreach { fragment =>
    assertTrue(thrown.getMessage.contains(fragment))
  }
}
@Test
def testStaleProducerEpochReturnsRecoverableErrorForTV2Clients(): Unit = {
  // The producer epoch is checked FIRST: when it is stale, the append must fail with the
  // recoverable InvalidProducerEpochException before any verification guard check can raise
  // InvalidTxnStateException.
  val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
  val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
  val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
  val producerId = 456L
  val originalEpoch = 3.toShort
  val bumpedEpoch = 4.toShort

  // Step 1: Start a transaction with epoch 3 (before the timeout).
  val initialRecords = MemoryRecords.withTransactionalRecords(
    Compression.NONE, producerId, originalEpoch, 0,
    new SimpleRecord("ks-initial-key".getBytes, "ks-initial-value".getBytes)
  )
  val initialGuard = log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true) // TV2 = supportsEpochBump = true
  log.appendAsLeader(initialRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, initialGuard)

  // Step 2: The coordinator times out and aborts the transaction.
  // TV2 (KIP-890): the coordinator bumps the epoch from 3 to 4 and sends the abort marker with epoch 4.
  val abortMarker = MemoryRecords.withEndTransactionMarker(
    producerId, bumpedEpoch, new EndTransactionMarker(ControlRecordType.ABORT, 0)
  )
  log.appendAsLeader(abortMarker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching, VerificationGuard.SENTINEL)

  // Step 3: The TV2 transactional producer tries to append with the stale epoch (timeout recovery scenario).
  val staleEpochRecords = MemoryRecords.withTransactionalRecords(
    Compression.NONE, producerId, originalEpoch, 0,
    new SimpleRecord("ks-resume-key".getBytes, "ks-resume-value".getBytes)
  )

  // Step 4: Expect InvalidProducerEpochException (recoverable), not InvalidTxnStateException (fatal).
  val exception = assertThrows(classOf[InvalidProducerEpochException], () => {
    val staleGuard = log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true) // TV2 = supportsEpochBump = true
    log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, staleGuard)
  })

  // Verify the message reports the epoch mismatch (3 < 4). The check is anchored to the wording
  // around both epochs: a bare contains("4") would be satisfied by the producer id (456) alone and
  // therefore would not prove that the bumped epoch is reported.
  val expectedFragment = s"is $originalEpoch, which is smaller than the last seen epoch $bumpedEpoch"
  assertTrue(exception.getMessage.contains(expectedFragment),
    s"Expected message to contain '$expectedFragment' but was: ${exception.getMessage}")
}
private def appendTransactionalToBuffer(buffer: ByteBuffer,
producerId: Long,
producerEpoch: Short,