From cc25d217da55007f36920644ff2e734a08612017 Mon Sep 17 00:00:00 2001 From: Ritika Reddy <98577846+rreddy-22@users.noreply.github.com> Date: Wed, 4 Jun 2025 13:21:53 -0700 Subject: [PATCH] KAFKA-18042: Reject the produce request with lower producer epoch early (KIP-890) (#19844) With the transaction V2, replica manager checks whether the incoming producer request produces to a partition belonging to a transaction. ReplicaManager figures this out by checking the producer epoch stored in the partition log. However, the current code does not reject the produce request if its producer epoch is lower than the stored producer epoch. It is an optimization to reject such requests earlier instead of sending an AddPartitionToTxn request and getting rejected in the response. Reviewers: Justine Olshan , Artem Livshits --- .../kafka/server/ReplicaManagerTest.scala | 76 +++++++++++++++++++ .../storage/internals/log/UnifiedLog.java | 9 +++ 2 files changed, 85 insertions(+) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 11a8d466bf3..63b131c4659 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2405,6 +2405,82 @@ class ReplicaManagerTest { } } + @Test + def testTransactionVerificationRejectsLowerProducerEpoch(): Unit = { + val tp0 = new TopicPartition(topic, 0) + val producerId = 24L + val producerEpoch = 5.toShort + val lowerProducerEpoch= 4.toShort + val sequence = 6 + val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) + + val replicaManager = + setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0)) + + try { + replicaManager.becomeLeaderOrFollower( + 1, + makeLeaderAndIsrRequest( + topicIds(tp0.topic), + tp0, + Seq(0, 1), + new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava) + ), + (_, _) => () + ) + + // first append with epoch 5 + val transactionalRecords = MemoryRecords.withTransactionalRecords( + Compression.NONE, + producerId, + producerEpoch, + sequence, + new SimpleRecord("message".getBytes) + ) + + handleProduceAppend(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId) + + val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback]) + verify(addPartitionsToTxnManager, times(1)).addOrVerifyTransaction( + ArgumentMatchers.eq(transactionalId), + ArgumentMatchers.eq(producerId), + ArgumentMatchers.eq(producerEpoch), + ArgumentMatchers.eq(Seq(tp0)), + appendCallback.capture(), + any() + ) + + val verificationGuard = getVerificationGuard(replicaManager, tp0, producerId) + assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId)) + + // simulate successful verification + val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue + callback(Map.empty[TopicPartition, Errors].toMap) + + assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId)) + assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId, producerEpoch)) + + // append lower epoch 4 + val transactionalRecords2 = MemoryRecords.withTransactionalRecords( + Compression.NONE, + producerId, + lowerProducerEpoch, + sequence + 1, + new SimpleRecord("message".getBytes) + ) + + val result2 = handleProduceAppend(replicaManager, tp0, transactionalRecords2, transactionalId = transactionalId) + + // no extra call to the txn‑manager should have been made + verifyNoMoreInteractions(addPartitionsToTxnManager) + + // broker returns the fencing error + assertEquals(Errors.INVALID_PRODUCER_EPOCH, result2.assertFired.error) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + @Test def testTransactionVerificationGuardOnMultiplePartitions(): Unit = { val mockTimer = new MockTimer(time) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java index fc06199de8f..2ca67ad47bc 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.errors.InconsistentTopicIdException; +import org.apache.kafka.common.errors.InvalidProducerEpochException; import org.apache.kafka.common.errors.InvalidTxnStateException; import org.apache.kafka.common.errors.KafkaStorageException; import org.apache.kafka.common.errors.OffsetOutOfRangeException; @@ -864,6 +865,14 @@ public class UnifiedLog implements AutoCloseable { */ public VerificationGuard maybeStartTransactionVerification(long producerId, int sequence, short epoch, boolean supportsEpochBump) { synchronized (lock) { + // Check if the producer epoch is lower than the stored one, and reject early if it is + ProducerStateEntry entry = producerStateManager.activeProducers().get(producerId); + if (entry != null && epoch < entry.producerEpoch()) { + String message = "Epoch of producer " + producerId + " is " + epoch + ", " + + "which is smaller than the last seen epoch " + entry.producerEpoch(); + throw new InvalidProducerEpochException(message); + } + if (hasOngoingTransaction(producerId, epoch)) { return VerificationGuard.SENTINEL; } else {