MINOR: Add clientTransactionVersion to AddPartitionsToTxn requests and persist the value across transitions (#18086)

We can better keep track of which transactions use TV_2 by storing this information in the clientTransactionVersion field and persisting it across state transitions. Also updated some logging and equality code to include this information.

Added a test to ensure version persists. There aren't many TV2 transitions that don't specify TV, but I did test the InitProducerId + epoch overflow case.

Reviewers: Artem Livshits <alivshits@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
This commit is contained in:
Justine Olshan 2024-12-10 12:59:01 -08:00 committed by GitHub
parent d9a71e083b
commit 3cf8745243
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 61 additions and 41 deletions

View File

@ -391,6 +391,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
producerEpoch: Short, producerEpoch: Short,
partitions: collection.Set[TopicPartition], partitions: collection.Set[TopicPartition],
responseCallback: AddPartitionsCallback, responseCallback: AddPartitionsCallback,
clientTransactionVersion: TransactionVersion,
requestLocal: RequestLocal = RequestLocal.noCaching): Unit = { requestLocal: RequestLocal = RequestLocal.noCaching): Unit = {
if (transactionalId == null || transactionalId.isEmpty) { if (transactionalId == null || transactionalId.isEmpty) {
debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request") debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request")
@ -420,7 +421,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
// this is an optimization: if the partitions are already in the metadata reply OK immediately // this is an optimization: if the partitions are already in the metadata reply OK immediately
Left(Errors.NONE) Left(Errors.NONE)
} else { } else {
Right(coordinatorEpoch, txnMetadata.prepareAddPartitions(partitions.toSet, time.milliseconds())) Right(coordinatorEpoch, txnMetadata.prepareAddPartitions(partitions.toSet, time.milliseconds(), clientTransactionVersion))
} }
} }
} }

View File

@ -255,7 +255,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
def prepareNoTransit(): TxnTransitMetadata = { def prepareNoTransit(): TxnTransitMetadata = {
// do not call transitTo as it will set the pending state, a follow-up call to abort the transaction will set its pending state // do not call transitTo as it will set the pending state, a follow-up call to abort the transaction will set its pending state
TxnTransitMetadata(producerId, previousProducerId, nextProducerId, producerEpoch, lastProducerEpoch, txnTimeoutMs, state, topicPartitions.toSet, TxnTransitMetadata(producerId, previousProducerId, nextProducerId, producerEpoch, lastProducerEpoch, txnTimeoutMs, state, topicPartitions.toSet,
txnStartTimestamp, txnLastUpdateTimestamp, TransactionVersion.TV_0) txnStartTimestamp, txnLastUpdateTimestamp, clientTransactionVersion)
} }
def prepareFenceProducerEpoch(): TxnTransitMetadata = { def prepareFenceProducerEpoch(): TxnTransitMetadata = {
@ -267,7 +267,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
val bumpedEpoch = if (hasFailedEpochFence) producerEpoch else (producerEpoch + 1).toShort val bumpedEpoch = if (hasFailedEpochFence) producerEpoch else (producerEpoch + 1).toShort
prepareTransitionTo(PrepareEpochFence, producerId, bumpedEpoch, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, prepareTransitionTo(PrepareEpochFence, producerId, bumpedEpoch, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs,
topicPartitions.toSet, txnStartTimestamp, txnLastUpdateTimestamp) topicPartitions.toSet, txnStartTimestamp, txnLastUpdateTimestamp, clientTransactionVersion)
} }
def prepareIncrementProducerEpoch(newTxnTimeoutMs: Int, def prepareIncrementProducerEpoch(newTxnTimeoutMs: Int,
@ -306,7 +306,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
epochBumpResult match { epochBumpResult match {
case Right((nextEpoch, lastEpoch)) => Right(prepareTransitionTo(Empty, producerId, nextEpoch, lastEpoch, newTxnTimeoutMs, case Right((nextEpoch, lastEpoch)) => Right(prepareTransitionTo(Empty, producerId, nextEpoch, lastEpoch, newTxnTimeoutMs,
immutable.Set.empty[TopicPartition], -1, updateTimestamp)) immutable.Set.empty[TopicPartition], -1, updateTimestamp, clientTransactionVersion))
case Left(err) => Left(err) case Left(err) => Left(err)
} }
@ -320,17 +320,17 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
throw new IllegalStateException("Cannot rotate producer ids while a transaction is still pending") throw new IllegalStateException("Cannot rotate producer ids while a transaction is still pending")
prepareTransitionTo(Empty, newProducerId, 0, if (recordLastEpoch) producerEpoch else RecordBatch.NO_PRODUCER_EPOCH, prepareTransitionTo(Empty, newProducerId, 0, if (recordLastEpoch) producerEpoch else RecordBatch.NO_PRODUCER_EPOCH,
newTxnTimeoutMs, immutable.Set.empty[TopicPartition], -1, updateTimestamp) newTxnTimeoutMs, immutable.Set.empty[TopicPartition], -1, updateTimestamp, clientTransactionVersion)
} }
def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long): TxnTransitMetadata = { def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long, clientTransactionVersion: TransactionVersion): TxnTransitMetadata = {
val newTxnStartTimestamp = state match { val newTxnStartTimestamp = state match {
case Empty | CompleteAbort | CompleteCommit => updateTimestamp case Empty | CompleteAbort | CompleteCommit => updateTimestamp
case _ => txnStartTimestamp case _ => txnStartTimestamp
} }
prepareTransitionTo(Ongoing, producerId, producerEpoch, lastProducerEpoch, txnTimeoutMs, prepareTransitionTo(Ongoing, producerId, producerEpoch, lastProducerEpoch, txnTimeoutMs,
(topicPartitions ++ addedTopicPartitions).toSet, newTxnStartTimestamp, updateTimestamp) (topicPartitions ++ addedTopicPartitions).toSet, newTxnStartTimestamp, updateTimestamp, clientTransactionVersion)
} }
def prepareAbortOrCommit(newState: TransactionState, clientTransactionVersion: TransactionVersion, nextProducerId: Long, updateTimestamp: Long, noPartitionAdded: Boolean): TxnTransitMetadata = { def prepareAbortOrCommit(newState: TransactionState, clientTransactionVersion: TransactionVersion, nextProducerId: Long, updateTimestamp: Long, noPartitionAdded: Boolean): TxnTransitMetadata = {
@ -371,7 +371,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
def prepareDead(): TxnTransitMetadata = { def prepareDead(): TxnTransitMetadata = {
prepareTransitionTo(Dead, producerId, producerEpoch, lastProducerEpoch, txnTimeoutMs, Set.empty[TopicPartition], prepareTransitionTo(Dead, producerId, producerEpoch, lastProducerEpoch, txnTimeoutMs, Set.empty[TopicPartition],
txnStartTimestamp, txnLastUpdateTimestamp) txnStartTimestamp, txnLastUpdateTimestamp, clientTransactionVersion)
} }
/** /**
@ -394,8 +394,9 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
updatedTxnTimeoutMs: Int, updatedTxnTimeoutMs: Int,
updatedTopicPartitions: immutable.Set[TopicPartition], updatedTopicPartitions: immutable.Set[TopicPartition],
updatedTxnStartTimestamp: Long, updatedTxnStartTimestamp: Long,
updateTimestamp: Long): TxnTransitMetadata = { updateTimestamp: Long,
prepareTransitionTo(updatedState, updatedProducerId, RecordBatch.NO_PRODUCER_ID, updatedEpoch, updatedLastEpoch, updatedTxnTimeoutMs, updatedTopicPartitions, updatedTxnStartTimestamp, updateTimestamp, TransactionVersion.TV_0) clientTransactionVersion: TransactionVersion): TxnTransitMetadata = {
prepareTransitionTo(updatedState, updatedProducerId, RecordBatch.NO_PRODUCER_ID, updatedEpoch, updatedLastEpoch, updatedTxnTimeoutMs, updatedTopicPartitions, updatedTxnStartTimestamp, updateTimestamp, clientTransactionVersion)
} }
private def prepareTransitionTo(updatedState: TransactionState, private def prepareTransitionTo(updatedState: TransactionState,
@ -613,7 +614,8 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
s"pendingState=$pendingState, " + s"pendingState=$pendingState, " +
s"topicPartitions=$topicPartitions, " + s"topicPartitions=$topicPartitions, " +
s"txnStartTimestamp=$txnStartTimestamp, " + s"txnStartTimestamp=$txnStartTimestamp, " +
s"txnLastUpdateTimestamp=$txnLastUpdateTimestamp)" s"txnLastUpdateTimestamp=$txnLastUpdateTimestamp, " +
s"clientTransactionVersion=$clientTransactionVersion)"
} }
override def equals(that: Any): Boolean = that match { override def equals(that: Any): Boolean = that match {
@ -626,13 +628,14 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
state.equals(other.state) && state.equals(other.state) &&
topicPartitions.equals(other.topicPartitions) && topicPartitions.equals(other.topicPartitions) &&
txnStartTimestamp == other.txnStartTimestamp && txnStartTimestamp == other.txnStartTimestamp &&
txnLastUpdateTimestamp == other.txnLastUpdateTimestamp txnLastUpdateTimestamp == other.txnLastUpdateTimestamp &&
clientTransactionVersion == other.clientTransactionVersion
case _ => false case _ => false
} }
override def hashCode(): Int = { override def hashCode(): Int = {
val fields = Seq(transactionalId, producerId, producerEpoch, txnTimeoutMs, state, topicPartitions, val fields = Seq(transactionalId, producerId, producerEpoch, txnTimeoutMs, state, topicPartitions,
txnStartTimestamp, txnLastUpdateTimestamp) txnStartTimestamp, txnLastUpdateTimestamp, clientTransactionVersion)
fields.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) fields.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
} }
} }

View File

@ -2328,14 +2328,11 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendResponseMaybeThrottle(request, createResponse) requestHelper.sendResponseMaybeThrottle(request, createResponse)
} }
// If the request is greater than version 4, we know the client supports transaction version 2.
val clientTransactionVersion = if (endTxnRequest.version() > 4) TransactionVersion.TV_2 else TransactionVersion.TV_0
txnCoordinator.handleEndTransaction(endTxnRequest.data.transactionalId, txnCoordinator.handleEndTransaction(endTxnRequest.data.transactionalId,
endTxnRequest.data.producerId, endTxnRequest.data.producerId,
endTxnRequest.data.producerEpoch, endTxnRequest.data.producerEpoch,
endTxnRequest.result(), endTxnRequest.result(),
clientTransactionVersion, TransactionVersion.transactionVersionForEndTxn(endTxnRequest),
sendResponseCallback, sendResponseCallback,
requestLocal) requestLocal)
} else } else
@ -2614,6 +2611,7 @@ class KafkaApis(val requestChannel: RequestChannel,
transaction.producerEpoch, transaction.producerEpoch,
authorizedPartitions, authorizedPartitions,
sendResponseCallback, sendResponseCallback,
TransactionVersion.transactionVersionForAddPartitionsToTxn(addPartitionsToTxnRequest),
requestLocal) requestLocal)
} else { } else {
txnCoordinator.handleVerifyPartitionsInTransaction(transactionalId, txnCoordinator.handleVerifyPartitionsInTransaction(transactionalId,
@ -2673,6 +2671,7 @@ class KafkaApis(val requestChannel: RequestChannel,
addOffsetsToTxnRequest.data.producerEpoch, addOffsetsToTxnRequest.data.producerEpoch,
Set(offsetTopicPartition), Set(offsetTopicPartition),
sendResponseCallback, sendResponseCallback,
TransactionVersion.TV_0, // This request will always come from the client not using TV 2.
requestLocal) requestLocal)
} }
} }

View File

@ -547,6 +547,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
txnMetadata.producerEpoch, txnMetadata.producerEpoch,
partitions, partitions,
resultCallback, resultCallback,
TransactionVersion.TV_2,
RequestLocal.withThreadConfinedCaching) RequestLocal.withThreadConfinedCaching)
replicaManager.tryCompleteActions() replicaManager.tryCompleteActions()
} }

View File

@ -209,19 +209,19 @@ class TransactionCoordinatorTest {
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId))) when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
.thenReturn(Right(None)) .thenReturn(Right(None))
coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback) coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback, TV_0)
assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error) assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error)
} }
@Test @Test
def shouldRespondWithInvalidRequestAddPartitionsToTransactionWhenTransactionalIdIsEmpty(): Unit = { def shouldRespondWithInvalidRequestAddPartitionsToTransactionWhenTransactionalIdIsEmpty(): Unit = {
coordinator.handleAddPartitionsToTransaction("", 0L, 1, partitions, errorsCallback) coordinator.handleAddPartitionsToTransaction("", 0L, 1, partitions, errorsCallback, TV_0)
assertEquals(Errors.INVALID_REQUEST, error) assertEquals(Errors.INVALID_REQUEST, error)
} }
@Test @Test
def shouldRespondWithInvalidRequestAddPartitionsToTransactionWhenTransactionalIdIsNull(): Unit = { def shouldRespondWithInvalidRequestAddPartitionsToTransactionWhenTransactionalIdIsNull(): Unit = {
coordinator.handleAddPartitionsToTransaction(null, 0L, 1, partitions, errorsCallback) coordinator.handleAddPartitionsToTransaction(null, 0L, 1, partitions, errorsCallback, TV_0)
assertEquals(Errors.INVALID_REQUEST, error) assertEquals(Errors.INVALID_REQUEST, error)
} }
@ -230,7 +230,7 @@ class TransactionCoordinatorTest {
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId))) when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
.thenReturn(Left(Errors.NOT_COORDINATOR)) .thenReturn(Left(Errors.NOT_COORDINATOR))
coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback) coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback, TV_0)
assertEquals(Errors.NOT_COORDINATOR, error) assertEquals(Errors.NOT_COORDINATOR, error)
} }
@ -239,7 +239,7 @@ class TransactionCoordinatorTest {
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId))) when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
.thenReturn(Left(Errors.COORDINATOR_LOAD_IN_PROGRESS)) .thenReturn(Left(Errors.COORDINATOR_LOAD_IN_PROGRESS))
coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback) coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback, TV_0)
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, error) assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, error)
} }
@ -313,7 +313,7 @@ class TransactionCoordinatorTest {
new TransactionMetadata(transactionalId, 0, 0, RecordBatch.NO_PRODUCER_ID, new TransactionMetadata(transactionalId, 0, 0, RecordBatch.NO_PRODUCER_ID,
0, RecordBatch.NO_PRODUCER_EPOCH, 0, state, mutable.Set.empty, 0, 0, TV_2))))) 0, RecordBatch.NO_PRODUCER_EPOCH, 0, state, mutable.Set.empty, 0, 0, TV_2)))))
coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback) coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback, TV_2)
assertEquals(Errors.CONCURRENT_TRANSACTIONS, error) assertEquals(Errors.CONCURRENT_TRANSACTIONS, error)
} }
@ -325,7 +325,7 @@ class TransactionCoordinatorTest {
new TransactionMetadata(transactionalId, 0, 0, RecordBatch.NO_PRODUCER_ID, new TransactionMetadata(transactionalId, 0, 0, RecordBatch.NO_PRODUCER_ID,
10, 9, 0, PrepareCommit, mutable.Set.empty, 0, 0, TV_2))))) 10, 9, 0, PrepareCommit, mutable.Set.empty, 0, 0, TV_2)))))
coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback) coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback, TV_2)
assertEquals(Errors.PRODUCER_FENCED, error) assertEquals(Errors.PRODUCER_FENCED, error)
} }
@ -359,7 +359,7 @@ class TransactionCoordinatorTest {
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId))) when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata)))) .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
coordinator.handleAddPartitionsToTransaction(transactionalId, producerId, producerEpoch, partitions, errorsCallback) coordinator.handleAddPartitionsToTransaction(transactionalId, producerId, producerEpoch, partitions, errorsCallback, clientTransactionVersion)
verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId)) verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId))
verify(transactionManager).appendTransactionToLog( verify(transactionManager).appendTransactionToLog(
@ -379,7 +379,7 @@ class TransactionCoordinatorTest {
new TransactionMetadata(transactionalId, 0, 0, RecordBatch.NO_PRODUCER_ID, new TransactionMetadata(transactionalId, 0, 0, RecordBatch.NO_PRODUCER_ID,
0, RecordBatch.NO_PRODUCER_EPOCH, 0, Empty, partitions, 0, 0, TV_0))))) 0, RecordBatch.NO_PRODUCER_EPOCH, 0, Empty, partitions, 0, 0, TV_0)))))
coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback) coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback, TV_0)
assertEquals(Errors.NONE, error) assertEquals(Errors.NONE, error)
verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId)) verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId))
} }

View File

@ -253,7 +253,7 @@ class TransactionMetadataTest {
clientTransactionVersion = TV_0) clientTransactionVersion = TV_0)
// let new time be smaller; when transiting from Empty the start time would be updated to the update-time // let new time be smaller; when transiting from Empty the start time would be updated to the update-time
var transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0)), time.milliseconds() - 1) var transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0)), time.milliseconds() - 1, TV_0)
txnMetadata.completeTransitionTo(transitMetadata) txnMetadata.completeTransitionTo(transitMetadata)
assertEquals(Set[TopicPartition](new TopicPartition("topic1", 0)), txnMetadata.topicPartitions) assertEquals(Set[TopicPartition](new TopicPartition("topic1", 0)), txnMetadata.topicPartitions)
assertEquals(producerId, txnMetadata.producerId) assertEquals(producerId, txnMetadata.producerId)
@ -263,7 +263,7 @@ class TransactionMetadataTest {
assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp) assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
// add another partition, check that in Ongoing state the start timestamp would not change to update time // add another partition, check that in Ongoing state the start timestamp would not change to update time
transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds() - 2) transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds() - 2, TV_0)
txnMetadata.completeTransitionTo(transitMetadata) txnMetadata.completeTransitionTo(transitMetadata)
assertEquals(Set[TopicPartition](new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)), txnMetadata.topicPartitions) assertEquals(Set[TopicPartition](new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)), txnMetadata.topicPartitions)
assertEquals(producerId, txnMetadata.producerId) assertEquals(producerId, txnMetadata.producerId)

View File

@ -389,7 +389,7 @@ class TransactionStateManagerTest {
// update the metadata to ongoing with two partitions // update the metadata to ongoing with two partitions
val newMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0), val newMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
new TopicPartition("topic1", 1)), time.milliseconds()) new TopicPartition("topic1", 1)), time.milliseconds(), TV_0)
// append the new metadata into log // append the new metadata into log
transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch, newMetadata, assertCallback, requestLocal = RequestLocal.withThreadConfinedCaching) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch, newMetadata, assertCallback, requestLocal = RequestLocal.withThreadConfinedCaching)
@ -404,7 +404,7 @@ class TransactionStateManagerTest {
transactionManager.putTransactionStateIfNotExists(txnMetadata1) transactionManager.putTransactionStateIfNotExists(txnMetadata1)
expectedError = Errors.COORDINATOR_NOT_AVAILABLE expectedError = Errors.COORDINATOR_NOT_AVAILABLE
var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION) prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION)
val requestLocal = RequestLocal.withThreadConfinedCaching val requestLocal = RequestLocal.withThreadConfinedCaching
@ -412,19 +412,19 @@ class TransactionStateManagerTest {
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty) assertTrue(txnMetadata1.pendingState.isEmpty)
failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS) prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS)
transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty) assertTrue(txnMetadata1.pendingState.isEmpty)
failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND) prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty) assertTrue(txnMetadata1.pendingState.isEmpty)
failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.REQUEST_TIMED_OUT) prepareForTxnMessageAppend(Errors.REQUEST_TIMED_OUT)
transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
@ -437,7 +437,7 @@ class TransactionStateManagerTest {
transactionManager.putTransactionStateIfNotExists(txnMetadata1) transactionManager.putTransactionStateIfNotExists(txnMetadata1)
expectedError = Errors.NOT_COORDINATOR expectedError = Errors.NOT_COORDINATOR
var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.NOT_LEADER_OR_FOLLOWER) prepareForTxnMessageAppend(Errors.NOT_LEADER_OR_FOLLOWER)
val requestLocal = RequestLocal.withThreadConfinedCaching val requestLocal = RequestLocal.withThreadConfinedCaching
@ -445,7 +445,7 @@ class TransactionStateManagerTest {
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty) assertTrue(txnMetadata1.pendingState.isEmpty)
failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.NONE) prepareForTxnMessageAppend(Errors.NONE)
transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch) transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch)
transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
@ -468,7 +468,7 @@ class TransactionStateManagerTest {
transactionManager.putTransactionStateIfNotExists(txnMetadata1) transactionManager.putTransactionStateIfNotExists(txnMetadata1)
expectedError = Errors.COORDINATOR_LOAD_IN_PROGRESS expectedError = Errors.COORDINATOR_LOAD_IN_PROGRESS
val failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) val failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.NONE) prepareForTxnMessageAppend(Errors.NONE)
transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch) transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch)
@ -482,7 +482,7 @@ class TransactionStateManagerTest {
transactionManager.putTransactionStateIfNotExists(txnMetadata1) transactionManager.putTransactionStateIfNotExists(txnMetadata1)
expectedError = Errors.UNKNOWN_SERVER_ERROR expectedError = Errors.UNKNOWN_SERVER_ERROR
var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.MESSAGE_TOO_LARGE) prepareForTxnMessageAppend(Errors.MESSAGE_TOO_LARGE)
val requestLocal = RequestLocal.withThreadConfinedCaching val requestLocal = RequestLocal.withThreadConfinedCaching
@ -490,7 +490,7 @@ class TransactionStateManagerTest {
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty) assertTrue(txnMetadata1.pendingState.isEmpty)
failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.RECORD_LIST_TOO_LARGE) prepareForTxnMessageAppend(Errors.RECORD_LIST_TOO_LARGE)
transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
@ -503,7 +503,7 @@ class TransactionStateManagerTest {
transactionManager.putTransactionStateIfNotExists(txnMetadata1) transactionManager.putTransactionStateIfNotExists(txnMetadata1)
expectedError = Errors.COORDINATOR_NOT_AVAILABLE expectedError = Errors.COORDINATOR_NOT_AVAILABLE
val failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) val failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION) prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION)
transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, _ => true, RequestLocal.withThreadConfinedCaching) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, _ => true, RequestLocal.withThreadConfinedCaching)
@ -522,7 +522,7 @@ class TransactionStateManagerTest {
expectedError = Errors.NOT_COORDINATOR expectedError = Errors.NOT_COORDINATOR
val newMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0), val newMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
new TopicPartition("topic1", 1)), time.milliseconds()) new TopicPartition("topic1", 1)), time.milliseconds(), TV_0)
// modify the cache while trying to append the new metadata // modify the cache while trying to append the new metadata
txnMetadata1.producerEpoch = (txnMetadata1.producerEpoch + 1).toShort txnMetadata1.producerEpoch = (txnMetadata1.producerEpoch + 1).toShort
@ -541,7 +541,7 @@ class TransactionStateManagerTest {
expectedError = Errors.INVALID_PRODUCER_EPOCH expectedError = Errors.INVALID_PRODUCER_EPOCH
val newMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0), val newMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
new TopicPartition("topic1", 1)), time.milliseconds()) new TopicPartition("topic1", 1)), time.milliseconds(), TV_0)
// modify the cache while trying to append the new metadata // modify the cache while trying to append the new metadata
txnMetadata1.pendingState = None txnMetadata1.pendingState = None

View File

@ -2301,6 +2301,7 @@ class KafkaApisTest extends Logging {
ArgumentMatchers.eq(epoch), ArgumentMatchers.eq(epoch),
ArgumentMatchers.eq(Set(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition))), ArgumentMatchers.eq(Set(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition))),
responseCallback.capture(), responseCallback.capture(),
ArgumentMatchers.eq(TransactionVersion.TV_0),
ArgumentMatchers.eq(requestLocal) ArgumentMatchers.eq(requestLocal)
)).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED)) )).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
val kafkaApis = createKafkaApis() val kafkaApis = createKafkaApis()
@ -2359,6 +2360,7 @@ class KafkaApisTest extends Logging {
ArgumentMatchers.eq(epoch), ArgumentMatchers.eq(epoch),
ArgumentMatchers.eq(Set(topicPartition)), ArgumentMatchers.eq(Set(topicPartition)),
responseCallback.capture(), responseCallback.capture(),
ArgumentMatchers.eq(TransactionVersion.TV_0),
ArgumentMatchers.eq(requestLocal) ArgumentMatchers.eq(requestLocal)
)).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED)) )).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
val kafkaApis = createKafkaApis() val kafkaApis = createKafkaApis()
@ -2434,6 +2436,7 @@ class KafkaApisTest extends Logging {
ArgumentMatchers.eq(epoch), ArgumentMatchers.eq(epoch),
ArgumentMatchers.eq(Set(tp0)), ArgumentMatchers.eq(Set(tp0)),
responseCallback.capture(), responseCallback.capture(),
any[TransactionVersion],
ArgumentMatchers.eq(requestLocal) ArgumentMatchers.eq(requestLocal)
)).thenAnswer(_ => responseCallback.getValue.apply(Errors.NONE)) )).thenAnswer(_ => responseCallback.getValue.apply(Errors.NONE))

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.kafka.server.common; package org.apache.kafka.server.common;
import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
import org.apache.kafka.common.requests.EndTxnRequest;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
@ -55,6 +58,16 @@ public enum TransactionVersion implements FeatureVersion {
return (TransactionVersion) Feature.TRANSACTION_VERSION.fromFeatureLevel(version, true); return (TransactionVersion) Feature.TRANSACTION_VERSION.fromFeatureLevel(version, true);
} }
public static TransactionVersion transactionVersionForAddPartitionsToTxn(AddPartitionsToTxnRequest request) {
// If the request is greater than version 3, we know the client supports transaction version 2.
return request.version() > 3 ? TV_2 : TV_0;
}
public static TransactionVersion transactionVersionForEndTxn(EndTxnRequest request) {
// If the request is greater than version 4, we know the client supports transaction version 2.
return request.version() > 4 ? TV_2 : TV_0;
}
@Override @Override
public String featureName() { public String featureName() {
return FEATURE_NAME; return FEATURE_NAME;