mirror of https://github.com/apache/kafka.git
KAFKA-14920: Address timeouts and out of order sequences (#14033)
When creating a verification state entry, we also store sequence and epoch. On subsequent requests, we will take the latest epoch seen and the earliest sequence seen. That way, if we try to append a sequence after the earliest seen sequence, we can block that and retry. This addresses potential OutOfOrderSequence loops caused by errors during verification (coordinator loading, timeouts, etc). Reviewers: David Jacot <david.jacot@gmail.com>, Artem Livshits <alivshits@confluent.io>
This commit is contained in:
parent
84691b11f6
commit
38781f9aea
|
@ -577,9 +577,9 @@ class Partition(val topicPartition: TopicPartition,
|
|||
}
|
||||
|
||||
// Returns a verification guard object if we need to verify. This starts or continues the verification process. Otherwise return null.
|
||||
def maybeStartTransactionVerification(producerId: Long): Object = {
|
||||
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): Object = {
|
||||
leaderLogIfLocal match {
|
||||
case Some(log) => log.maybeStartTransactionVerification(producerId)
|
||||
case Some(log) => log.maybeStartTransactionVerification(producerId, sequence, epoch)
|
||||
case None => throw new NotLeaderOrFollowerException();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -581,18 +581,27 @@ class UnifiedLog(@volatile var logStartOffset: Long,
|
|||
* Maybe create and return the verification guard object for the given producer ID if the transaction is not yet ongoing.
|
||||
* Creation starts the verification process. Otherwise return null.
|
||||
*/
|
||||
def maybeStartTransactionVerification(producerId: Long): Object = lock synchronized {
|
||||
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): Object = lock synchronized {
|
||||
if (hasOngoingTransaction(producerId))
|
||||
null
|
||||
else
|
||||
getOrMaybeCreateVerificationGuard(producerId, true)
|
||||
maybeCreateVerificationGuard(producerId, sequence, epoch)
|
||||
}
|
||||
|
||||
/**
|
||||
* Maybe create the VerificationStateEntry for the given producer ID -- if an entry is present, return its verification guard, otherwise, return null.
|
||||
* Maybe create the VerificationStateEntry for the given producer ID -- always return the verification guard
|
||||
*/
|
||||
def getOrMaybeCreateVerificationGuard(producerId: Long, createIfAbsent: Boolean = false): Object = lock synchronized {
|
||||
val entry = producerStateManager.verificationStateEntry(producerId, createIfAbsent)
|
||||
def maybeCreateVerificationGuard(producerId: Long,
|
||||
sequence: Int,
|
||||
epoch: Short): Object = lock synchronized {
|
||||
producerStateManager.maybeCreateVerificationStateEntry(producerId, sequence, epoch).verificationGuard
|
||||
}
|
||||
|
||||
/**
|
||||
* If an VerificationStateEntry is present for the given producer ID, return its verification guard, otherwise, return null.
|
||||
*/
|
||||
def verificationGuard(producerId: Long): Object = lock synchronized {
|
||||
val entry = producerStateManager.verificationStateEntry(producerId)
|
||||
if (entry != null) entry.verificationGuard else null
|
||||
}
|
||||
|
||||
|
@ -1042,7 +1051,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
|
|||
}
|
||||
|
||||
private def batchMissingRequiredVerification(batch: MutableRecordBatch, requestVerificationGuard: Object): Boolean = {
|
||||
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() && (requestVerificationGuard != getOrMaybeCreateVerificationGuard(batch.producerId) || requestVerificationGuard == null)
|
||||
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() &&
|
||||
(requestVerificationGuard != verificationGuard(batch.producerId) || requestVerificationGuard == null)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -862,7 +862,8 @@ class ReplicaManager(val config: KafkaConfig,
|
|||
|
||||
if (transactionalBatches.nonEmpty) {
|
||||
// We return verification guard if the partition needs to be verified. If no state is present, no need to verify.
|
||||
val verificationGuard = getPartitionOrException(topicPartition).maybeStartTransactionVerification(records.firstBatch.producerId)
|
||||
val firstBatch = records.firstBatch
|
||||
val verificationGuard = getPartitionOrException(topicPartition).maybeStartTransactionVerification(firstBatch.producerId, firstBatch.baseSequence, firstBatch.producerEpoch)
|
||||
if (verificationGuard != null) {
|
||||
verificationGuards.put(topicPartition, verificationGuard)
|
||||
unverifiedEntries.put(topicPartition, records)
|
||||
|
|
|
@ -998,7 +998,7 @@ class PartitionTest extends AbstractPartitionTest {
|
|||
new SimpleRecord("k3".getBytes, "v3".getBytes)),
|
||||
baseOffset = 0L,
|
||||
producerId = 2L)
|
||||
val verificationGuard = partition.maybeStartTransactionVerification(2L)
|
||||
val verificationGuard = partition.maybeStartTransactionVerification(2L, 0, 0)
|
||||
partition.appendRecordsToLeader(records, origin = AppendOrigin.CLIENT, requiredAcks = 0, RequestLocal.withThreadConfinedCaching, verificationGuard)
|
||||
|
||||
def fetchOffset(isolationLevel: Option[IsolationLevel], timestamp: Long): TimestampAndOffset = {
|
||||
|
@ -3390,7 +3390,7 @@ class PartitionTest extends AbstractPartitionTest {
|
|||
assertThrows(classOf[InvalidRecordException], () => partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching))
|
||||
|
||||
// Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-null verification object.
|
||||
val verificationGuard = partition.maybeStartTransactionVerification(producerId)
|
||||
val verificationGuard = partition.maybeStartTransactionVerification(producerId, 3, 0)
|
||||
assertNotNull(verificationGuard)
|
||||
|
||||
// With the wrong verification guard, append should fail.
|
||||
|
@ -3398,12 +3398,12 @@ class PartitionTest extends AbstractPartitionTest {
|
|||
origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, Optional.of(new Object)))
|
||||
|
||||
// We should return the same verification object when we still need to verify. Append should proceed.
|
||||
val verificationGuard2 = partition.maybeStartTransactionVerification(producerId)
|
||||
val verificationGuard2 = partition.maybeStartTransactionVerification(producerId, 3, 0)
|
||||
assertEquals(verificationGuard, verificationGuard2)
|
||||
partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, verificationGuard)
|
||||
|
||||
// We should no longer need a verification object. Future appends without verification guard will also succeed.
|
||||
val verificationGuard3 = partition.maybeStartTransactionVerification(producerId)
|
||||
val verificationGuard3 = partition.maybeStartTransactionVerification(producerId, 3, 0)
|
||||
assertNull(verificationGuard3)
|
||||
partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
|
||||
}
|
||||
|
|
|
@ -200,7 +200,8 @@ class ProducerStateManagerTest {
|
|||
val producerEpoch = 0.toShort
|
||||
val offset = 992342L
|
||||
val seq = 0
|
||||
val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), AppendOrigin.CLIENT)
|
||||
val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), AppendOrigin.CLIENT,
|
||||
stateManager.maybeCreateVerificationStateEntry(producerId, seq, producerEpoch))
|
||||
|
||||
val firstOffsetMetadata = new LogOffsetMetadata(offset, 990000L, 234224)
|
||||
producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, time.milliseconds(),
|
||||
|
@ -388,7 +389,8 @@ class ProducerStateManagerTest {
|
|||
partition,
|
||||
producerId,
|
||||
ProducerStateEntry.empty(producerId),
|
||||
AppendOrigin.CLIENT
|
||||
AppendOrigin.CLIENT,
|
||||
stateManager.maybeCreateVerificationStateEntry(producerId, 0, producerEpoch)
|
||||
)
|
||||
val firstOffsetMetadata = new LogOffsetMetadata(startOffset, segmentBaseOffset, 50 * relativeOffset)
|
||||
producerAppendInfo.appendDataBatch(producerEpoch, 0, 0, time.milliseconds(),
|
||||
|
@ -1089,37 +1091,74 @@ class ProducerStateManagerTest {
|
|||
|
||||
@Test
|
||||
def testEntryForVerification(): Unit = {
|
||||
val originalEntry = stateManager.verificationStateEntry(producerId, true)
|
||||
val originalEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)
|
||||
val originalEntryVerificationGuard = originalEntry.verificationGuard()
|
||||
|
||||
def verifyEntry(producerId: Long, newEntry: VerificationStateEntry): Unit = {
|
||||
val entry = stateManager.verificationStateEntry(producerId, false)
|
||||
val entry = stateManager.verificationStateEntry(producerId)
|
||||
assertEquals(originalEntryVerificationGuard, entry.verificationGuard)
|
||||
assertEquals(entry.verificationGuard, newEntry.verificationGuard)
|
||||
}
|
||||
|
||||
// If we already have an entry, reuse it.
|
||||
val updatedEntry = stateManager.verificationStateEntry(producerId, true)
|
||||
val updatedEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)
|
||||
verifyEntry(producerId, updatedEntry)
|
||||
|
||||
// Add the transactional data and clear the entry.
|
||||
append(stateManager, producerId, 0, 0, offset = 0, isTransactional = true)
|
||||
stateManager.clearVerificationStateEntry(producerId)
|
||||
assertNull(stateManager.verificationStateEntry(producerId, false))
|
||||
assertNull(stateManager.verificationStateEntry(producerId))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testSequenceAndEpochInVerificationEntry(): Unit = {
|
||||
val originalEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 1, 0)
|
||||
val originalEntryVerificationGuard = originalEntry.verificationGuard()
|
||||
|
||||
def verifyEntry(producerId: Long, newEntry: VerificationStateEntry, expectedSequence: Int, expectedEpoch: Short): Unit = {
|
||||
val entry = stateManager.verificationStateEntry(producerId)
|
||||
assertEquals(originalEntryVerificationGuard, entry.verificationGuard)
|
||||
assertEquals(entry.verificationGuard, newEntry.verificationGuard)
|
||||
assertEquals(expectedSequence, entry.lowestSequence)
|
||||
assertEquals(expectedEpoch, entry.epoch)
|
||||
}
|
||||
verifyEntry(producerId, originalEntry, 1, 0)
|
||||
|
||||
// If we see a lower sequence, update to the lower one.
|
||||
val updatedEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)
|
||||
verifyEntry(producerId, updatedEntry, 0, 0)
|
||||
|
||||
// If we see a new epoch that is higher, update the sequence.
|
||||
val updatedEntryNewEpoch = stateManager.maybeCreateVerificationStateEntry(producerId, 2, 1)
|
||||
verifyEntry(producerId, updatedEntryNewEpoch, 2, 1)
|
||||
|
||||
// Ignore a lower epoch.
|
||||
val updatedEntryOldEpoch = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)
|
||||
verifyEntry(producerId, updatedEntryOldEpoch, 2, 1)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testThrowOutOfOrderSequenceWithVerificationSequenceCheck(): Unit = {
|
||||
val originalEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)
|
||||
|
||||
// Trying to append with a higher sequence should fail
|
||||
assertThrows(classOf[OutOfOrderSequenceException], () => append(stateManager, producerId, 0, 4, offset = 0, isTransactional = true))
|
||||
|
||||
assertEquals(originalEntry, stateManager.verificationStateEntry(producerId))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testVerificationStateEntryExpiration(): Unit = {
|
||||
val originalEntry = stateManager.verificationStateEntry(producerId, true)
|
||||
val originalEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)
|
||||
|
||||
// Before timeout we do not remove. Note: Accessing the verification entry does not update the time.
|
||||
time.sleep(producerStateManagerConfig.producerIdExpirationMs / 2)
|
||||
stateManager.removeExpiredProducers(time.milliseconds())
|
||||
assertEquals(originalEntry, stateManager.verificationStateEntry(producerId, false))
|
||||
assertEquals(originalEntry, stateManager.verificationStateEntry(producerId))
|
||||
|
||||
time.sleep((producerStateManagerConfig.producerIdExpirationMs / 2) + 1)
|
||||
stateManager.removeExpiredProducers(time.milliseconds())
|
||||
assertNull(stateManager.verificationStateEntry(producerId, false))
|
||||
assertNull(stateManager.verificationStateEntry(producerId))
|
||||
}
|
||||
|
||||
private def testLoadFromCorruptSnapshot(makeFileCorrupt: FileChannel => Unit): Unit = {
|
||||
|
|
|
@ -3677,7 +3677,7 @@ class UnifiedLogTest {
|
|||
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
|
||||
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
|
||||
assertFalse(log.hasOngoingTransaction(producerId))
|
||||
assertNull(log.getOrMaybeCreateVerificationGuard(producerId))
|
||||
assertNull(log.verificationGuard(producerId))
|
||||
|
||||
val idempotentRecords = MemoryRecords.withIdempotentRecords(
|
||||
CompressionType.NONE,
|
||||
|
@ -3688,15 +3688,6 @@ class UnifiedLogTest {
|
|||
new SimpleRecord("2".getBytes)
|
||||
)
|
||||
|
||||
val verificationGuard = log.maybeStartTransactionVerification(producerId)
|
||||
assertNotNull(verificationGuard)
|
||||
|
||||
log.appendAsLeader(idempotentRecords, leaderEpoch = 0)
|
||||
assertFalse(log.hasOngoingTransaction(producerId))
|
||||
|
||||
// Since we wrote idempotent records, we keep verification guard.
|
||||
assertEquals(verificationGuard, log.getOrMaybeCreateVerificationGuard(producerId))
|
||||
|
||||
val transactionalRecords = MemoryRecords.withTransactionalRecords(
|
||||
CompressionType.NONE,
|
||||
producerId,
|
||||
|
@ -3706,13 +3697,23 @@ class UnifiedLogTest {
|
|||
new SimpleRecord("2".getBytes)
|
||||
)
|
||||
|
||||
val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence + 2, producerEpoch)
|
||||
assertNotNull(verificationGuard)
|
||||
|
||||
log.appendAsLeader(idempotentRecords, leaderEpoch = 0)
|
||||
assertFalse(log.hasOngoingTransaction(producerId))
|
||||
|
||||
// Since we wrote idempotent records, we keep verification guard.
|
||||
assertEquals(verificationGuard, log.verificationGuard(producerId))
|
||||
|
||||
// Now write the transactional records
|
||||
log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
|
||||
assertTrue(log.hasOngoingTransaction(producerId))
|
||||
// Verification guard should be cleared now.
|
||||
assertNull(log.getOrMaybeCreateVerificationGuard(producerId))
|
||||
assertNull(log.verificationGuard(producerId))
|
||||
|
||||
// A subsequent maybeStartTransactionVerification will be empty since we are already verified.
|
||||
assertNull(log.maybeStartTransactionVerification(producerId))
|
||||
assertNull(log.maybeStartTransactionVerification(producerId, sequence + 2, producerEpoch))
|
||||
|
||||
val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
|
||||
producerId,
|
||||
|
@ -3722,10 +3723,10 @@ class UnifiedLogTest {
|
|||
|
||||
log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)
|
||||
assertFalse(log.hasOngoingTransaction(producerId))
|
||||
assertNull(log.getOrMaybeCreateVerificationGuard(producerId))
|
||||
assertNull(log.verificationGuard(producerId))
|
||||
|
||||
// A new maybeStartTransactionVerification will not be empty, as we need to verify the next transaction.
|
||||
val newVerificationGuard = log.maybeStartTransactionVerification(producerId)
|
||||
val newVerificationGuard = log.maybeStartTransactionVerification(producerId, sequence + 3, producerEpoch)
|
||||
assertNotNull(newVerificationGuard)
|
||||
assertNotEquals(verificationGuard, newVerificationGuard)
|
||||
}
|
||||
|
@ -3739,7 +3740,7 @@ class UnifiedLogTest {
|
|||
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
|
||||
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
|
||||
|
||||
val verificationGuard = log.maybeStartTransactionVerification(producerId)
|
||||
val verificationGuard = log.maybeStartTransactionVerification(producerId, 0, producerEpoch)
|
||||
assertNotNull(verificationGuard)
|
||||
|
||||
val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
|
||||
|
@ -3750,7 +3751,7 @@ class UnifiedLogTest {
|
|||
|
||||
log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)
|
||||
assertFalse(log.hasOngoingTransaction(producerId))
|
||||
assertNull(log.getOrMaybeCreateVerificationGuard(producerId))
|
||||
assertNull(log.verificationGuard(producerId))
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -3763,7 +3764,7 @@ class UnifiedLogTest {
|
|||
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
|
||||
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
|
||||
assertFalse(log.hasOngoingTransaction(producerId))
|
||||
assertNull(log.getOrMaybeCreateVerificationGuard(producerId))
|
||||
assertNull(log.verificationGuard(producerId))
|
||||
|
||||
val transactionalRecords = MemoryRecords.withTransactionalRecords(
|
||||
CompressionType.NONE,
|
||||
|
@ -3774,7 +3775,7 @@ class UnifiedLogTest {
|
|||
new SimpleRecord("2".getBytes)
|
||||
)
|
||||
|
||||
val verificationGuard = log.maybeStartTransactionVerification(producerId)
|
||||
val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
|
||||
// Append should not throw error.
|
||||
log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
|
||||
}
|
||||
|
|
|
@ -2250,6 +2250,66 @@ class ReplicaManagerTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testTransactionVerificationBlocksOutOfOrderSequence(): Unit = {
|
||||
val tp0 = new TopicPartition(topic, 0)
|
||||
val producerId = 24L
|
||||
val producerEpoch = 0.toShort
|
||||
val sequence = 6
|
||||
val node = new Node(0, "host1", 0)
|
||||
val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
|
||||
|
||||
val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), node)
|
||||
try {
|
||||
replicaManager.becomeLeaderOrFollower(1,
|
||||
makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
|
||||
(_, _) => ())
|
||||
|
||||
// Start with sequence 6
|
||||
val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
|
||||
new SimpleRecord("message".getBytes))
|
||||
|
||||
val transactionToAdd = new AddPartitionsToTxnTransaction()
|
||||
.setTransactionalId(transactionalId)
|
||||
.setProducerId(producerId)
|
||||
.setProducerEpoch(producerEpoch)
|
||||
.setVerifyOnly(true)
|
||||
.setTopics(new AddPartitionsToTxnTopicCollection(
|
||||
Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
|
||||
))
|
||||
|
||||
// We should add these partitions to the manager to verify.
|
||||
val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0))
|
||||
val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
|
||||
verify(addPartitionsToTxnManager, times(1)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
|
||||
val verificationGuard = getVerificationGuard(replicaManager, tp0, producerId)
|
||||
assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
|
||||
|
||||
// Confirm we did not write to the log and instead returned error.
|
||||
val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue()
|
||||
callback(Map(tp0 -> Errors.NOT_COORDINATOR).toMap)
|
||||
assertEquals(Errors.NOT_COORDINATOR, result.assertFired.error)
|
||||
assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
|
||||
|
||||
// Try to append a higher sequence (7) after the first one failed with a retriable error.
|
||||
val transactionalRecords2 = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 1,
|
||||
new SimpleRecord("message".getBytes))
|
||||
|
||||
val result2 = appendRecords(replicaManager, tp0, transactionalRecords2, transactionalId = transactionalId, transactionStatePartition = Some(0))
|
||||
val appendCallback2 = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
|
||||
verify(addPartitionsToTxnManager, times(2)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), appendCallback2.capture())
|
||||
assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
|
||||
|
||||
// Verification should succeed, but we expect to fail with OutOfOrderSequence and for the verification guard to remain.
|
||||
val callback2: AddPartitionsToTxnManager.AppendCallback = appendCallback2.getValue()
|
||||
callback2(Map.empty[TopicPartition, Errors].toMap)
|
||||
assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
|
||||
assertEquals(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, result2.assertFired.error)
|
||||
} finally {
|
||||
replicaManager.shutdown(checkpointHW = false)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testTransactionVerificationGuardOnMultiplePartitions(): Unit = {
|
||||
val mockTimer = new MockTimer(time)
|
||||
|
@ -2851,7 +2911,7 @@ class ReplicaManagerTest {
|
|||
private def getVerificationGuard(replicaManager: ReplicaManager,
|
||||
tp: TopicPartition,
|
||||
producerId: Long): Object = {
|
||||
replicaManager.getPartitionOrException(tp).log.get.getOrMaybeCreateVerificationGuard(producerId)
|
||||
replicaManager.getPartitionOrException(tp).log.get.verificationGuard(producerId)
|
||||
}
|
||||
|
||||
private def setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager: AddPartitionsToTxnManager,
|
||||
|
|
|
@ -47,6 +47,7 @@ public class ProducerAppendInfo {
|
|||
private final long producerId;
|
||||
private final ProducerStateEntry currentEntry;
|
||||
private final AppendOrigin origin;
|
||||
private final VerificationStateEntry verificationStateEntry;
|
||||
|
||||
private final List<TxnMetadata> transactions = new ArrayList<>();
|
||||
private final ProducerStateEntry updatedEntry;
|
||||
|
@ -54,25 +55,28 @@ public class ProducerAppendInfo {
|
|||
/**
|
||||
* Creates a new instance with the provided parameters.
|
||||
*
|
||||
* @param topicPartition topic partition
|
||||
* @param producerId The id of the producer appending to the log
|
||||
* @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of
|
||||
* the most recent appends made by the producer. Validation of the first incoming append will
|
||||
* be made against the latest append in the current entry. New appends will replace older appends
|
||||
* in the current entry so that the space overhead is constant.
|
||||
* @param origin Indicates the origin of the append which implies the extent of validation. For example, offset
|
||||
* commits, which originate from the group coordinator, do not have sequence numbers and therefore
|
||||
* only producer epoch validation is done. Appends which come through replication are not validated
|
||||
* (we assume the validation has already been done) and appends from clients require full validation.
|
||||
* @param topicPartition topic partition
|
||||
* @param producerId The id of the producer appending to the log
|
||||
* @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of
|
||||
* the most recent appends made by the producer. Validation of the first incoming append will
|
||||
* be made against the latest append in the current entry. New appends will replace older appends
|
||||
* in the current entry so that the space overhead is constant.
|
||||
* @param origin Indicates the origin of the append which implies the extent of validation. For example, offset
|
||||
* commits, which originate from the group coordinator, do not have sequence numbers and therefore
|
||||
* only producer epoch validation is done. Appends which come through replication are not validated
|
||||
* (we assume the validation has already been done) and appends from clients require full validation.
|
||||
* @param verificationStateEntry The most recent entry used for verification if no append has been completed yet otherwise null
|
||||
*/
|
||||
public ProducerAppendInfo(TopicPartition topicPartition,
|
||||
long producerId,
|
||||
ProducerStateEntry currentEntry,
|
||||
AppendOrigin origin) {
|
||||
AppendOrigin origin,
|
||||
VerificationStateEntry verificationStateEntry) {
|
||||
this.topicPartition = topicPartition;
|
||||
this.producerId = producerId;
|
||||
this.currentEntry = currentEntry;
|
||||
this.origin = origin;
|
||||
this.verificationStateEntry = verificationStateEntry;
|
||||
|
||||
updatedEntry = currentEntry.withProducerIdAndBatchMetadata(producerId, Optional.empty());
|
||||
}
|
||||
|
@ -105,6 +109,11 @@ public class ProducerAppendInfo {
|
|||
}
|
||||
|
||||
private void checkSequence(short producerEpoch, int appendFirstSeq, long offset) {
|
||||
if (verificationStateEntry != null && appendFirstSeq > verificationStateEntry.lowestSequence()) {
|
||||
throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " +
|
||||
"offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq +
|
||||
" (incoming seq. number), " + verificationStateEntry.lowestSequence() + " (earliest seen sequence)");
|
||||
}
|
||||
if (producerEpoch != updatedEntry.producerEpoch()) {
|
||||
if (appendFirstSeq != 0) {
|
||||
if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) {
|
||||
|
|
|
@ -187,16 +187,22 @@ public class ProducerStateManager {
|
|||
}
|
||||
|
||||
/**
|
||||
* Maybe create the VerificationStateEntry for a given producer ID. Return it if it exists, otherwise return null.
|
||||
* Maybe create the VerificationStateEntry for a given producer ID and return it.
|
||||
* This method also updates the sequence and epoch accordingly.
|
||||
*/
|
||||
public VerificationStateEntry verificationStateEntry(long producerId, boolean createIfAbsent) {
|
||||
return verificationStates.computeIfAbsent(producerId, pid -> {
|
||||
if (createIfAbsent)
|
||||
return new VerificationStateEntry(time.milliseconds());
|
||||
else {
|
||||
return null;
|
||||
}
|
||||
});
|
||||
public VerificationStateEntry maybeCreateVerificationStateEntry(long producerId, int sequence, short epoch) {
|
||||
VerificationStateEntry entry = verificationStates.computeIfAbsent(producerId, pid ->
|
||||
new VerificationStateEntry(time.milliseconds(), sequence, epoch)
|
||||
);
|
||||
entry.maybeUpdateLowestSequenceAndEpoch(sequence, epoch);
|
||||
return entry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the VerificationStateEntry for the producer ID if it exists, otherwise return null.
|
||||
*/
|
||||
public VerificationStateEntry verificationStateEntry(long producerId) {
|
||||
return verificationStates.get(producerId);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -408,7 +414,7 @@ public class ProducerStateManager {
|
|||
|
||||
public ProducerAppendInfo prepareUpdate(long producerId, AppendOrigin origin) {
|
||||
ProducerStateEntry currentEntry = lastEntry(producerId).orElse(ProducerStateEntry.empty(producerId));
|
||||
return new ProducerAppendInfo(topicPartition, producerId, currentEntry, origin);
|
||||
return new ProducerAppendInfo(topicPartition, producerId, currentEntry, origin, verificationStateEntry(producerId));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -22,16 +22,23 @@ package org.apache.kafka.storage.internals.log;
|
|||
* After verifying, we retain this object until we append to the log. This prevents any race conditions where the transaction
|
||||
* may end via a control marker before we write to the log. This mechanism is used to prevent hanging transactions.
|
||||
* We remove the verification guard object whenever we write data to the transaction or write an end marker for the transaction.
|
||||
*
|
||||
* We also store the lowest seen sequence to block a higher sequence from being written in the case of the lower sequence needing retries.
|
||||
*
|
||||
* Any lingering entries that are never verified are removed via the producer state entry cleanup mechanism.
|
||||
*/
|
||||
public class VerificationStateEntry {
|
||||
|
||||
final private long timestamp;
|
||||
final private Object verificationGuard;
|
||||
private int lowestSequence;
|
||||
private short epoch;
|
||||
|
||||
public VerificationStateEntry(long timestamp) {
|
||||
public VerificationStateEntry(long timestamp, int sequence, short epoch) {
|
||||
this.timestamp = timestamp;
|
||||
this.verificationGuard = new Object();
|
||||
this.lowestSequence = sequence;
|
||||
this.epoch = epoch;
|
||||
}
|
||||
|
||||
public long timestamp() {
|
||||
|
@ -41,4 +48,37 @@ public class VerificationStateEntry {
|
|||
public Object verificationGuard() {
|
||||
return verificationGuard;
|
||||
}
|
||||
|
||||
public int lowestSequence() {
|
||||
return lowestSequence;
|
||||
}
|
||||
|
||||
public short epoch() {
|
||||
return epoch;
|
||||
}
|
||||
|
||||
/**
|
||||
* An OutOfOrderSequence loop can happen for any idempotent/transactional producer when a lower sequence fails with
|
||||
* a retriable error and a higher sequence is successfully written. The lower sequence will fail with
|
||||
* OutOfOrderSequence and retry until retries run out.
|
||||
*
|
||||
* Here, we keep the lowest sequence seen in order to prevent an OutOfOrderSequence loop when verifying. This does
|
||||
* not solve the error loop for idempotent producers or transactional producers that fail before verification
|
||||
* starts. When verification fails with a retriable error (ie. NOT_COORDINATOR), the VerificationStateEntry
|
||||
* maintains the lowest sequence number it sees and blocks higher sequences from being written to the log. However,
|
||||
* if we encounter a new and lower sequence when verifying, we want to block sequences higher than that new
|
||||
* sequence. Additionally, if the epoch is bumped, the sequence is reset and any previous sequence must be disregarded.
|
||||
*
|
||||
* Thus, we update the lowest sequence if there is a batch needing verification that has:
|
||||
* a) a lower sequence for the same epoch
|
||||
* b) a higher epoch -- update the epoch here too
|
||||
*/
|
||||
public void maybeUpdateLowestSequenceAndEpoch(int incomingSequence, short incomingEpoch) {
|
||||
if (incomingEpoch == epoch && incomingSequence < this.lowestSequence)
|
||||
this.lowestSequence = incomingSequence;
|
||||
if (incomingEpoch > this.epoch) {
|
||||
this.epoch = incomingEpoch;
|
||||
this.lowestSequence = incomingSequence;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue