diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 6a263ccc63e..ed846cd2b4c 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -111,7 +111,7 @@
-
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 71801712507..8dea9c6c9d7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -40,12 +40,14 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@@ -440,9 +442,11 @@ public class Sender implements Runnable {
* Handle a produce response
*/
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
- int correlationId = response.requestHeader().correlationId();
+ RequestHeader requestHeader = response.requestHeader();
+ int correlationId = requestHeader.correlationId();
if (response.wasDisconnected()) {
- log.trace("Cancelled request {} due to node {} being disconnected", response, response.destination());
+ ApiKeys api = ApiKeys.forId(requestHeader.apiKey());
+ log.trace("Cancelled {} request {} with correlation id {} due to node {} being disconnected", api, requestHeader, correlationId, response.destination());
for (ProducerBatch batch : batches.values())
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now);
} else if (response.versionMismatch() != null) {
diff --git a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
index 313087cc761..82c4a8c3a63 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
@@ -45,6 +45,7 @@ private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata,
throw new IllegalStateException(s"Delayed write txn marker operation for metadata $txnMetadata has timed out, this should never happen.")
}
+ // TODO: if we always return NONE upon completion, we can remove the error code from the callback parameter
override def onComplete(): Unit = completionCallback(Errors.NONE)
}
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 8148cb6a67b..ebfbde5b0bb 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -197,7 +197,7 @@ class TransactionCoordinator(brokerId: Int,
case Ongoing =>
// indicate to abort the current ongoing txn first
- Right(coordinatorEpoch, txnMetadata.prepareNoTransit())
+ Right(coordinatorEpoch, txnMetadata.prepareFenceProducerEpoch())
}
}
}
@@ -266,8 +266,8 @@ class TransactionCoordinator(brokerId: Int,
txnManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch, txnMarkerChannelManager.addTxnMarkersToSend)
}
- def handleTxnEmigration(txnTopicPartitionId: Int) {
- txnManager.removeTransactionsForTxnTopicPartition(txnTopicPartitionId)
+ def handleTxnEmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int) {
+ txnManager.removeTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch)
txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
}
@@ -382,8 +382,8 @@ class TransactionCoordinator(brokerId: Int,
throw new IllegalStateException("Cannot find the metadata in coordinator's cache while it is still the leader of the txn topic partition")
} else {
// this transactional id no longer exists, maybe the corresponding partition has already been migrated out.
- info(s"Updating $transactionalId's transaction state to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed after the transaction message " +
- s"has been appended to the log. The partition ${partitionFor(transactionalId)} may have migrated as the metadata is no longer in the cache")
+ info(s"Updating $transactionalId's transaction state (txn topic partition ${partitionFor(transactionalId)}) to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId " +
+ s"failed after the transaction message has been appended to the log since the corresponding metadata does not exist in the cache anymore")
Left(Errors.NOT_COORDINATOR)
}
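
Note on the change above: when the coordinator has to abort an ongoing transaction first, it now prepares prepareFenceProducerEpoch() rather than prepareNoTransit(), i.e. the producer epoch is bumped before the abort markers are written. A minimal, self-contained sketch of why the bump fences the previous producer (illustrative stand-in types only, not the Kafka classes):

object EpochFencingSketch extends App {
  final case class TxnState(producerId: Long, producerEpoch: Short)

  // a request is fenced if it carries an epoch older than the coordinator's current one
  def isFenced(current: TxnState, requestEpoch: Short): Boolean =
    requestEpoch < current.producerEpoch

  val before = TxnState(producerId = 4000L, producerEpoch = 0.toShort)
  // the coordinator bumps the epoch first, then writes the abort markers with the bumped epoch
  val after = before.copy(producerEpoch = (before.producerEpoch + 1).toShort)

  assert(!isFenced(before, requestEpoch = 0.toShort)) // the old producer was valid before the bump
  assert(isFenced(after, requestEpoch = 0.toShort))   // after the bump its writes are rejected
}
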
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 90c9c42ee3b..9aa3e703f6a 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -230,8 +230,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
case None =>
// this transactional id no longer exists, maybe the corresponding partition has already been migrated out.
// we will stop appending the completed log entry to transaction topic as the new leader should be doing it.
- info(s"Updating $transactionalId's transaction state to $txnMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed after the transaction message " +
- s"has been appended to the log. The partition ${txnStateManager.partitionFor(transactionalId)} may have migrated as the metadata is no longer in the cache")
+ info(s"Updating $transactionalId's transaction state (txn topic partition ${txnStateManager.partitionFor(transactionalId)}) to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId " +
+ s"failed after the transaction message has been appended to the log since the corresponding metadata does not exist in the cache anymore")
}
case other =>
@@ -295,6 +295,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
txnMarkerPurgatory.cancelForKey(transactionalId)
}
+ // FIXME: Currently, operations registered under a partition in txnMarkerPurgatory
+ // are only cleaned up during coordinator immigration, which happens rarely; this is a potential memory leak
def completeSendMarkersForTxnId(transactionalId: String): Unit = {
txnMarkerPurgatory.checkAndComplete(transactionalId)
}
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 39c7914a23a..bcd95a11b6c 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -17,11 +17,10 @@
package kafka.coordinator.transaction
-import kafka.server.DelayedOperationPurgatory
import kafka.utils.Logging
import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.WriteTxnMarkersResponse
import scala.collection.mutable
@@ -32,18 +31,44 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
txnMarkerChannelManager: TransactionMarkerChannelManager,
txnIdAndMarkerEntries: java.util.List[TxnIdAndMarkerEntry]) extends RequestCompletionHandler with Logging {
override def onComplete(response: ClientResponse): Unit = {
- val correlationId = response.requestHeader.correlationId
+ val requestHeader = response.requestHeader
+ val correlationId = requestHeader.correlationId
if (response.wasDisconnected) {
- trace(s"Cancelled request $response due to node ${response.destination} being disconnected")
- // re-enqueue the markers
+ val api = ApiKeys.forId(requestHeader.apiKey)
+ val correlation = requestHeader.correlationId
+ trace(s"Cancelled $api request $requestHeader with correlation id $correlation due to node ${response.destination} being disconnected")
+
for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries) {
+ val transactionalId = txnIdAndMarker.txnId
val txnMarker = txnIdAndMarker.txnMarkerEntry
- txnMarkerChannelManager.addTxnMarkersToBrokerQueue(txnIdAndMarker.txnId,
- txnMarker.producerId(),
- txnMarker.producerEpoch(),
- txnMarker.transactionResult(),
- txnMarker.coordinatorEpoch(),
- txnMarker.partitions().toSet)
+
+ txnStateManager.getTransactionState(transactionalId) match {
+ case None =>
+ info(s"Transaction metadata for $transactionalId does not exist in the cache" +
+ s"any more; cancel sending transaction markers $txnMarker to the brokers")
+
+ txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
+
+ case Some(epochAndMetadata) =>
+ if (epochAndMetadata.coordinatorEpoch != txnMarker.coordinatorEpoch) {
+ // coordinator epoch has changed, just cancel it from the purgatory
+ info(s"Transaction coordinator epoch for $transactionalId has changed from ${txnMarker.coordinatorEpoch} to " +
+ s"${epochAndMetadata.coordinatorEpoch}; cancel sending transaction markers $txnMarker to the brokers")
+
+ txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
+ } else {
+ // re-enqueue the markers
+ trace(s"Re-enqueuing ${txnMarker.transactionResult} transaction markers for transactional id $transactionalId " +
+ s"under coordinator epoch ${txnMarker.coordinatorEpoch}")
+
+ txnMarkerChannelManager.addTxnMarkersToBrokerQueue(transactionalId,
+ txnMarker.producerId,
+ txnMarker.producerEpoch,
+ txnMarker.transactionResult,
+ txnMarker.coordinatorEpoch,
+ txnMarker.partitions.toSet)
+ }
+ }
}
} else {
trace(s"Received response $response from node ${response.destination} with correlation id $correlationId")
@@ -60,10 +85,9 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
txnStateManager.getTransactionState(transactionalId) match {
case None =>
- info(s"Transaction topic partition for $transactionalId may likely has emigrated, as the corresponding metadata do not exist in the cache" +
+ info(s"Transaction metadata for $transactionalId does not exist in the cache" +
s"any more; cancel sending transaction markers $txnMarker to the brokers")
- // txn topic partition has likely emigrated, just cancel it from the purgatory
txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
case Some(epochAndMetadata) =>
@@ -121,13 +145,16 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
if (!abortSending) {
if (retryPartitions.nonEmpty) {
+ trace(s"Re-enqueuing ${txnMarker.transactionResult} transaction markers for transactional id $transactionalId " +
+ s"under coordinator epoch ${txnMarker.coordinatorEpoch}")
+
// re-enqueue with possible new leaders of the partitions
txnMarkerChannelManager.addTxnMarkersToBrokerQueue(
transactionalId,
- txnMarker.producerId(),
- txnMarker.producerEpoch(),
+ txnMarker.producerId,
+ txnMarker.producerEpoch,
txnMarker.transactionResult,
- txnMarker.coordinatorEpoch(),
+ txnMarker.coordinatorEpoch,
retryPartitions.toSet)
} else {
txnMarkerChannelManager.completeSendMarkersForTxnId(transactionalId)
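
In the disconnect path above, the handler now consults the transaction state manager before retrying: markers are dropped when the metadata is gone or the coordinator epoch has changed, and re-enqueued otherwise. A minimal stand-in for that decision (hypothetical names, not the real handler):

object DisconnectRetrySketch {
  sealed trait Action
  case object CancelMarkers extends Action
  case object ReEnqueueMarkers extends Action

  final case class EpochAndMetadata(coordinatorEpoch: Int)

  def onDisconnect(cachedState: Option[EpochAndMetadata], markerCoordinatorEpoch: Int): Action =
    cachedState match {
      case None => CancelMarkers // metadata no longer cached; the partition has likely emigrated
      case Some(m) if m.coordinatorEpoch != markerCoordinatorEpoch => CancelMarkers // stale coordinator epoch
      case Some(_) => ReEnqueueMarkers // still the owner under the same epoch; retry the send
    }
}
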
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 0d176aa816f..d739b9aafc7 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -135,16 +135,26 @@ private[transaction] class TransactionMetadata(val producerId: Long,
}
def removePartition(topicPartition: TopicPartition): Unit = {
- if (pendingState.isDefined && (state != PrepareCommit && state != PrepareAbort))
- throw new IllegalStateException(s"Transation metadata's current state is $state, and its pending state is $state " +
+ if (state != PrepareCommit && state != PrepareAbort)
+ throw new IllegalStateException(s"Transaction metadata's current state is $state, and its pending state is $pendingState " +
s"while trying to remove partitions whose txn marker has been sent, this is not expected")
topicPartitions -= topicPartition
}
- def prepareNoTransit(): TransactionMetadataTransition =
- // do not call transitTo as it will set the pending state
+ // this is visible for testing only
+ def prepareNoTransit(): TransactionMetadataTransition = {
+ // do not call transitTo as it will set the pending state; a follow-up call to abort the transaction will set its pending state
TransactionMetadataTransition(producerId, producerEpoch, txnTimeoutMs, state, topicPartitions.toSet, txnStartTimestamp, txnLastUpdateTimestamp)
+ }
+
+ def prepareFenceProducerEpoch(): TransactionMetadataTransition = {
+ // bump up the epoch so that the txn markers can override the current producer epoch
+ producerEpoch = (producerEpoch + 1).toShort
+
+ // do not call transitTo as it will set the pending state; a follow-up call to abort the transaction will set its pending state
+ TransactionMetadataTransition(producerId, producerEpoch, txnTimeoutMs, state, topicPartitions.toSet, txnStartTimestamp, txnLastUpdateTimestamp)
+ }
def prepareIncrementProducerEpoch(newTxnTimeoutMs: Int,
updateTimestamp: Long): TransactionMetadataTransition = {
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 2327213f782..0952b5df42d 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -70,11 +70,16 @@ class TransactionStateManager(brokerId: Int,
/** shutting down flag */
private val shuttingDown = new AtomicBoolean(false)
+ // TODO: we need to extend this lock into a read-write lock, and read access needs to be covered
+ // by the read lock
/** lock protecting access to loading and owned partition sets */
private val stateLock = new ReentrantLock()
- /** partitions of transaction topic that are being loaded, partition lock should be called BEFORE accessing this set */
- private val loadingPartitions: mutable.Set[Int] = mutable.Set()
+ /** partitions of transaction topic that are being loaded, state lock should be acquired BEFORE accessing this set */
+ private val loadingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
+
+ /** partitions of transaction topic that are being removed, state lock should be acquired BEFORE accessing this set */
+ private val leavingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
/** transaction metadata cache indexed by assigned transaction topic partition ids */
private val transactionMetadataCache: mutable.Map[Int, TxnMetadataCacheEntry] = mutable.Map()
@@ -113,6 +118,16 @@ class TransactionStateManager(brokerId: Int,
def getTransactionState(transactionalId: String): Option[CoordinatorEpochAndTxnMetadata] = {
val partitionId = partitionFor(transactionalId)
+ // we only need to check the leaving partition set, not the loading partition set, since there are three possible cases:
+ // 1) it is not in the loading partition set, hence it is safe to return None
+ // 2) it is in the loading partition set with a smaller epoch, hence it is safe to return None
+ // 3) it is in the loading partition set with a larger epoch, and returning None is also fine as it
+ //    indicates that the metadata does not exist, at least for now
+ //
+ // note that it is NOT possible to be in the loading partition set with the same epoch
+ if (leavingPartitions.exists(_.txnPartitionId == partitionId))
+ return None
+
transactionMetadataCache.get(partitionId).flatMap { cacheEntry =>
cacheEntry.metadataPerTransactionalId.get(transactionalId) match {
case null => None
@@ -174,7 +189,15 @@ class TransactionStateManager(brokerId: Int,
def isCoordinatorLoadingInProgress(transactionalId: String): Boolean = inLock(stateLock) {
val partitionId = partitionFor(transactionalId)
- loadingPartitions.contains(partitionId)
+
+ // we only need to check the loading partition set, not the leaving partition set, since there are three possible cases:
+ // 1) it is not in the leaving partition set, hence it is safe to return true
+ // 2) it is in the leaving partition set with a smaller epoch than the latest loading epoch, hence it is safe to return true
+ // 3) it is in the leaving partition set with a larger epoch, and returning true is also OK since the client will then retry
+ //    and later be notified that this coordinator is no longer the transaction coordinator for it
+ //
+ // note that it is NOT possible to be in the leaving partition set with the same epoch
+ loadingPartitions.exists(_.txnPartitionId == partitionId)
}
/**
@@ -203,8 +226,9 @@ class TransactionStateManager(brokerId: Int,
try {
while (currOffset < logEndOffset
- && loadingPartitions.contains(topicPartition.partition())
- && !shuttingDown.get()) {
+ && !shuttingDown.get()
+ && inLock(stateLock) {loadingPartitions.exists { idAndEpoch: TransactionPartitionAndLeaderEpoch =>
+ idAndEpoch.txnPartitionId == topicPartition.partition && idAndEpoch.coordinatorEpoch == coordinatorEpoch}}) {
val fetchDataInfo = log.read(currOffset, config.transactionLogLoadBufferSize, maxOffset = None,
minOneMessage = true, isolationLevel = IsolationLevel.READ_UNCOMMITTED)
val memRecords = fetchDataInfo.records match {
@@ -275,38 +299,43 @@ class TransactionStateManager(brokerId: Int,
validateTransactionTopicPartitionCountIsStable()
val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+ val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
inLock(stateLock) {
- loadingPartitions.add(partitionId)
+ leavingPartitions.remove(partitionAndLeaderEpoch)
+ loadingPartitions.add(partitionAndLeaderEpoch)
}
def loadTransactions() {
info(s"Loading transaction metadata from $topicPartition")
val loadedTransactions = loadTransactionMetadata(topicPartition, coordinatorEpoch)
- loadedTransactions.foreach {
- case (transactionalId, txnMetadata) =>
- val result = txnMetadata synchronized {
- // if state is PrepareCommit or PrepareAbort we need to complete the transaction
- txnMetadata.state match {
- case PrepareAbort =>
- Some(TransactionResult.ABORT, txnMetadata.prepareComplete(time.milliseconds()))
- case PrepareCommit =>
- Some(TransactionResult.COMMIT, txnMetadata.prepareComplete(time.milliseconds()))
- case _ =>
- // nothing need to be done
- None
- }
- }
-
- result.foreach { case (command, newMetadata) =>
- sendTxnMarkers(transactionalId, coordinatorEpoch, command, txnMetadata, newMetadata)
- }
- }
-
inLock(stateLock) {
- addLoadedTransactionsToCache(topicPartition.partition, coordinatorEpoch, loadedTransactions)
- loadingPartitions.remove(partitionId)
+ if (loadingPartitions.contains(partitionAndLeaderEpoch)) {
+ addLoadedTransactionsToCache(topicPartition.partition, coordinatorEpoch, loadedTransactions)
+
+ loadedTransactions.foreach {
+ case (transactionalId, txnMetadata) =>
+ val result = txnMetadata synchronized {
+ // if state is PrepareCommit or PrepareAbort we need to complete the transaction
+ txnMetadata.state match {
+ case PrepareAbort =>
+ Some(TransactionResult.ABORT, txnMetadata.prepareComplete(time.milliseconds()))
+ case PrepareCommit =>
+ Some(TransactionResult.COMMIT, txnMetadata.prepareComplete(time.milliseconds()))
+ case _ =>
+ // nothing needs to be done
+ None
+ }
+ }
+
+ result.foreach { case (command, newMetadata) =>
+ sendTxnMarkers(transactionalId, coordinatorEpoch, command, txnMetadata, newMetadata)
+ }
+ }
+
+ loadingPartitions.remove(partitionAndLeaderEpoch)
+ }
}
}
@@ -317,23 +346,31 @@ class TransactionStateManager(brokerId: Int,
* When this broker becomes a follower for a transaction log partition, clear out the cache for corresponding transactional ids
* that belong to that partition.
*/
- def removeTransactionsForTxnTopicPartition(partitionId: Int) {
+ def removeTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int) {
validateTransactionTopicPartitionCountIsStable()
val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+ val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
+
+ inLock(stateLock) {
+ loadingPartitions.remove(partitionAndLeaderEpoch)
+ leavingPartitions.add(partitionAndLeaderEpoch)
+ }
def removeTransactions() {
inLock(stateLock) {
- transactionMetadataCache.remove(partitionId) match {
- case Some(txnMetadataCacheEntry) =>
- info(s"Removed ${txnMetadataCacheEntry.metadataPerTransactionalId.size} cached transaction metadata for $topicPartition on follower transition")
+ if (leavingPartitions.contains(partitionAndLeaderEpoch)) {
+ transactionMetadataCache.remove(partitionId) match {
+ case Some(txnMetadataCacheEntry) =>
+ info(s"Removed ${txnMetadataCacheEntry.metadataPerTransactionalId.size} cached transaction metadata for $topicPartition on follower transition")
- case None =>
- info(s"Trying to remove cached transaction metadata for $topicPartition on follower transition but there is no entries remaining; " +
- s"it is likely that another process for removing the cached entries has just executed earlier before")
+ case None =>
+ info(s"Trying to remove cached transaction metadata for $topicPartition on follower transition but there is no entries remaining; " +
+ s"it is likely that another process for removing the cached entries has just executed earlier before")
+ }
+
+ leavingPartitions.remove(partitionAndLeaderEpoch)
}
-
- loadingPartitions.remove(partitionId)
}
}
@@ -437,8 +474,8 @@ class TransactionStateManager(brokerId: Int,
case None =>
// this transactional id no longer exists, maybe the corresponding partition has already been migrated out.
// return NOT_COORDINATOR to let the client re-discover the transaction coordinator
- info(s"Updating $transactionalId's transaction state to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed after the transaction message " +
- s"has been appended to the log. The partition ${partitionFor(transactionalId)} may have migrated as the metadata is no longer in the cache")
+ info(s"Updating $transactionalId's transaction state (txn topic partition ${partitionFor(transactionalId)}) to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId " +
+ s"failed after the transaction message has been appended to the log since the corresponding metadata does not exist in the cache anymore")
responseError = Errors.NOT_COORDINATOR
}
@@ -480,3 +517,5 @@ private[transaction] case class TransactionConfig(transactionalIdExpirationMs: I
removeExpiredTransactionsIntervalMs: Int = TransactionStateManager.DefaultRemoveExpiredTransactionsIntervalMs)
case class TransactionalIdAndProducerIdEpoch(transactionalId: String, producerId: Long, producerEpoch: Short)
+
+case class TransactionPartitionAndLeaderEpoch(txnPartitionId: Int, coordinatorEpoch: Int)
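
The new TransactionPartitionAndLeaderEpoch key is what allows a late emigration callback for an older coordinator epoch to coexist with an in-flight immigration for a newer epoch without cancelling it. A simplified, self-contained model of that interaction (not the real TransactionStateManager; names are illustrative):

object PartitionEpochSketch extends App {
  import scala.collection.mutable

  final case class PartitionAndEpoch(txnPartitionId: Int, coordinatorEpoch: Int)

  val loadingPartitions = mutable.Set.empty[PartitionAndEpoch]
  val leavingPartitions = mutable.Set.empty[PartitionAndEpoch]

  // immigration with coordinator epoch 10 starts loading partition 0
  val newer = PartitionAndEpoch(txnPartitionId = 0, coordinatorEpoch = 10)
  leavingPartitions.remove(newer)
  loadingPartitions.add(newer)

  // a stale emigration for epoch 9 only touches its own (partition, epoch) entry,
  // so it cannot abort the load that is in progress for epoch 10
  val older = PartitionAndEpoch(txnPartitionId = 0, coordinatorEpoch = 9)
  loadingPartitions.remove(older)
  leavingPartitions.add(older)

  assert(loadingPartitions.contains(newer)) // the newer load is unaffected
}
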
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 64016007bf2..fe7dada2098 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -121,7 +121,7 @@ object DelayedOperationPurgatory {
purgeInterval: Int = 1000,
reaperEnabled: Boolean = true): DelayedOperationPurgatory[T] = {
val timer = new SystemTimer(purgatoryName)
- new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval)
+ new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval, reaperEnabled)
}
}
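
The one-line change above forwards reaperEnabled to the constructor; previously the factory accepted the flag but never passed it on, so the constructor default was always used. A tiny illustrative sketch of the pitfall (hypothetical names, not the Kafka class):

class PurgatorySketch(val reaperEnabled: Boolean = true)

object PurgatorySketch {
  def applyBuggy(reaperEnabled: Boolean): PurgatorySketch = new PurgatorySketch()              // flag ignored, default wins
  def applyFixed(reaperEnabled: Boolean): PurgatorySketch = new PurgatorySketch(reaperEnabled) // flag forwarded as intended
}
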
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 31680b0cd42..33b696a9c7c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -150,7 +150,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (partition.topic == GROUP_METADATA_TOPIC_NAME)
groupCoordinator.handleGroupEmigration(partition.partitionId)
else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
- txnCoordinator.handleTxnEmigration(partition.partitionId)
+ txnCoordinator.handleTxnEmigration(partition.partitionId, partition.getLeaderEpoch)
}
}
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 3e19bb9d78d..e8669e93103 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -254,7 +254,7 @@ class TransactionsTest extends KafkaServerTestHarness {
}
}
- @Ignore @Test
+ @Test
def testFencingOnSend() {
val transactionalId = "my-t.id"
val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index 2f4f5722321..7271eddaf09 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -499,7 +499,7 @@ class TransactionCoordinatorTest {
.andReturn(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata)))
.anyTimes()
- val originalMetadata = new TransactionMetadata(pid, epoch, txnTimeoutMs, Ongoing, partitions, 0, 0)
+ val originalMetadata = new TransactionMetadata(pid, (epoch + 1).toShort, txnTimeoutMs, Ongoing, partitions, 0, 0)
EasyMock.expect(transactionManager.appendTransactionToLog(
EasyMock.eq(transactionalId),
EasyMock.eq(coordinatorEpoch),
@@ -521,11 +521,11 @@ class TransactionCoordinatorTest {
@Test
def shouldRemoveTransactionsForPartitionOnEmigration(): Unit = {
- EasyMock.expect(transactionManager.removeTransactionsForTxnTopicPartition(0))
+ EasyMock.expect(transactionManager.removeTransactionsForTxnTopicPartition(0, coordinatorEpoch))
EasyMock.expect(transactionMarkerChannelManager.removeMarkersForTxnTopicPartition(0))
EasyMock.replay(transactionManager, transactionMarkerChannelManager)
- coordinator.handleTxnEmigration(0)
+ coordinator.handleTxnEmigration(0, coordinatorEpoch)
EasyMock.verify(transactionManager, transactionMarkerChannelManager)
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 0d3263ae7a2..fb443adc423 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -176,7 +176,7 @@ class TransactionStateManagerTest {
assertTrue(transactionManager.isCoordinatorFor(txnId1))
assertTrue(transactionManager.isCoordinatorFor(txnId2))
- transactionManager.removeTransactionsForTxnTopicPartition(partitionId)
+ transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch)
// let the time advance to trigger the background thread removing
scheduler.tick()