KAFKA-5231: Bump up producer epoch when sending abort txn markers on InitPid

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Jason Gustafson, Jun Rao

Closes #3066 from guozhangwang/K5231-bump-up-epoch-when-abort-txn
Guozhang Wang 2017-05-17 18:05:12 -07:00
parent b3a33ce4b8
commit c64cfd2e2b
13 changed files with 159 additions and 76 deletions
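The gist of the patch: when an InitProducerId (InitPid) request arrives while the transactional id still has an ongoing transaction, the coordinator now bumps the producer epoch (the new TransactionMetadata.prepareFenceProducerEpoch below) before writing the abort markers, so the markers carry a newer epoch and fence any in-flight writes from the old producer. A minimal Scala sketch of that idea, using simplified stand-in types rather than the real coordinator classes:

// Illustrative sketch only: TxnMetadata, fenceAndAbort, etc. are simplified
// stand-ins, not the actual kafka.coordinator.transaction classes.
object EpochFenceSketch {
  sealed trait TxnState
  case object Ongoing extends TxnState
  case object PrepareAbort extends TxnState

  final case class TxnMetadata(producerId: Long, producerEpoch: Short, state: TxnState)

  // On InitPid with an ongoing transaction: bump the epoch first, then abort.
  def fenceAndAbort(md: TxnMetadata): TxnMetadata = {
    val fenced = md.copy(producerEpoch = (md.producerEpoch + 1).toShort)
    // abort markers are then written with fenced.producerEpoch, so produce
    // requests still using the old epoch are rejected by the brokers
    fenced.copy(state = PrepareAbort)
  }

  def main(args: Array[String]): Unit = {
    val before = TxnMetadata(producerId = 42L, producerEpoch = 0, state = Ongoing)
    println(fenceAndAbort(before)) // TxnMetadata(42,1,PrepareAbort)
  }
}

In the actual diff this corresponds to TransactionCoordinator calling prepareFenceProducerEpoch() instead of prepareNoTransit() when the current state is Ongoing (see the TransactionCoordinator and TransactionMetadata files below).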

View File

@@ -111,7 +111,7 @@
<module name="ClassFanOutComplexity">
<!-- default is 20 -->
<property name="max" value="35"/>
<property name="max" value="40"/>
</module>
<module name="CyclomaticComplexity">
<!-- default is 10-->

View File

@@ -40,12 +40,14 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@@ -440,9 +442,11 @@ public class Sender implements Runnable {
* Handle a produce response
*/
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
int correlationId = response.requestHeader().correlationId();
RequestHeader requestHeader = response.requestHeader();
int correlationId = requestHeader.correlationId();
if (response.wasDisconnected()) {
log.trace("Cancelled request {} due to node {} being disconnected", response, response.destination());
ApiKeys api = ApiKeys.forId(requestHeader.apiKey());
log.trace("Cancelled {} request {} with correlation id {} due to node {} being disconnected", api, requestHeader, correlationId, response.destination());
for (ProducerBatch batch : batches.values())
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now);
} else if (response.versionMismatch() != null) {

View File

@@ -45,6 +45,7 @@ private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata,
throw new IllegalStateException(s"Delayed write txn marker operation for metadata $txnMetadata has timed out, this should never happen.")
}
// TODO: if we always return NONE upon completion, we can remove the error code from the param
override def onComplete(): Unit = completionCallback(Errors.NONE)
}

View File

@@ -197,7 +197,7 @@ class TransactionCoordinator(brokerId: Int,
case Ongoing =>
// indicate to abort the current ongoing txn first
Right(coordinatorEpoch, txnMetadata.prepareNoTransit())
Right(coordinatorEpoch, txnMetadata.prepareFenceProducerEpoch())
}
}
}
@@ -266,8 +266,8 @@ class TransactionCoordinator(brokerId: Int,
txnManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch, txnMarkerChannelManager.addTxnMarkersToSend)
}
def handleTxnEmigration(txnTopicPartitionId: Int) {
txnManager.removeTransactionsForTxnTopicPartition(txnTopicPartitionId)
def handleTxnEmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int) {
txnManager.removeTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch)
txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
}
@@ -382,8 +382,8 @@ class TransactionCoordinator(brokerId: Int,
throw new IllegalStateException("Cannot find the metadata in coordinator's cache while it is still the leader of the txn topic partition")
} else {
// this transactional id no longer exists, maybe the corresponding partition has already been migrated out.
info(s"Updating $transactionalId's transaction state to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed after the transaction message " +
s"has been appended to the log. The partition ${partitionFor(transactionalId)} may have migrated as the metadata is no longer in the cache")
info(s"Updating $transactionalId's transaction state (txn topic partition ${partitionFor(transactionalId)}) to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId " +
s"failed after the transaction message has been appended to the log since the corresponding metadata does not exist in the cache anymore")
Left(Errors.NOT_COORDINATOR)
}

View File

@@ -230,8 +230,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
case None =>
// this transactional id no longer exists, maybe the corresponding partition has already been migrated out.
// we will stop appending the completed log entry to transaction topic as the new leader should be doing it.
info(s"Updating $transactionalId's transaction state to $txnMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed after the transaction message " +
s"has been appended to the log. The partition ${txnStateManager.partitionFor(transactionalId)} may have migrated as the metadata is no longer in the cache")
info(s"Updating $transactionalId's transaction state (txn topic partition ${txnStateManager.partitionFor(transactionalId)}) to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId " +
s"failed after the transaction message has been appended to the log since the corresponding metadata does not exist in the cache anymore")
}
case other =>
@@ -295,6 +295,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
txnMarkerPurgatory.cancelForKey(transactionalId)
}
// FIXME: Currently, operations registered under a partition in txnMarkerPurgatory
// are only cleaned up during coordinator immigration, which happens rarely. This means a potential memory leak
def completeSendMarkersForTxnId(transactionalId: String): Unit = {
txnMarkerPurgatory.checkAndComplete(transactionalId)
}

View File

@@ -17,11 +17,10 @@
package kafka.coordinator.transaction
import kafka.server.DelayedOperationPurgatory
import kafka.utils.Logging
import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.WriteTxnMarkersResponse
import scala.collection.mutable
@@ -32,18 +31,44 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
txnMarkerChannelManager: TransactionMarkerChannelManager,
txnIdAndMarkerEntries: java.util.List[TxnIdAndMarkerEntry]) extends RequestCompletionHandler with Logging {
override def onComplete(response: ClientResponse): Unit = {
val correlationId = response.requestHeader.correlationId
val requestHeader = response.requestHeader
val correlationId = requestHeader.correlationId
if (response.wasDisconnected) {
trace(s"Cancelled request $response due to node ${response.destination} being disconnected")
// re-enqueue the markers
val api = ApiKeys.forId(requestHeader.apiKey)
val correlation = requestHeader.correlationId
trace(s"Cancelled $api request $requestHeader with correlation id $correlation due to node ${response.destination} being disconnected")
for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries) {
val transactionalId = txnIdAndMarker.txnId
val txnMarker = txnIdAndMarker.txnMarkerEntry
txnMarkerChannelManager.addTxnMarkersToBrokerQueue(txnIdAndMarker.txnId,
txnMarker.producerId(),
txnMarker.producerEpoch(),
txnMarker.transactionResult(),
txnMarker.coordinatorEpoch(),
txnMarker.partitions().toSet)
txnStateManager.getTransactionState(transactionalId) match {
case None =>
info(s"Transaction metadata for $transactionalId does not exist in the cache " +
s"any more; cancel sending transaction markers $txnMarker to the brokers")
txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
case Some(epochAndMetadata) =>
if (epochAndMetadata.coordinatorEpoch != txnMarker.coordinatorEpoch) {
// coordinator epoch has changed, just cancel it from the purgatory
info(s"Transaction coordinator epoch for $transactionalId has changed from ${txnMarker.coordinatorEpoch} to " +
s"${epochAndMetadata.coordinatorEpoch}; cancel sending transaction markers $txnMarker to the brokers")
txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
} else {
// re-enqueue the markers
trace(s"Re-enqueuing ${txnMarker.transactionResult} transaction markers for transactional id $transactionalId " +
s"under coordinator epoch ${txnMarker.coordinatorEpoch}")
txnMarkerChannelManager.addTxnMarkersToBrokerQueue(transactionalId,
txnMarker.producerId,
txnMarker.producerEpoch,
txnMarker.transactionResult,
txnMarker.coordinatorEpoch,
txnMarker.partitions.toSet)
}
}
}
} else {
trace(s"Received response $response from node ${response.destination} with correlation id $correlationId")
@@ -60,10 +85,9 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
txnStateManager.getTransactionState(transactionalId) match {
case None =>
info(s"Transaction topic partition for $transactionalId may likely has emigrated, as the corresponding metadata do not exist in the cache" +
info(s"Transaction metadata for $transactionalId does not exist in the cache " +
s"any more; cancel sending transaction markers $txnMarker to the brokers")
// txn topic partition has likely emigrated, just cancel it from the purgatory
txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
case Some(epochAndMetadata) =>
@@ -121,13 +145,16 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
if (!abortSending) {
if (retryPartitions.nonEmpty) {
trace(s"Re-enqueuing ${txnMarker.transactionResult} transaction markers for transactional id $transactionalId " +
s"under coordinator epoch ${txnMarker.coordinatorEpoch}")
// re-enqueue with possible new leaders of the partitions
txnMarkerChannelManager.addTxnMarkersToBrokerQueue(
transactionalId,
txnMarker.producerId(),
txnMarker.producerEpoch(),
txnMarker.producerId,
txnMarker.producerEpoch,
txnMarker.transactionResult,
txnMarker.coordinatorEpoch(),
txnMarker.coordinatorEpoch,
retryPartitions.toSet)
} else {
txnMarkerChannelManager.completeSendMarkersForTxnId(transactionalId)
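Taken together, the disconnect handling earlier in this file now mirrors the response handling: markers are only re-enqueued if the transaction metadata is still cached and its coordinator epoch matches the epoch stamped on the marker; otherwise they are cancelled from the purgatory. A condensed sketch of that decision, with illustrative types (EpochAndMetadata, MarkerAction) rather than the handler's real signature:

// Condensed, illustrative view of the re-enqueue-or-cancel decision above;
// the names below are hypothetical, not the actual Kafka classes.
object MarkerRetrySketch {
  final case class EpochAndMetadata(coordinatorEpoch: Int)

  sealed trait MarkerAction
  case object Reenqueue extends MarkerAction
  case object Cancel extends MarkerAction

  def onDisconnect(cached: Option[EpochAndMetadata], markerCoordinatorEpoch: Int): MarkerAction =
    cached match {
      case None =>
        Cancel // metadata no longer in the cache; the txn topic partition likely emigrated
      case Some(m) if m.coordinatorEpoch != markerCoordinatorEpoch =>
        Cancel // coordinator epoch changed; drop the stale marker
      case _ =>
        Reenqueue // still current: retry sending the marker to the broker
    }
}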

View File

@@ -135,16 +135,26 @@ private[transaction] class TransactionMetadata(val producerId: Long,
}
def removePartition(topicPartition: TopicPartition): Unit = {
if (pendingState.isDefined && (state != PrepareCommit && state != PrepareAbort))
throw new IllegalStateException(s"Transation metadata's current state is $state, and its pending state is $state " +
if (state != PrepareCommit && state != PrepareAbort)
throw new IllegalStateException(s"Transaction metadata's current state is $state, and its pending state is $pendingState " +
s"while trying to remove partitions whose txn marker has been sent, this is not expected")
topicPartitions -= topicPartition
}
def prepareNoTransit(): TransactionMetadataTransition =
// do not call transitTo as it will set the pending state
// this is visible for test only
def prepareNoTransit(): TransactionMetadataTransition = {
// do not call transitTo as it will set the pending state, a follow-up call to abort the transaction will set its pending state
TransactionMetadataTransition(producerId, producerEpoch, txnTimeoutMs, state, topicPartitions.toSet, txnStartTimestamp, txnLastUpdateTimestamp)
}
def prepareFenceProducerEpoch(): TransactionMetadataTransition = {
// bump up the epoch so that the txn markers can override the current producer epoch
producerEpoch = (producerEpoch + 1).toShort
// do not call transitTo as it will set the pending state, a follow-up call to abort the transaction will set its pending state
TransactionMetadataTransition(producerId, producerEpoch, txnTimeoutMs, state, topicPartitions.toSet, txnStartTimestamp, txnLastUpdateTimestamp)
}
def prepareIncrementProducerEpoch(newTxnTimeoutMs: Int,
updateTimestamp: Long): TransactionMetadataTransition = {
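prepareFenceProducerEpoch above is what lets the abort markers override a still-running producer: the markers are written with the bumped epoch, and brokers reject produce requests carrying an epoch older than the latest one they have seen for that producer id. A toy illustration of that comparison (this shows only the idea; ProducerEntry and validateEpoch are hypothetical names, not the broker's actual producer-state code):

// Toy illustration of epoch fencing; names are hypothetical.
object FencingIdea {
  final case class ProducerEntry(producerId: Long, currentEpoch: Short)

  def validateEpoch(entry: ProducerEntry, requestEpoch: Short): Either[String, Unit] =
    if (requestEpoch < entry.currentEpoch)
      Left(s"INVALID_PRODUCER_EPOCH: got $requestEpoch, current is ${entry.currentEpoch}")
    else
      Right(())

  def main(args: Array[String]): Unit = {
    // After the coordinator writes abort markers with the bumped epoch 1,
    // the zombie producer's epoch-0 writes are rejected.
    println(validateEpoch(ProducerEntry(42L, currentEpoch = 1), requestEpoch = 0))
  }
}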

View File

@@ -70,11 +70,16 @@ class TransactionStateManager(brokerId: Int,
/** shutting down flag */
private val shuttingDown = new AtomicBoolean(false)
// TODO: we need to extend this lock as a read-write lock and reading access to it needs to be covered
// by the read lock
/** lock protecting access to loading and owned partition sets */
private val stateLock = new ReentrantLock()
/** partitions of transaction topic that are being loaded, the partition lock should be acquired BEFORE accessing this set */
private val loadingPartitions: mutable.Set[Int] = mutable.Set()
/** partitions of transaction topic that are being loaded, the state lock should be acquired BEFORE accessing this set */
private val loadingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
/** partitions of transaction topic that are being removed, the state lock should be acquired BEFORE accessing this set */
private val leavingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
/** transaction metadata cache indexed by assigned transaction topic partition ids */
private val transactionMetadataCache: mutable.Map[Int, TxnMetadataCacheEntry] = mutable.Map()
@@ -113,6 +118,16 @@ class TransactionStateManager(brokerId: Int,
def getTransactionState(transactionalId: String): Option[CoordinatorEpochAndTxnMetadata] = {
val partitionId = partitionFor(transactionalId)
// we only need to check the leaving partition set, not the loading partition set, since there are three possible cases:
// 1) it is not in the loading partitions set, hence it is safe to return None
// 2) it is in the loading partitions set with a smaller epoch, hence it is safe to return None
// 3) it is in the loading partitions set with a larger epoch; returning None is also fine as it
//    indicates the metadata does not exist, at least for now
//
// 4) it is NOT possible for it to be in the loading partitions set with the same epoch
if (leavingPartitions.exists(_.txnPartitionId == partitionId))
return None
transactionMetadataCache.get(partitionId).flatMap { cacheEntry =>
cacheEntry.metadataPerTransactionalId.get(transactionalId) match {
case null => None
@@ -174,7 +189,15 @@ class TransactionStateManager(brokerId: Int,
def isCoordinatorLoadingInProgress(transactionalId: String): Boolean = inLock(stateLock) {
val partitionId = partitionFor(transactionalId)
loadingPartitions.contains(partitionId)
// we only need to check the loading partition set, not the leaving partition set, since there are three possible cases:
// 1) it is not in the leaving partitions set, hence it is safe to return true
// 2) it is in the leaving partitions set with a smaller epoch than the latest loading epoch, hence it is safe to return true
// 3) it is in the leaving partitions set with a larger epoch; returning true is also OK since the client will retry
//    and later be notified that this coordinator is no longer the transaction coordinator for it
//
// 4) it is NOT possible for it to be in the leaving partitions set with the same epoch
loadingPartitions.exists(_.txnPartitionId == partitionId)
}
/**
@@ -203,8 +226,9 @@ class TransactionStateManager(brokerId: Int,
try {
while (currOffset < logEndOffset
&& loadingPartitions.contains(topicPartition.partition())
&& !shuttingDown.get()) {
&& !shuttingDown.get()
&& inLock(stateLock) {loadingPartitions.exists { idAndEpoch: TransactionPartitionAndLeaderEpoch =>
idAndEpoch.txnPartitionId == topicPartition.partition && idAndEpoch.coordinatorEpoch == coordinatorEpoch}}) {
val fetchDataInfo = log.read(currOffset, config.transactionLogLoadBufferSize, maxOffset = None,
minOneMessage = true, isolationLevel = IsolationLevel.READ_UNCOMMITTED)
val memRecords = fetchDataInfo.records match {
@@ -275,38 +299,43 @@ class TransactionStateManager(brokerId: Int,
validateTransactionTopicPartitionCountIsStable()
val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
inLock(stateLock) {
loadingPartitions.add(partitionId)
leavingPartitions.remove(partitionAndLeaderEpoch)
loadingPartitions.add(partitionAndLeaderEpoch)
}
def loadTransactions() {
info(s"Loading transaction metadata from $topicPartition")
val loadedTransactions = loadTransactionMetadata(topicPartition, coordinatorEpoch)
loadedTransactions.foreach {
case (transactionalId, txnMetadata) =>
val result = txnMetadata synchronized {
// if state is PrepareCommit or PrepareAbort we need to complete the transaction
txnMetadata.state match {
case PrepareAbort =>
Some(TransactionResult.ABORT, txnMetadata.prepareComplete(time.milliseconds()))
case PrepareCommit =>
Some(TransactionResult.COMMIT, txnMetadata.prepareComplete(time.milliseconds()))
case _ =>
// nothing needs to be done
None
}
}
result.foreach { case (command, newMetadata) =>
sendTxnMarkers(transactionalId, coordinatorEpoch, command, txnMetadata, newMetadata)
}
}
inLock(stateLock) {
addLoadedTransactionsToCache(topicPartition.partition, coordinatorEpoch, loadedTransactions)
loadingPartitions.remove(partitionId)
if (loadingPartitions.contains(partitionAndLeaderEpoch)) {
addLoadedTransactionsToCache(topicPartition.partition, coordinatorEpoch, loadedTransactions)
loadedTransactions.foreach {
case (transactionalId, txnMetadata) =>
val result = txnMetadata synchronized {
// if state is PrepareCommit or PrepareAbort we need to complete the transaction
txnMetadata.state match {
case PrepareAbort =>
Some(TransactionResult.ABORT, txnMetadata.prepareComplete(time.milliseconds()))
case PrepareCommit =>
Some(TransactionResult.COMMIT, txnMetadata.prepareComplete(time.milliseconds()))
case _ =>
// nothing needs to be done
None
}
}
result.foreach { case (command, newMetadata) =>
sendTxnMarkers(transactionalId, coordinatorEpoch, command, txnMetadata, newMetadata)
}
}
loadingPartitions.remove(partitionAndLeaderEpoch)
}
}
}
@@ -317,23 +346,31 @@ class TransactionStateManager(brokerId: Int,
* When this broker becomes a follower for a transaction log partition, clear out the cache for corresponding transactional ids
* that belong to that partition.
*/
def removeTransactionsForTxnTopicPartition(partitionId: Int) {
def removeTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int) {
validateTransactionTopicPartitionCountIsStable()
val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
inLock(stateLock) {
loadingPartitions.remove(partitionAndLeaderEpoch)
leavingPartitions.add(partitionAndLeaderEpoch)
}
def removeTransactions() {
inLock(stateLock) {
transactionMetadataCache.remove(partitionId) match {
case Some(txnMetadataCacheEntry) =>
info(s"Removed ${txnMetadataCacheEntry.metadataPerTransactionalId.size} cached transaction metadata for $topicPartition on follower transition")
if (leavingPartitions.contains(partitionAndLeaderEpoch)) {
transactionMetadataCache.remove(partitionId) match {
case Some(txnMetadataCacheEntry) =>
info(s"Removed ${txnMetadataCacheEntry.metadataPerTransactionalId.size} cached transaction metadata for $topicPartition on follower transition")
case None =>
info(s"Trying to remove cached transaction metadata for $topicPartition on follower transition but there are no entries remaining; " +
s"it is likely that another process removing the cached entries has already executed")
case None =>
info(s"Trying to remove cached transaction metadata for $topicPartition on follower transition but there are no entries remaining; " +
s"it is likely that another process removing the cached entries has already executed")
}
leavingPartitions.remove(partitionAndLeaderEpoch)
}
loadingPartitions.remove(partitionId)
}
}
@@ -437,8 +474,8 @@ class TransactionStateManager(brokerId: Int,
case None =>
// this transactional id no longer exists, maybe the corresponding partition has already been migrated out.
// return NOT_COORDINATOR to let the client re-discover the transaction coordinator
info(s"Updating $transactionalId's transaction state to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed after the transaction message " +
s"has been appended to the log. The partition ${partitionFor(transactionalId)} may have migrated as the metadata is no longer in the cache")
info(s"Updating $transactionalId's transaction state (txn topic partition ${partitionFor(transactionalId)}) to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId " +
s"failed after the transaction message has been appended to the log since the corresponding metadata does not exist in the cache anymore")
responseError = Errors.NOT_COORDINATOR
}
@@ -480,3 +517,5 @@ private[transaction] case class TransactionConfig(transactionalIdExpirationMs: I
removeExpiredTransactionsIntervalMs: Int = TransactionStateManager.DefaultRemoveExpiredTransactionsIntervalMs)
case class TransactionalIdAndProducerIdEpoch(transactionalId: String, producerId: Long, producerEpoch: Short)
case class TransactionPartitionAndLeaderEpoch(txnPartitionId: Int, coordinatorEpoch: Int)
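The new TransactionPartitionAndLeaderEpoch key ties every pending load or removal of a transaction topic partition to the coordinator epoch under which it was scheduled, so a task scheduled under an older epoch cannot clobber state installed by a newer immigration. A minimal sketch of that bookkeeping, assuming simplified names (PartitionEpochBookkeeping, onImmigration, onEmigration, maybeRemoveCache are illustrative, not the manager's actual API):

import scala.collection.mutable

// Minimal, illustrative sketch of the epoch-tagged partition sets above.
object PartitionEpochBookkeeping {
  final case class PartitionAndEpoch(txnPartitionId: Int, coordinatorEpoch: Int)

  private val loadingPartitions = mutable.Set.empty[PartitionAndEpoch]
  private val leavingPartitions = mutable.Set.empty[PartitionAndEpoch]

  def onImmigration(partitionId: Int, coordinatorEpoch: Int): Unit = synchronized {
    leavingPartitions.remove(PartitionAndEpoch(partitionId, coordinatorEpoch))
    loadingPartitions.add(PartitionAndEpoch(partitionId, coordinatorEpoch))
  }

  def onEmigration(partitionId: Int, coordinatorEpoch: Int): Unit = synchronized {
    loadingPartitions.remove(PartitionAndEpoch(partitionId, coordinatorEpoch))
    leavingPartitions.add(PartitionAndEpoch(partitionId, coordinatorEpoch))
  }

  // The scheduled removal only takes effect if the exact (partition, epoch)
  // pair that scheduled it is still pending when the background task runs.
  def maybeRemoveCache(partitionId: Int, coordinatorEpoch: Int)(removeCache: () => Unit): Unit =
    synchronized {
      val key = PartitionAndEpoch(partitionId, coordinatorEpoch)
      if (leavingPartitions.contains(key)) {
        removeCache()
        leavingPartitions.remove(key)
      }
    }

  def main(args: Array[String]): Unit = {
    onEmigration(partitionId = 0, coordinatorEpoch = 1)
    // an immigration under the same (partition, epoch) cancels the pending removal
    onImmigration(partitionId = 0, coordinatorEpoch = 1)
    maybeRemoveCache(partitionId = 0, coordinatorEpoch = 1)(() => println("cache removed")) // prints nothing
  }
}

getTransactionState and isCoordinatorLoadingInProgress then consult leavingPartitions and loadingPartitions respectively, as in the hunks above.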

View File

@@ -121,7 +121,7 @@ object DelayedOperationPurgatory {
purgeInterval: Int = 1000,
reaperEnabled: Boolean = true): DelayedOperationPurgatory[T] = {
val timer = new SystemTimer(purgatoryName)
new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval)
new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval, reaperEnabled)
}
}

View File

@@ -150,7 +150,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (partition.topic == GROUP_METADATA_TOPIC_NAME)
groupCoordinator.handleGroupEmigration(partition.partitionId)
else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
txnCoordinator.handleTxnEmigration(partition.partitionId)
txnCoordinator.handleTxnEmigration(partition.partitionId, partition.getLeaderEpoch)
}
}

View File

@@ -254,7 +254,7 @@ class TransactionsTest extends KafkaServerTestHarness {
}
}
@Ignore @Test
@Test
def testFencingOnSend() {
val transactionalId = "my-t.id"
val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)

View File

@@ -499,7 +499,7 @@ class TransactionCoordinatorTest {
.andReturn(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata)))
.anyTimes()
val originalMetadata = new TransactionMetadata(pid, epoch, txnTimeoutMs, Ongoing, partitions, 0, 0)
val originalMetadata = new TransactionMetadata(pid, (epoch + 1).toShort, txnTimeoutMs, Ongoing, partitions, 0, 0)
EasyMock.expect(transactionManager.appendTransactionToLog(
EasyMock.eq(transactionalId),
EasyMock.eq(coordinatorEpoch),
@@ -521,11 +521,11 @@ class TransactionCoordinatorTest {
@Test
def shouldRemoveTransactionsForPartitionOnEmigration(): Unit = {
EasyMock.expect(transactionManager.removeTransactionsForTxnTopicPartition(0))
EasyMock.expect(transactionManager.removeTransactionsForTxnTopicPartition(0, coordinatorEpoch))
EasyMock.expect(transactionMarkerChannelManager.removeMarkersForTxnTopicPartition(0))
EasyMock.replay(transactionManager, transactionMarkerChannelManager)
coordinator.handleTxnEmigration(0)
coordinator.handleTxnEmigration(0, coordinatorEpoch)
EasyMock.verify(transactionManager, transactionMarkerChannelManager)
}

View File

@@ -176,7 +176,7 @@ class TransactionStateManagerTest {
assertTrue(transactionManager.isCoordinatorFor(txnId1))
assertTrue(transactionManager.isCoordinatorFor(txnId2))
transactionManager.removeTransactionsForTxnTopicPartition(partitionId)
transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch)
// let the time advance to trigger the background thread removing
scheduler.tick()