KAFKA-18202: Add rejection for non-zero sequences in TV2 (KIP-890) (#19902)

This change handles rejecting non-zero sequences when there is an empty
producerIDState with TV2.  The scenario will be covered with the
re-triable OutOfOrderSequence error.

For Transactions V2 with empty state:   Allow only sequence 0 is allowed for
new producers or after state cleanup (new validation added)   Don't allow any
non-zero sequence is rejected with our specific error message   Don't allow any epoch
bumps still require sequence 0 (existing validation remains)

For Transactions V1 with empty state:   Allow ANY sequence number is allowed
(0, 5, 100, etc.)   Don't allow epoch bumps still require sequence 0 (existing
validation)

Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits
 <alivshits@confluent.io>
This commit is contained in:
Ritika Reddy 2025-06-06 09:23:10 -07:00 committed by GitHub
parent 574516dcbf
commit 3479ce793b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 240 additions and 14 deletions

View File

@ -4008,12 +4008,13 @@ class UnifiedLogTest {
@ParameterizedTest
@EnumSource(value = classOf[AppendOrigin], names = Array("CLIENT", "COORDINATOR"))
def testTransactionIsOngoingAndVerificationGuard(appendOrigin: AppendOrigin): Unit = {
def testTransactionIsOngoingAndVerificationGuardTV2(appendOrigin: AppendOrigin): Unit = {
val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
val producerId = 23L
val producerEpoch = 1.toShort
var sequence = if (appendOrigin == AppendOrigin.CLIENT) 3 else 0
// For TV2, when there's no existing producer state, sequence must be 0 for both CLIENT and COORDINATOR
var sequence = 0
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
@ -4081,6 +4082,83 @@ class UnifiedLogTest {
assertFalse(verificationGuard.verify(newVerificationGuard))
}
@ParameterizedTest
@EnumSource(value = classOf[AppendOrigin], names = Array("CLIENT", "COORDINATOR"))
def testTransactionIsOngoingAndVerificationGuardTV1(appendOrigin: AppendOrigin): Unit = {
val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, false)
val producerId = 23L
val producerEpoch = 1.toShort
// For TV1, can start with non-zero sequences even with non-zero epoch when no existing producer state
var sequence = if (appendOrigin == AppendOrigin.CLIENT) 3 else 0
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
assertFalse(log.verificationGuard(producerId).verify(VerificationGuard.SENTINEL))
val idempotentRecords = MemoryRecords.withIdempotentRecords(
Compression.NONE,
producerId,
producerEpoch,
sequence,
new SimpleRecord("1".getBytes),
new SimpleRecord("2".getBytes)
)
// Only clients have nonzero sequences
if (appendOrigin == AppendOrigin.CLIENT)
sequence = sequence + 2
val transactionalRecords = MemoryRecords.withTransactionalRecords(
Compression.NONE,
producerId,
producerEpoch,
sequence,
new SimpleRecord("1".getBytes),
new SimpleRecord("2".getBytes)
)
// For TV1, create verification guard with supportsEpochBump=false
val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch, false)
assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
log.appendAsLeader(idempotentRecords, 0, appendOrigin)
assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
// Since we wrote idempotent records, we keep VerificationGuard.
assertEquals(verificationGuard, log.verificationGuard(producerId))
// Now write the transactional records
assertTrue(log.verificationGuard(producerId).verify(verificationGuard))
log.appendAsLeader(transactionalRecords, 0, appendOrigin, RequestLocal.noCaching(), verificationGuard)
assertTrue(log.hasOngoingTransaction(producerId, producerEpoch))
// VerificationGuard should be cleared now.
assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
// A subsequent maybeStartTransactionVerification will be empty since we are already verified.
assertEquals(VerificationGuard.SENTINEL, log.maybeStartTransactionVerification(producerId, sequence, producerEpoch, false))
val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
producerId,
producerEpoch,
new EndTransactionMarker(ControlRecordType.COMMIT, 0)
)
log.appendAsLeader(endTransactionMarkerRecord, 0, AppendOrigin.COORDINATOR)
assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
if (appendOrigin == AppendOrigin.CLIENT)
sequence = sequence + 1
// A new maybeStartTransactionVerification will not be empty, as we need to verify the next transaction.
val newVerificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch, false)
assertNotEquals(VerificationGuard.SENTINEL, newVerificationGuard)
assertNotEquals(verificationGuard, newVerificationGuard)
assertFalse(verificationGuard.verify(newVerificationGuard))
}
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testEmptyTransactionStillClearsVerificationGuard(supportsEpochBump: Boolean): Unit = {
@ -4165,7 +4243,7 @@ class UnifiedLogTest {
val producerId = 23L
val producerEpoch = 1.toShort
val sequence = 4
val sequence = 0
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
@ -4191,9 +4269,10 @@ class UnifiedLogTest {
assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
}
@Test
def testAllowNonZeroSequenceOnFirstAppendNonZeroEpoch(): Unit = {
val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testNonZeroSequenceOnFirstAppendNonZeroEpoch(transactionVerificationEnabled: Boolean): Unit = {
val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, transactionVerificationEnabled)
val producerId = 23L
val producerEpoch = 1.toShort
@ -4212,9 +4291,19 @@ class UnifiedLogTest {
new SimpleRecord("2".getBytes)
)
if (transactionVerificationEnabled) {
// TV2 behavior: Create verification state that supports epoch bumps
val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch, true)
// Append should not throw error.
// Should reject non-zero sequences when there's no existing producer state
assertThrows(classOf[OutOfOrderSequenceException], () =>
log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, verificationGuard))
} else {
// TV1 behavior: Create verification state with supportsEpochBump=false
val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch, false)
// Should allow non-zero sequences with non-zero epoch
log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, verificationGuard)
assertTrue(log.hasOngoingTransaction(producerId, producerEpoch))
}
}
@Test

View File

@ -2279,7 +2279,7 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0)
val producerId = 24L
val producerEpoch = 0.toShort
val sequence = 6
val sequence = 0
val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
val scheduler = new MockScheduler(time)
@ -2346,7 +2346,7 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0)
val producerId = 24L
val producerEpoch = 0.toShort
val sequence = 6
val sequence = 0
val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0))
@ -2355,7 +2355,7 @@ class ReplicaManagerTest {
makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
(_, _) => ())
// Start with sequence 6
// Start with sequence 0
val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
new SimpleRecord("message".getBytes))
@ -2379,7 +2379,7 @@ class ReplicaManagerTest {
assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, result.assertFired.error)
assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
// Try to append a higher sequence (7) after the first one failed with a retriable error.
// Try to append a higher sequence (1) after the first one failed with a retriable error.
val transactionalRecords2 = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence + 1,
new SimpleRecord("message".getBytes))
@ -2557,7 +2557,7 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0)
val producerId = 24L
val producerEpoch = 0.toShort
val sequence = 6
val sequence = 0
val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0))

View File

@ -110,6 +110,13 @@ public class ProducerAppendInfo {
}
private void checkSequence(short producerEpoch, int appendFirstSeq, long offset) {
// For transactions v2 idempotent producers, reject non-zero sequences when there is no producer ID state
if (verificationStateEntry != null && verificationStateEntry.supportsEpochBump() &&
appendFirstSeq != 0 && currentEntry.isEmpty()) {
throw new OutOfOrderSequenceException("Invalid sequence number for producer " + producerId + " at " +
"offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq +
" (incoming seq. number). Expected sequence 0 for transactions v2 idempotent producer with no existing state.");
}
if (verificationStateEntry != null && appendFirstSeq > verificationStateEntry.lowestSequence()) {
throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " +
"offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq +

View File

@ -1049,6 +1049,136 @@ public class ProducerStateManagerTest {
assertNull(stateManager.verificationStateEntry(producerId));
}
@Test
public void testRejectNonZeroSequenceForTransactionsV2WithEmptyState() {
// Create a verification state entry that supports epoch bump (transactions v2)
VerificationStateEntry verificationEntry = stateManager.maybeCreateVerificationStateEntry(
producerId,
0,
epoch,
true
);
// Verify this is actually transactions v2
assertTrue(
verificationEntry.supportsEpochBump(),
"Should be using transactions v2 (supports epoch bump)"
);
// Create ProducerAppendInfo with empty producer state
ProducerAppendInfo appendInfo = new ProducerAppendInfo(
partition,
producerId,
ProducerStateEntry.empty(producerId),
AppendOrigin.CLIENT,
verificationEntry
);
// Attempting to append with non-zero sequence number should fail for transactions v2
OutOfOrderSequenceException exception = assertThrows(
OutOfOrderSequenceException.class,
() -> appendInfo.appendDataBatch(
epoch,
5,
5,
time.milliseconds(),
new LogOffsetMetadata(0L), 0L, false
)
);
assertTrue(exception.getMessage().contains("Expected sequence 0 for " +
"transactions v2 idempotent producer"
));
assertTrue(exception.getMessage().contains("5 (incoming seq. number)"));
// Attempting to append with sequence 0 should succeed
assertDoesNotThrow(() -> appendInfo.appendDataBatch(
epoch,
0,
0,
time.milliseconds(),
new LogOffsetMetadata(0L), 0L, false)
);
}
@Test
public void testAllowNonZeroSequenceForTransactionsV1WithEmptyState() {
// Create a verification state entry that does NOT support epoch bump (transactions v1)
// Set lowest sequence to 5 to allow our test sequence to pass the verification check
VerificationStateEntry verificationEntry = stateManager.maybeCreateVerificationStateEntry(
producerId + 1,
5,
epoch,
false
);
// Verify this is transactions v1
assertFalse(
verificationEntry.supportsEpochBump(),
"Should be using transactions v1 (does not support epoch bump)"
);
// Create ProducerAppendInfo with empty producer state
ProducerAppendInfo appendInfo = new ProducerAppendInfo(
partition,
producerId + 1,
ProducerStateEntry.empty(producerId + 1),
AppendOrigin.CLIENT,
verificationEntry
);
// Attempting to append with non-zero sequence number should succeed for transactions v1
// (our validation should not trigger)
assertDoesNotThrow(() -> appendInfo.appendDataBatch(
epoch,
5,
5,
time.milliseconds(),
new LogOffsetMetadata(0L), 0L, false)
);
}
@Test
public void testRejectNonZeroSequenceForDirectEpochBump() {
// Setup: Establish producer with epoch 0 and some sequence history
appendClientEntry(stateManager, producerId, epoch, 0, 0L, false);
appendClientEntry(stateManager, producerId, epoch, 1, 1L, false);
appendClientEntry(stateManager, producerId, epoch, 2, 2L, false);
// Verify initial state
ProducerStateEntry initialEntry = getLastEntryOrElseThrownByProducerId(stateManager, producerId);
assertEquals(0, initialEntry.producerEpoch());
assertEquals(2, initialEntry.lastSeq());
assertFalse(initialEntry.isEmpty()); // Has batch metadata
ProducerAppendInfo appendInfo = stateManager.prepareUpdate(producerId, AppendOrigin.CLIENT);
// Test Case 1: Epoch bump (0 -> 1) with non-zero sequence should be rejected
OutOfOrderSequenceException exception = assertThrows(OutOfOrderSequenceException.class,
() -> appendInfo.appendDataBatch(
(short) 1,
5,
5,
time.milliseconds(),
new LogOffsetMetadata(3L), 3L, false)
);
assertTrue(exception.getMessage().contains("Invalid sequence number for new epoch"));
assertTrue(exception.getMessage().contains("1 (request epoch)"));
assertTrue(exception.getMessage().contains("5 (seq. number)"));
assertTrue(exception.getMessage().contains("0 (current producer epoch)"));
// Test Case 2: Epoch bump (0 -> 1) with sequence 0 should succeed
ProducerAppendInfo appendInfo2 = stateManager.prepareUpdate(producerId, AppendOrigin.CLIENT);
assertDoesNotThrow(() -> appendInfo2.appendDataBatch(
(short) 1,
0,
0,
time.milliseconds(),
new LogOffsetMetadata(3L), 3L, false)
);
}
@Test
public void testLastStableOffsetCompletedTxn() {
long segmentBaseOffset = 990000L;