KAFKA-18042: Reject the produce request with lower producer epoch early (KIP-890) (#19844)
CI / build (push) Waiting to run Details

With transaction V2, the replica manager checks whether an incoming
produce request writes to a partition belonging to a transaction.
ReplicaManager figures this out by checking the producer epoch stored in
the partition log. However, the current code does not reject the produce
request if its producer epoch is lower than the stored producer epoch.
As an optimization, reject such requests early instead of sending
an AddPartitionsToTxn request and getting rejected in the response.

Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits
 <alivshits@confluent.io>
This commit is contained in:
Ritika Reddy 2025-06-04 13:21:53 -07:00 committed by GitHub
parent 25bc5f2cfa
commit cc25d217da
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 85 additions and 0 deletions

View File

@ -2405,6 +2405,82 @@ class ReplicaManagerTest {
} }
} }
/**
 * A transactional produce request whose producer epoch is lower than the epoch of the
 * transaction already verified for that producer on the partition must fail with
 * INVALID_PRODUCER_EPOCH, and must not trigger another call to AddPartitionsToTxnManager.
 */
@Test
def testTransactionVerificationRejectsLowerProducerEpoch(): Unit = {
  val partition = new TopicPartition(topic, 0)
  val pid = 24L
  val currentEpoch = 5.toShort
  val staleEpoch = 4.toShort
  val firstSequence = 6
  val txnManager = mock(classOf[AddPartitionsToTxnManager])
  val replicaManager =
    setUpReplicaManagerWithMockedAddPartitionsToTxnManager(txnManager, List(partition))

  // One-record transactional batch for this producer at the given epoch and sequence.
  def txnBatch(epoch: Short, seq: Int) =
    MemoryRecords.withTransactionalRecords(
      Compression.NONE,
      pid,
      epoch,
      seq,
      new SimpleRecord("message".getBytes)
    )

  try {
    replicaManager.becomeLeaderOrFollower(
      1,
      makeLeaderAndIsrRequest(
        topicIds(partition.topic),
        partition,
        Seq(0, 1),
        new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)
      ),
      (_, _) => ()
    )

    // Step 1: produce with the current epoch (5); this must go through the txn manager once.
    handleProduceAppend(
      replicaManager,
      partition,
      txnBatch(currentEpoch, firstSequence),
      transactionalId = transactionalId
    )
    val callbackCaptor = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
    verify(txnManager, times(1)).addOrVerifyTransaction(
      ArgumentMatchers.eq(transactionalId),
      ArgumentMatchers.eq(pid),
      ArgumentMatchers.eq(currentEpoch),
      ArgumentMatchers.eq(Seq(partition)),
      callbackCaptor.capture(),
      any()
    )

    // The guard handed out for the pending verification is the same on repeated lookups.
    val pendingGuard = getVerificationGuard(replicaManager, partition, pid)
    assertEquals(pendingGuard, getVerificationGuard(replicaManager, partition, pid))

    // Step 2: complete the verification with no per-partition errors, i.e. success.
    val onVerified: AddPartitionsToTxnManager.AppendCallback = callbackCaptor.getValue
    onVerified(Map.empty[TopicPartition, Errors].toMap)
    assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, partition, pid))
    assertTrue(replicaManager.localLog(partition).get.hasOngoingTransaction(pid, currentEpoch))

    // Step 3: produce again with the stale epoch (4) and the next sequence number.
    val staleResult = handleProduceAppend(
      replicaManager,
      partition,
      txnBatch(staleEpoch, firstSequence + 1),
      transactionalId = transactionalId
    )

    // The stale request must be rejected before any further txn manager interaction ...
    verifyNoMoreInteractions(txnManager)
    // ... and the produce response must carry the fencing error.
    assertEquals(Errors.INVALID_PRODUCER_EPOCH, staleResult.assertFired.error)
  } finally {
    replicaManager.shutdown(checkpointHW = false)
  }
}
@Test @Test
def testTransactionVerificationGuardOnMultiplePartitions(): Unit = { def testTransactionVerificationGuardOnMultiplePartitions(): Unit = {
val mockTimer = new MockTimer(time) val mockTimer = new MockTimer(time)

View File

@ -23,6 +23,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.InconsistentTopicIdException; import org.apache.kafka.common.errors.InconsistentTopicIdException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.InvalidTxnStateException; import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.KafkaStorageException; import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException; import org.apache.kafka.common.errors.OffsetOutOfRangeException;
@ -864,6 +865,14 @@ public class UnifiedLog implements AutoCloseable {
*/ */
public VerificationGuard maybeStartTransactionVerification(long producerId, int sequence, short epoch, boolean supportsEpochBump) { public VerificationGuard maybeStartTransactionVerification(long producerId, int sequence, short epoch, boolean supportsEpochBump) {
synchronized (lock) { synchronized (lock) {
// Check if the producer epoch is lower than the stored one, and reject early if it is
ProducerStateEntry entry = producerStateManager.activeProducers().get(producerId);
if (entry != null && epoch < entry.producerEpoch()) {
String message = "Epoch of producer " + producerId + " is " + epoch + ", " +
"which is smaller than the last seen epoch " + entry.producerEpoch();
throw new InvalidProducerEpochException(message);
}
if (hasOngoingTransaction(producerId, epoch)) { if (hasOngoingTransaction(producerId, epoch)) {
return VerificationGuard.SENTINEL; return VerificationGuard.SENTINEL;
} else { } else {