KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets (#14774)

Rewrote the verification flow so that a callback is passed in and executed after verification completes.
For TxnOffsetCommit requests, that callback invokes doTxnCommitOffsets, which lets us perform offset validations after verification.

I've reorganized the verification code and group coordinator code to make these code paths clearer. The follow-up refactor (https://issues.apache.org/jira/browse/KAFKA-15987) will further clean up the produce verification code.
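
At a high level, the change moves the commit work into a post-verification callback that performs the validation, in-memory update, and log append together under the group lock. The following minimal, self-contained Scala sketch illustrates that pattern only; SimpleGroup, maybeStartVerification, and the simplified doTxnCommitOffsets below are illustrative stand-ins, not the actual Kafka classes or signatures.

import java.util.concurrent.locks.ReentrantLock

object TxnOffsetCommitFlowSketch {

  // Stand-in for GroupMetadata: holds pending offsets behind a lock.
  final class SimpleGroup {
    val lock = new ReentrantLock()
    private var pending = Map.empty[String, Long]
    def prepareCommit(offsets: Map[String, Long]): Unit = pending ++= offsets
  }

  // Stand-in for ReplicaManager.maybeStartTransactionVerificationForPartition:
  // real verification is asynchronous; here the callback is invoked inline.
  def maybeStartVerification(producerId: Long)(postVerification: String => Unit): Unit = {
    val error = if (producerId < 0) "INVALID_PRODUCER_ID_MAPPING" else "NONE"
    postVerification(error)
  }

  // Stand-in for doTxnCommitOffsets/storeOffsets: the in-memory update and the
  // (pretend) log append happen together while the group lock is held, so no
  // other commit can interleave between the two steps.
  def doTxnCommitOffsets(group: SimpleGroup,
                         offsets: Map[String, Long],
                         respond: Map[String, String] => Unit): Unit = {
    group.lock.lock()
    try {
      group.prepareCommit(offsets)                           // in-memory update
      respond(offsets.map { case (tp, _) => tp -> "NONE" })  // append "succeeded"
    } finally {
      group.lock.unlock()
    }
  }

  def main(args: Array[String]): Unit = {
    val group = new SimpleGroup
    val offsets = Map("topic-0" -> 42L)
    // Verification runs first; only its callback decides between error response and commit.
    maybeStartVerification(producerId = 1000L) { error =>
      if (error != "NONE") println(offsets.map { case (tp, _) => tp -> error })
      else doTxnCommitOffsets(group, offsets, println)
    }
  }
}

In the real code paths below, the error branch maps the verification error through GroupMetadataManager.maybeConvertOffsetCommitError before responding, and the success branch threads the VerificationGuard through doTxnCommitOffsets into storeOffsets and appendForGroup.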

Reviewers: Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>, Jun Rao <junrao@gmail.com>
Justine Olshan 2023-12-13 17:45:09 -08:00 committed by GitHub
parent a87e86e015
commit e4249b69bd
9 changed files with 763 additions and 372 deletions


@ -29,9 +29,11 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.Meter
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.internals.log.VerificationGuard
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.math.max
@ -909,8 +911,32 @@ private[group] class GroupCoordinator(
val group = groupManager.getGroup(groupId).getOrElse {
groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
}
doTxnCommitOffsets(group, transactionalId, memberId, groupInstanceId, generationId, producerId, producerEpoch,
offsetMetadata, requestLocal, responseCallback)
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
def postVerificationCallback(
error: Errors,
newRequestLocal: RequestLocal,
verificationGuard: VerificationGuard
): Unit = {
if (error != Errors.NONE) {
val finalError = GroupMetadataManager.maybeConvertOffsetCommitError(error)
responseCallback(offsetMetadata.map { case (k, _) => k -> finalError })
} else {
doTxnCommitOffsets(group, memberId, groupInstanceId, generationId, producerId, producerEpoch,
offsetTopicPartition, offsetMetadata, newRequestLocal, responseCallback, Some(verificationGuard))
}
}
groupManager.replicaManager.maybeStartTransactionVerificationForPartition(
topicPartition = offsetTopicPartition,
transactionalId,
producerId,
producerEpoch,
RecordBatch.NO_SEQUENCE,
requestLocal,
postVerificationCallback
)
}
}
@ -952,15 +978,16 @@ private[group] class GroupCoordinator(
}
private def doTxnCommitOffsets(group: GroupMetadata,
transactionalId: String,
memberId: String,
groupInstanceId: Option[String],
generationId: Int,
producerId: Long,
producerEpoch: Short,
offsetTopicPartition: TopicPartition,
offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
requestLocal: RequestLocal,
responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit): Unit = {
responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,
verificationGuard: Option[VerificationGuard]): Unit = {
group.inLock {
val validationErrorOpt = validateOffsetCommit(
group,
@ -973,8 +1000,8 @@ private[group] class GroupCoordinator(
if (validationErrorOpt.isDefined) {
responseCallback(offsetMetadata.map { case (k, _) => k -> validationErrorOpt.get })
} else {
groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, transactionalId, producerId,
producerEpoch, requestLocal)
groupManager.storeOffsets(group, memberId, offsetTopicPartition, offsetMetadata, responseCallback, producerId,
producerEpoch, requestLocal, verificationGuard)
}
}
}
@ -1033,19 +1060,21 @@ private[group] class GroupCoordinator(
isTransactional = false
)
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
if (validationErrorOpt.isDefined) {
responseCallback(offsetMetadata.map { case (k, _) => k -> validationErrorOpt.get })
} else {
group.currentState match {
case Empty =>
groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
groupManager.storeOffsets(group, memberId, offsetTopicPartition, offsetMetadata, responseCallback, verificationGuard = None)
case Stable | PreparingRebalance =>
// During PreparingRebalance phase, we still allow a commit request since we rely
// on heartbeat response to eventually notify the rebalance in progress signal to the consumer
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, requestLocal = requestLocal)
groupManager.storeOffsets(group, memberId, offsetTopicPartition, offsetMetadata, responseCallback, requestLocal = requestLocal, verificationGuard = None)
case CompletingRebalance =>
// We should not receive a commit request if the group has not completed rebalance;


@ -26,7 +26,8 @@ import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.ConcurrentHashMap
import com.yammer.metrics.core.Gauge
import kafka.common.OffsetAndMetadata
import kafka.server.{ReplicaManager, RequestLocal}
import kafka.coordinator.group.GroupMetadataManager.maybeConvertOffsetCommitError
import kafka.server.{LogAppendResult, ReplicaManager, RequestLocal}
import kafka.utils.CoreUtils.inLock
import kafka.utils.Implicits._
import kafka.utils._
@ -47,7 +48,7 @@ import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0, IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, VerificationGuard}
import scala.collection._
import scala.collection.mutable.ArrayBuffer
@ -324,171 +325,178 @@ class GroupMetadataManager(brokerId: Int,
}
}
private def appendForGroup(group: GroupMetadata,
records: Map[TopicPartition, MemoryRecords],
requestLocal: RequestLocal,
callback: Map[TopicPartition, PartitionResponse] => Unit,
transactionalId: String = null): Unit = {
// This method should be called under the group lock to ensure atomicity of the update to the in-memory and persisted state.
private def appendForGroup(
group: GroupMetadata,
records: Map[TopicPartition, MemoryRecords],
requestLocal: RequestLocal,
callback: Map[TopicPartition, PartitionResponse] => Unit,
verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty
): Unit = {
// call replica manager to append the group message
replicaManager.appendRecords(
replicaManager.appendForGroup(
timeout = config.offsetCommitTimeoutMs.toLong,
requiredAcks = config.offsetCommitRequiredAcks,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition = records,
delayedProduceLock = Some(group.lock),
responseCallback = callback,
requestLocal = requestLocal,
transactionalId = transactionalId)
verificationGuards = verificationGuards
)
}
private def generateOffsetRecords(magicValue: Byte,
isTxnOffsetCommit: Boolean,
groupId: String,
offsetTopicPartition: TopicPartition,
filteredOffsetMetadata: Map[TopicIdPartition, OffsetAndMetadata],
producerId: Long,
producerEpoch: Short): Map[TopicPartition, MemoryRecords] = {
// We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
val timestampType = TimestampType.CREATE_TIME
val timestamp = time.milliseconds()
val records = filteredOffsetMetadata.map { case (topicIdPartition, offsetAndMetadata) =>
val key = GroupMetadataManager.offsetCommitKey(groupId, topicIdPartition.topicPartition)
val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion)
new SimpleRecord(timestamp, key, value)
}
val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava))
if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to make a transaction offset commit with an invalid magic: " + magicValue)
val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L, time.milliseconds(),
producerId, producerEpoch, 0, isTxnOffsetCommit, RecordBatch.NO_PARTITION_LEADER_EPOCH)
records.foreach(builder.append)
Map(offsetTopicPartition -> builder.build())
}
private def createPutCacheCallback(isTxnOffsetCommit: Boolean,
group: GroupMetadata,
consumerId: String,
offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
filteredOffsetMetadata: Map[TopicIdPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,
producerId: Long,
records: Map[TopicPartition, MemoryRecords],
preAppendErrors: Map[TopicPartition, LogAppendResult] = Map.empty): Map[TopicPartition, PartitionResponse] => Unit = {
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
// set the callback function to insert offsets into cache after log append completed
def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
// the append response should only contain the topics partition
if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition))
throw new IllegalStateException("Append status %s should only have one partition %s"
.format(responseStatus, offsetTopicPartition))
// construct the commit response status and insert
// the offset and metadata to cache if the append status has no error
val status = responseStatus(offsetTopicPartition)
val responseError = group.inLock {
if (status.error == Errors.NONE) {
if (!group.is(Dead)) {
filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
if (isTxnOffsetCommit)
group.onTxnOffsetCommitAppend(producerId, topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
else
group.onOffsetCommitAppend(topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
}
}
// Record the number of offsets committed to the log
offsetCommitsSensor.record(records.size)
Errors.NONE
} else {
if (!group.is(Dead)) {
if (!group.hasPendingOffsetCommitsFromProducer(producerId))
removeProducerGroup(producerId, group.groupId)
filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
if (isTxnOffsetCommit)
group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
else
group.failPendingOffsetWrite(topicIdPartition, offsetAndMetadata)
}
}
debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
s"with generation ${group.generationId} failed when appending to log due to ${status.error.exceptionName}")
maybeConvertOffsetCommitError(status.error)
}
}
// compute the final error codes for the commit response
val commitStatus = offsetMetadata.map { case (topicIdPartition, offsetAndMetadata) =>
if (!validateOffsetMetadataLength(offsetAndMetadata.metadata))
(topicIdPartition, Errors.OFFSET_METADATA_TOO_LARGE)
else if (preAppendErrors.contains(topicIdPartition.topicPartition))
(topicIdPartition, preAppendErrors(topicIdPartition.topicPartition).error)
else
(topicIdPartition, responseError)
}
// finally trigger the callback logic passed from the API layer
responseCallback(commitStatus)
}
putCacheCallback
}
/**
* Store offsets by appending it to the replicated log and then inserting to cache
*
* This method should be called under the group lock to ensure validations and updates are all performed
* atomically.
*/
def storeOffsets(group: GroupMetadata,
consumerId: String,
offsetTopicPartition: TopicPartition,
offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,
transactionalId: String = null,
producerId: Long = RecordBatch.NO_PRODUCER_ID,
producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
// first filter out partitions with offset metadata size exceeding limit
requestLocal: RequestLocal = RequestLocal.NoCaching,
verificationGuard: Option[VerificationGuard]): Unit = {
if (!group.hasReceivedConsistentOffsetCommits)
warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has received offset commits from consumers as well " +
s"as transactional producers. Mixing both types of offset commits will generally result in surprises and " +
s"should be avoided.")
val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
validateOffsetMetadataLength(offsetAndMetadata.metadata)
}
group.inLock {
if (!group.hasReceivedConsistentOffsetCommits)
warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has received offset commits from consumers as well " +
s"as transactional producers. Mixing both types of offset commits will generally result in surprises and " +
s"should be avoided.")
}
val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
// construct the message set to append
if (filteredOffsetMetadata.isEmpty) {
// compute the final error codes for the commit response
val commitStatus = offsetMetadata.map { case (k, _) => k -> Errors.OFFSET_METADATA_TOO_LARGE }
responseCallback(commitStatus)
} else {
getMagic(partitionFor(group.groupId)) match {
case Some(magicValue) =>
// We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
val timestampType = TimestampType.CREATE_TIME
val timestamp = time.milliseconds()
val records = filteredOffsetMetadata.map { case (topicIdPartition, offsetAndMetadata) =>
val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicIdPartition.topicPartition)
val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion)
new SimpleRecord(timestamp, key, value)
}
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava))
if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to make a transaction offset commit with an invalid magic: " + magicValue)
val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L, time.milliseconds(),
producerId, producerEpoch, 0, isTxnOffsetCommit, RecordBatch.NO_PARTITION_LEADER_EPOCH)
records.foreach(builder.append)
val entries = Map(offsetTopicPartition -> builder.build())
// set the callback function to insert offsets into cache after log append completed
def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
// the append response should only contain the topics partition
if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition))
throw new IllegalStateException("Append status %s should only have one partition %s"
.format(responseStatus, offsetTopicPartition))
// construct the commit response status and insert
// the offset and metadata to cache if the append status has no error
val status = responseStatus(offsetTopicPartition)
val responseError = group.inLock {
if (status.error == Errors.NONE) {
if (!group.is(Dead)) {
filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
if (isTxnOffsetCommit)
group.onTxnOffsetCommitAppend(producerId, topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
else
group.onOffsetCommitAppend(topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
}
}
// Record the number of offsets committed to the log
offsetCommitsSensor.record(records.size)
Errors.NONE
} else {
if (!group.is(Dead)) {
if (!group.hasPendingOffsetCommitsFromProducer(producerId))
removeProducerGroup(producerId, group.groupId)
filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
if (isTxnOffsetCommit)
group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
else
group.failPendingOffsetWrite(topicIdPartition, offsetAndMetadata)
}
}
debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
s"with generation ${group.generationId} failed when appending to log due to ${status.error.exceptionName}")
// transform the log append error code to the corresponding the commit status error code
status.error match {
case Errors.UNKNOWN_TOPIC_OR_PARTITION
| Errors.NOT_ENOUGH_REPLICAS
| Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
Errors.COORDINATOR_NOT_AVAILABLE
case Errors.NOT_LEADER_OR_FOLLOWER
| Errors.KAFKA_STORAGE_ERROR =>
Errors.NOT_COORDINATOR
case Errors.MESSAGE_TOO_LARGE
| Errors.RECORD_LIST_TOO_LARGE
| Errors.INVALID_FETCH_SIZE =>
Errors.INVALID_COMMIT_OFFSET_SIZE
case other => other
}
}
}
// compute the final error codes for the commit response
val commitStatus = offsetMetadata.map { case (topicIdPartition, offsetAndMetadata) =>
if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
(topicIdPartition, responseError)
else
(topicIdPartition, Errors.OFFSET_METADATA_TOO_LARGE)
}
// finally trigger the callback logic passed from the API layer
responseCallback(commitStatus)
}
if (isTxnOffsetCommit) {
group.inLock {
addProducerGroup(producerId, group.groupId)
group.prepareTxnOffsetCommit(producerId, offsetMetadata)
}
} else {
group.inLock {
group.prepareOffsetCommit(offsetMetadata)
}
}
appendForGroup(group, entries, requestLocal, putCacheCallback, transactionalId)
case None =>
val commitStatus = offsetMetadata.map { case (topicIdPartition, _) =>
(topicIdPartition, Errors.NOT_COORDINATOR)
}
responseCallback(commitStatus)
}
return
}
val magicOpt = getMagic(partitionFor(group.groupId))
if (magicOpt.isEmpty) {
val commitStatus = offsetMetadata.map { case (topicIdPartition, _) =>
(topicIdPartition, Errors.NOT_COORDINATOR)
}
responseCallback(commitStatus)
return
}
val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
val records = generateOffsetRecords(magicOpt.get, isTxnOffsetCommit, group.groupId, offsetTopicPartition, filteredOffsetMetadata, producerId, producerEpoch)
val putCacheCallback = createPutCacheCallback(isTxnOffsetCommit, group, consumerId, offsetMetadata, filteredOffsetMetadata, responseCallback, producerId, records)
val verificationGuards = verificationGuard.map(guard => offsetTopicPartition -> guard).toMap
if (isTxnOffsetCommit) {
addProducerGroup(producerId, group.groupId)
group.prepareTxnOffsetCommit(producerId, offsetMetadata)
} else {
group.prepareOffsetCommit(offsetMetadata)
}
appendForGroup(group, records, requestLocal, putCacheCallback, verificationGuards)
}
/**
@ -1352,6 +1360,28 @@ object GroupMetadataManager {
"%X".format(BigInt(1, bytes))
}
def maybeConvertOffsetCommitError(error: Errors) : Errors = {
error match {
case Errors.UNKNOWN_TOPIC_OR_PARTITION
| Errors.NOT_ENOUGH_REPLICAS
| Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
Errors.COORDINATOR_NOT_AVAILABLE
case Errors.NOT_LEADER_OR_FOLLOWER
| Errors.KAFKA_STORAGE_ERROR =>
Errors.NOT_COORDINATOR
case Errors.MESSAGE_TOO_LARGE
| Errors.RECORD_LIST_TOO_LARGE
| Errors.INVALID_FETCH_SIZE =>
Errors.INVALID_COMMIT_OFFSET_SIZE
// We may see INVALID_TXN_STATE or INVALID_PID_MAPPING here due to transaction verification.
// They can be returned without mapping to a new error.
case other => other
}
}
}
case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {


@ -767,15 +767,15 @@ class TransactionStateManager(brokerId: Int,
}
if (append) {
replicaManager.appendRecords(
newMetadata.txnTimeoutMs.toLong,
TransactionLog.EnforcedRequiredAcks,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
recordsPerPartition,
updateCacheCallback,
requestLocal = requestLocal)
timeout = newMetadata.txnTimeoutMs.toLong,
requiredAcks = TransactionLog.EnforcedRequiredAcks,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition = recordsPerPartition,
responseCallback = updateCacheCallback,
requestLocal = requestLocal)
trace(s"Appending new metadata $newMetadata for transaction id $transactionalId with coordinator epoch $coordinatorEpoch to the local transaction log")
trace(s"Appending new metadata $newMetadata for transaction id $transactionalId with coordinator epoch $coordinatorEpoch to the local transaction log")
}
}
}


@ -24,8 +24,7 @@ import kafka.log.remote.RemoteLogManager
import kafka.log.{LogManager, UnifiedLog}
import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName}
import kafka.server.ReplicaManager.createLogReadResult
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult}
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
import kafka.server.metadata.ZkMetadataCache
import kafka.utils.Implicits._
@ -887,65 +886,29 @@ class ReplicaManager(val config: KafkaConfig,
}
val allResults = localProduceResults ++ errorResults
val produceStatus = allResults.map { case (topicPartition, result) =>
topicPartition -> ProducePartitionStatus(
result.info.lastOffset + 1, // required offset
new PartitionResponse(
result.error,
result.info.firstOffset,
result.info.lastOffset,
result.info.logAppendTime,
result.info.logStartOffset,
result.info.recordErrors,
result.errorMessage
)
) // response status
}
val produceStatus = buildProducePartitionStatus(allResults)
actionQueue.add {
() => allResults.foreach { case (topicPartition, result) =>
val requestKey = TopicPartitionOperationKey(topicPartition)
result.info.leaderHwChange match {
case LeaderHwChange.INCREASED =>
// some delayed operations may be unblocked after HW changed
delayedProducePurgatory.checkAndComplete(requestKey)
delayedFetchPurgatory.checkAndComplete(requestKey)
delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
case LeaderHwChange.SAME =>
// probably unblock some follower fetch requests since log end offset has been updated
delayedFetchPurgatory.checkAndComplete(requestKey)
case LeaderHwChange.NONE =>
// nothing
}
}
}
addCompletePurgatoryAction(actionQueue, allResults)
recordConversionStatsCallback(localProduceResults.map { case (k, v) =>
k -> v.info.recordValidationStats
})
recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordValidationStats })
if (delayedProduceRequestRequired(requiredAcks, allEntries, allResults)) {
// create delayed produce operation
val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
// create a list of (topic, partition) pairs to use as keys for this delayed produce operation
val producerRequestKeys = allEntries.keys.map(TopicPartitionOperationKey(_)).toSeq
// try to complete the request immediately, otherwise put it into the purgatory
// this is because while the delayed produce operation is being created, new
// requests may arrive and hence make this operation completable.
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
} else {
// we can respond immediately
val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
responseCallback(produceResponseStatus)
}
maybeAddDelayedProduce(
requiredAcks,
delayedProduceLock,
timeout,
allEntries,
allResults,
produceStatus,
responseCallback
)
}
private def partitionEntriesForVerification(verificationGuards: mutable.Map[TopicPartition, VerificationGuard],
entriesPerPartition: Map[TopicPartition, MemoryRecords],
verifiedEntries: mutable.Map[TopicPartition, MemoryRecords],
unverifiedEntries: mutable.Map[TopicPartition, MemoryRecords],
errorEntries: mutable.Map[TopicPartition, Errors]): Unit= {
errorEntries: mutable.Map[TopicPartition, Errors]): Unit = {
val transactionalProducerIds = mutable.HashSet[Long]()
entriesPerPartition.foreach { case (topicPartition, records) =>
try {
@ -976,6 +939,313 @@ class ReplicaManager(val config: KafkaConfig,
}
}
private def buildProducePartitionStatus(
results: Map[TopicPartition, LogAppendResult]
): Map[TopicPartition, ProducePartitionStatus] = {
results.map { case (topicPartition, result) =>
topicPartition -> ProducePartitionStatus(
result.info.lastOffset + 1, // required offset
new PartitionResponse(
result.error,
result.info.firstOffset,
result.info.lastOffset,
result.info.logAppendTime,
result.info.logStartOffset,
result.info.recordErrors,
result.errorMessage
)
)
}
}
private def addCompletePurgatoryAction(
actionQueue: ActionQueue,
appendResults: Map[TopicPartition, LogAppendResult]
): Unit = {
actionQueue.add {
() => appendResults.foreach { case (topicPartition, result) =>
val requestKey = TopicPartitionOperationKey(topicPartition)
result.info.leaderHwChange match {
case LeaderHwChange.INCREASED =>
// some delayed operations may be unblocked after HW changed
delayedProducePurgatory.checkAndComplete(requestKey)
delayedFetchPurgatory.checkAndComplete(requestKey)
delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
case LeaderHwChange.SAME =>
// probably unblock some follower fetch requests since log end offset has been updated
delayedFetchPurgatory.checkAndComplete(requestKey)
case LeaderHwChange.NONE =>
// nothing
}
}
}
}
/**
* Append messages to the offsets topic, and wait for them to be replicated to other replicas;
* the callback function will be triggered either when the timeout is reached or the required acks are satisfied;
* if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
* This method should not return until the write to the local log is completed because updating offsets requires updating
* the in-memory and persisted state under a lock together.
*
* Note that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecords()
* are expected to call ActionQueue.tryCompleteActions for all affected partitions, without holding any conflicting
* locks.
*
* If appending transactional records, call maybeStartTransactionVerificationForPartition(s) and call this method in the callback.
* For example, the GroupCoordinator will pass `doTxnCommitOffsets` as the post-verification callback, and that method eventually calls this one.
*
* @param timeout maximum time we will wait to append before returning
* @param requiredAcks number of replicas who must acknowledge the append before sending the response
* @param entriesPerPartition the records per partition to be appended
* @param responseCallback callback for sending the response
* @param delayedProduceLock lock for the delayed actions
* @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
* thread calling this method
* @param verificationGuards the mapping from topic partition to verification guards if transaction verification is used
*/
def appendForGroup(
timeout: Long,
requiredAcks: Short,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock],
requestLocal: RequestLocal,
verificationGuards: Map[TopicPartition, VerificationGuard]
): Unit = {
if (!isValidRequiredAcks(requiredAcks)) {
sendInvalidRequiredAcksResponse(entriesPerPartition, responseCallback)
return
}
// This should only be called on __consumer_offsets until KAFKA-15987 is complete
entriesPerPartition.keys.foreach { tp =>
if (!tp.topic.equals(Topic.GROUP_METADATA_TOPIC_NAME))
throw new IllegalArgumentException()
}
val sTime = time.milliseconds
val localProduceResults = appendToLocalLog(
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition,
requiredAcks,
requestLocal,
verificationGuards.toMap
)
debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
val allResults = localProduceResults
val produceStatus = buildProducePartitionStatus(allResults)
addCompletePurgatoryAction(defaultActionQueue, allResults)
maybeAddDelayedProduce(
requiredAcks,
delayedProduceLock,
timeout,
entriesPerPartition,
allResults,
produceStatus,
responseCallback
)
}
private def maybeAddDelayedProduce(
requiredAcks: Short,
delayedProduceLock: Option[Lock],
timeoutMs: Long,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
initialAppendResults: Map[TopicPartition, LogAppendResult],
initialProduceStatus: Map[TopicPartition, ProducePartitionStatus],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
): Unit = {
if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) {
// create delayed produce operation
val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback, delayedProduceLock)
// create a list of (topic, partition) pairs to use as keys for this delayed produce operation
val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
// try to complete the request immediately, otherwise put it into the purgatory
// this is because while the delayed produce operation is being created, new
// requests may arrive and hence make this operation completable.
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
} else {
// we can respond immediately
val produceResponseStatus = initialProduceStatus.map { case (k, status) => k -> status.responseStatus }
responseCallback(produceResponseStatus)
}
}
private def sendInvalidRequiredAcksResponse(entries: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit): Unit = {
// If required.acks is outside accepted range, something is wrong with the client
// Just return an error and don't handle the request at all
val responseStatus = entries.map { case (topicPartition, _) =>
topicPartition -> new PartitionResponse(
Errors.INVALID_REQUIRED_ACKS,
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
RecordBatch.NO_TIMESTAMP,
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
)
}
responseCallback(responseStatus)
}
/**
*
* @param topicPartition the topic partition to maybe verify
* @param transactionalId the transactional id for the transaction
* @param producerId the producer id for the producer writing to the transaction
* @param producerEpoch the epoch of the producer writing to the transaction
* @param baseSequence the base sequence of the first record in the batch we are trying to append
* @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
* thread calling this method
* @param callback the method to execute once the verification is either completed or returns an error
*
* When the verification returns, the callback will be supplied the error if it exists or Errors.NONE.
* If the verification guard exists, it will also be supplied. Otherwise the SENTINEL verification guard will be returned.
* This guard cannot be used for verification and any appends that attempt to use it will fail.
*/
def maybeStartTransactionVerificationForPartition(
topicPartition: TopicPartition,
transactionalId: String,
producerId: Long,
producerEpoch: Short,
baseSequence: Int,
requestLocal: RequestLocal,
callback: (Errors, RequestLocal, VerificationGuard) => Unit
): Unit = {
def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors],
newRequestLocal: RequestLocal,
verificationGuards: Map[TopicPartition, VerificationGuard]): Unit = {
callback(
preAppendErrors.getOrElse(topicPartition, Errors.NONE),
newRequestLocal,
verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL))
}
maybeStartTransactionVerificationForPartitions(
Map(topicPartition -> baseSequence),
transactionalId,
producerId,
producerEpoch,
requestLocal,
generalizedCallback
)
}
/**
*
* @param topicPartitionBatchInfo the topic partitions to maybe verify mapped to the base sequence of their first record batch
* @param transactionalId the transactional id for the transaction
* @param producerId the producer id for the producer writing to the transaction
* @param producerEpoch the epoch of the producer writing to the transaction
* @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
* thread calling this method
* @param callback the method to execute once the verification is either completed or returns an error
*
* When the verification returns, the callback will be supplied the errors per topic partition if there were errors.
* The callback will also be supplied the verification guards per partition if they exist. It is possible to have an
* error and a verification guard for a topic partition if the topic partition was unable to be verified by the transaction
* coordinator. Transaction coordinator errors are mapped to append-friendly errors. The callback is wrapped so that it
* is scheduled on a request handler thread. There, it should be called with that request handler thread's thread local and
* not the one supplied to this method.
*/
def maybeStartTransactionVerificationForPartitions(
topicPartitionBatchInfo: Map[TopicPartition, Int],
transactionalId: String,
producerId: Long,
producerEpoch: Short,
requestLocal: RequestLocal,
callback: (Map[TopicPartition, Errors], RequestLocal, Map[TopicPartition, VerificationGuard]) => Unit
): Unit = {
// Skip verification if the request is not transactional or transaction verification is disabled.
if (transactionalId == null ||
!config.transactionPartitionVerificationEnable
|| addPartitionsToTxnManager.isEmpty
) {
callback(Map.empty[TopicPartition, Errors], requestLocal, Map.empty[TopicPartition, VerificationGuard])
return
}
val verificationGuards = mutable.Map[TopicPartition, VerificationGuard]()
val errors = mutable.Map[TopicPartition, Errors]()
topicPartitionBatchInfo.map { case (topicPartition, baseSequence) =>
val errorOrGuard = maybeStartTransactionVerificationForPartition(
topicPartition,
producerId,
producerEpoch,
baseSequence
)
errorOrGuard match {
case Left(error) => errors.put(topicPartition, error)
case Right(verificationGuard) => if (verificationGuard != VerificationGuard.SENTINEL)
verificationGuards.put(topicPartition, verificationGuard)
}
}
// No partitions require verification.
if (verificationGuards.isEmpty) {
callback(errors.toMap, requestLocal, Map.empty[TopicPartition, VerificationGuard])
return
}
def invokeCallback(
requestLocal: RequestLocal,
verificationErrors: Map[TopicPartition, Errors]
): Unit = {
// Map transaction coordinator errors to known errors for the response
val convertedErrors = verificationErrors.map { case (tp, error) =>
error match {
case Errors.CONCURRENT_TRANSACTIONS |
Errors.COORDINATOR_LOAD_IN_PROGRESS |
Errors.COORDINATOR_NOT_AVAILABLE |
Errors.NOT_COORDINATOR => tp -> Errors.NOT_ENOUGH_REPLICAS
case _ => tp -> error
}
}
callback(errors ++ convertedErrors, requestLocal, verificationGuards.toMap)
}
// Wrap the callback to be handled on an arbitrary request handler thread when transaction verification is complete.
val verificationCallback = KafkaRequestHandler.wrapAsyncCallback(
invokeCallback,
requestLocal
)
addPartitionsToTxnManager.get.verifyTransaction(
transactionalId = transactionalId,
producerId = producerId,
producerEpoch = producerEpoch,
topicPartitions = verificationGuards.keys.toSeq,
callback = verificationCallback
)
}
private def maybeStartTransactionVerificationForPartition(
topicPartition: TopicPartition,
producerId: Long,
producerEpoch: Short,
baseSequence: Int
): Either[Errors, VerificationGuard] = {
try {
val verificationGuard = getPartitionOrException(topicPartition)
.maybeStartTransactionVerification(producerId, baseSequence, producerEpoch)
Right(verificationGuard)
} catch {
case e: Exception =>
Left(Errors.forException(e))
}
}
/**
* Delete records on leader replicas of the partition, and wait for delete records operation be propagated to other replicas;
* the callback function will be triggered either when timeout or logStartOffset of all live replicas have reached the specified offset


@ -32,7 +32,7 @@ import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordValidat
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{MockScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig}
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.mockito.Mockito.{CALLS_REAL_METHODS, mock, withSettings}
@ -168,8 +168,32 @@ object AbstractCoordinatorConcurrencyTest {
watchKeys = Collections.newSetFromMap(new ConcurrentHashMap[TopicPartitionOperationKey, java.lang.Boolean]()).asScala
}
override def maybeStartTransactionVerificationForPartition(
topicPartition: TopicPartition,
transactionalId: String,
producerId: Long,
producerEpoch: Short,
baseSequence: Int,
requestLocal: RequestLocal,
callback: (Errors, RequestLocal, VerificationGuard) => Unit
): Unit = {
// Skip verification
callback(Errors.NONE, requestLocal, VerificationGuard.SENTINEL)
}
override def tryCompleteActions(): Unit = watchKeys.map(producePurgatory.checkAndComplete)
override def appendForGroup(timeout: Long,
requiredAcks: Short,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
requestLocal: RequestLocal = RequestLocal.NoCaching,
verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = {
appendRecords(timeout, requiredAcks, true, AppendOrigin.COORDINATOR, entriesPerPartition, responseCallback,
delayedProduceLock, requestLocal = requestLocal)
}
override def appendRecords(timeout: Long,
requiredAcks: Short,
internalTopicsAllowed: Boolean,


@ -38,7 +38,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.AppendOrigin
import org.apache.kafka.storage.internals.log.VerificationGuard
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@ -3782,6 +3782,47 @@ class GroupCoordinatorTest {
assertTrue(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, () => true))
}
@Test
def testVerificationErrorsForTxnOffsetCommits(): Unit = {
val tip1 = new TopicIdPartition(Uuid.randomUuid(), 0, "topic-1")
val offset1 = offsetAndMetadata(0)
val tip2 = new TopicIdPartition(Uuid.randomUuid(), 0, "topic-2")
val offset2 = offsetAndMetadata(0)
val producerId = 1000L
val producerEpoch: Short = 2
def verifyErrors(error: Errors, expectedError: Errors): Unit = {
val commitOffsetResult = commitTransactionalOffsets(groupId,
producerId,
producerEpoch,
Map(tip1 -> offset1, tip2 -> offset2),
verificationError = error)
assertEquals(expectedError, commitOffsetResult(tip1))
assertEquals(expectedError, commitOffsetResult(tip2))
}
verifyErrors(Errors.INVALID_PRODUCER_ID_MAPPING, Errors.INVALID_PRODUCER_ID_MAPPING)
verifyErrors(Errors.INVALID_TXN_STATE, Errors.INVALID_TXN_STATE)
verifyErrors(Errors.NOT_ENOUGH_REPLICAS, Errors.COORDINATOR_NOT_AVAILABLE)
verifyErrors(Errors.NOT_LEADER_OR_FOLLOWER, Errors.NOT_COORDINATOR)
verifyErrors(Errors.KAFKA_STORAGE_ERROR, Errors.NOT_COORDINATOR)
}
@Test
def testTxnOffsetMetadataTooLarge(): Unit = {
val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val offset = 37
val producerId = 100L
val producerEpoch: Short = 3
val offsets = Map(
tip -> OffsetAndMetadata(offset, "s" * (OffsetConfig.DefaultMaxMetadataSize + 1), 0)
)
val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, offsets)
assertEquals(Map(tip -> Errors.OFFSET_METADATA_TOO_LARGE), commitOffsetResult)
}
private def getGroup(groupId: String): GroupMetadata = {
val groupOpt = groupCoordinator.groupManager.getGroup(groupId)
assertTrue(groupOpt.isDefined)
@ -3855,17 +3896,13 @@ class GroupCoordinatorTest {
val capturedArgument: ArgumentCaptor[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[scala.collection.Map[TopicPartition, PartitionResponse] => Unit])
when(replicaManager.appendRecords(anyLong,
when(replicaManager.appendForGroup(anyLong,
anyShort(),
internalTopicsAllowed = ArgumentMatchers.eq(true),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any[Map[TopicPartition, MemoryRecords]],
capturedArgument.capture(),
any[Option[ReentrantLock]],
any(),
any(),
any(),
any()
any(classOf[RequestLocal]),
any[Map[TopicPartition, VerificationGuard]]
)).thenAnswer(_ => {
capturedArgument.getValue.apply(
Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
@ -3891,17 +3928,14 @@ class GroupCoordinatorTest {
val capturedArgument: ArgumentCaptor[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[scala.collection.Map[TopicPartition, PartitionResponse] => Unit])
when(replicaManager.appendRecords(anyLong,
when(replicaManager.appendForGroup(anyLong,
anyShort(),
internalTopicsAllowed = ArgumentMatchers.eq(true),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any[Map[TopicPartition, MemoryRecords]],
capturedArgument.capture(),
any[Option[ReentrantLock]],
any(),
any(),
any(),
any())).thenAnswer(_ => {
any(classOf[RequestLocal]),
any[Map[TopicPartition, VerificationGuard]]
)).thenAnswer(_ => {
capturedArgument.getValue.apply(
Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)
@ -4037,17 +4071,13 @@ class GroupCoordinatorTest {
val capturedArgument: ArgumentCaptor[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[scala.collection.Map[TopicPartition, PartitionResponse] => Unit])
when(replicaManager.appendRecords(anyLong,
when(replicaManager.appendForGroup(anyLong,
anyShort(),
internalTopicsAllowed = ArgumentMatchers.eq(true),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any[Map[TopicPartition, MemoryRecords]],
capturedArgument.capture(),
any[Option[ReentrantLock]],
any(),
any(),
any(),
any()
any(classOf[RequestLocal]),
any[Map[TopicPartition, VerificationGuard]]
)).thenAnswer(_ => {
capturedArgument.getValue.apply(
Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
@ -4067,27 +4097,32 @@ class GroupCoordinatorTest {
offsets: Map[TopicIdPartition, OffsetAndMetadata],
memberId: String = JoinGroupRequest.UNKNOWN_MEMBER_ID,
groupInstanceId: Option[String] = Option.empty,
generationId: Int = JoinGroupRequest.UNKNOWN_GENERATION_ID) : CommitOffsetCallbackParams = {
generationId: Int = JoinGroupRequest.UNKNOWN_GENERATION_ID,
verificationError: Errors = Errors.NONE) : CommitOffsetCallbackParams = {
val (responseFuture, responseCallback) = setupCommitOffsetsCallback
val capturedArgument: ArgumentCaptor[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[scala.collection.Map[TopicPartition, PartitionResponse] => Unit])
// Since transactional ID is only used in appendRecords, we can use a dummy value. Ensure it passes through.
// Since transactional ID is only used for verification, we can use a dummy value. Ensure it passes through.
val transactionalId = "dummy-txn-id"
when(replicaManager.appendRecords(anyLong,
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
val postVerificationCallback: ArgumentCaptor[(Errors, RequestLocal, VerificationGuard) => Unit] = ArgumentCaptor.forClass(classOf[(Errors, RequestLocal, VerificationGuard) => Unit])
when(replicaManager.maybeStartTransactionVerificationForPartition(ArgumentMatchers.eq(offsetTopicPartition), ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(producerId), ArgumentMatchers.eq(producerEpoch), any(), any(), postVerificationCallback.capture())).thenAnswer(
_ => postVerificationCallback.getValue()(verificationError, RequestLocal.NoCaching, VerificationGuard.SENTINEL)
)
when(replicaManager.appendForGroup(anyLong,
anyShort(),
internalTopicsAllowed = ArgumentMatchers.eq(true),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any[Map[TopicPartition, MemoryRecords]],
capturedArgument.capture(),
any[Option[ReentrantLock]],
any(),
any(),
ArgumentMatchers.eq(transactionalId),
any()
any(classOf[RequestLocal]),
any[Map[TopicPartition, VerificationGuard]]
)).thenAnswer(_ => {
capturedArgument.getValue.apply(
Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) ->
Map(offsetTopicPartition ->
new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)
)
)


@ -47,17 +47,15 @@ import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, LogAppendInfo, LogOffsetMetadata}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, LogAppendInfo, LogOffsetMetadata, VerificationGuard}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.ArgumentMatchers.{any, anyInt, anyLong, anyShort}
import org.mockito.Mockito.{mock, reset, times, verify, when}
import scala.jdk.CollectionConverters._
import scala.collection._
import scala.collection.{immutable, _}
class GroupMetadataManagerTest {
@ -1177,16 +1175,12 @@ class GroupMetadataManagerTest {
groupMetadataManager.storeGroup(group, Map.empty, callback)
assertEquals(Some(expectedError), maybeError)
verify(replicaManager).appendRecords(anyLong(),
verify(replicaManager).appendForGroup(anyLong(),
anyShort(),
internalTopicsAllowed = ArgumentMatchers.eq(true),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any(),
any(),
any[Option[ReentrantLock]],
any(),
any(),
any(),
any())
verify(replicaManager).getMagic(any())
}
@ -1215,16 +1209,12 @@ class GroupMetadataManagerTest {
groupMetadataManager.storeGroup(group, Map(memberId -> Array[Byte]()), callback)
assertEquals(Some(Errors.NONE), maybeError)
verify(replicaManager).appendRecords(anyLong(),
verify(replicaManager).appendForGroup(anyLong(),
anyShort(),
internalTopicsAllowed = ArgumentMatchers.eq(true),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any(),
any(),
any[Option[ReentrantLock]],
any(),
any(),
any(),
any())
verify(replicaManager).getMagic(any())
}
@ -1266,6 +1256,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
val offsets = immutable.Map(topicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
expectAppendMessage(Errors.NONE)
@ -1275,7 +1266,7 @@ class GroupMetadataManagerTest {
}
assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
assertTrue(group.hasOffsets)
assertFalse(commitErrors.isEmpty)
@ -1291,16 +1282,12 @@ class GroupMetadataManagerTest {
assertEquals(Errors.NONE, partitionResponse.error)
assertEquals(offset, partitionResponse.offset)
verify(replicaManager).appendRecords(anyLong(),
verify(replicaManager).appendForGroup(anyLong(),
anyShort(),
internalTopicsAllowed = ArgumentMatchers.eq(true),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any(),
any(),
any[Option[ReentrantLock]],
any(),
any(),
any(),
any())
// Will update sensor after commit
assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
@ -1313,13 +1300,13 @@ class GroupMetadataManagerTest {
val offset = 37
val producerId = 232L
val producerEpoch = 0.toShort
val transactionalId = "txnId"
groupMetadataManager.addOwnedPartition(groupPartitionId)
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
val offsetAndMetadata = OffsetAndMetadata(offset, "", time.milliseconds())
val offsets = immutable.Map(topicIdPartition -> offsetAndMetadata)
@ -1329,22 +1316,19 @@ class GroupMetadataManagerTest {
def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
}
val verificationGuard = new VerificationGuard()
groupMetadataManager.storeOffsets(group, memberId, offsets, callback, transactionalId, producerId, producerEpoch)
groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, producerId, producerEpoch, verificationGuard = Some(verificationGuard))
assertTrue(group.hasOffsets)
assertTrue(group.allOffsets.isEmpty)
verify(replicaManager).appendRecords(anyLong(),
verify(replicaManager).appendForGroup(anyLong(),
anyShort(),
internalTopicsAllowed = ArgumentMatchers.eq(true),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any[Map[TopicPartition, MemoryRecords]],
capturedResponseCallback.capture(),
any[Option[ReentrantLock]],
any(),
any(),
ArgumentMatchers.eq(transactionalId),
any())
ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
verify(replicaManager).getMagic(any())
capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
@ -1365,23 +1349,24 @@ class GroupMetadataManagerTest {
val offset = 37
val producerId = 232L
val producerEpoch = 0.toShort
val transactionalId = "txnId"
groupMetadataManager.addOwnedPartition(groupPartitionId)
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
val offsets = immutable.Map(topicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
}
val verificationGuard = new VerificationGuard()
groupMetadataManager.storeOffsets(group, memberId, offsets, callback, transactionalId, producerId, producerEpoch)
groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, producerId, producerEpoch, verificationGuard = Some(verificationGuard))
assertTrue(group.hasOffsets)
assertTrue(group.allOffsets.isEmpty)
val capturedResponseCallback = verifyAppendAndCaptureCallback()
@ -1395,17 +1380,13 @@ class GroupMetadataManagerTest {
assertFalse(group.hasOffsets)
assertTrue(group.allOffsets.isEmpty)
verify(replicaManager).appendRecords(anyLong(),
verify(replicaManager).appendForGroup(anyLong(),
anyShort(),
internalTopicsAllowed = ArgumentMatchers.eq(true),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any[Map[TopicPartition, MemoryRecords]],
any(),
any[Option[ReentrantLock]],
any(),
any(),
ArgumentMatchers.eq(transactionalId),
any())
ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
verify(replicaManager).getMagic(any())
}
@ -1416,13 +1397,13 @@ class GroupMetadataManagerTest {
val offset = 37
val producerId = 232L
val producerEpoch = 0.toShort
val transactionalId = "txnId"
groupMetadataManager.addOwnedPartition(groupPartitionId)
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
val offsets = immutable.Map(topicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
@ -1431,8 +1412,9 @@ class GroupMetadataManagerTest {
def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
}
val verificationGuard = new VerificationGuard()
groupMetadataManager.storeOffsets(group, memberId, offsets, callback, transactionalId, producerId, producerEpoch)
groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, producerId, producerEpoch, verificationGuard = Some(verificationGuard))
assertTrue(group.hasOffsets)
assertTrue(group.allOffsets.isEmpty)
val capturedResponseCallback = verifyAppendAndCaptureCallback()
@ -1446,71 +1428,13 @@ class GroupMetadataManagerTest {
assertFalse(group.hasOffsets)
assertTrue(group.allOffsets.isEmpty)
verify(replicaManager).appendRecords(anyLong(),
verify(replicaManager).appendForGroup(anyLong(),
anyShort(),
internalTopicsAllowed = ArgumentMatchers.eq(true),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any[Map[TopicPartition, MemoryRecords]],
any(),
any[Option[ReentrantLock]],
any(),
any(),
ArgumentMatchers.eq(transactionalId),
any())
verify(replicaManager).getMagic(any())
}
@ParameterizedTest
@EnumSource(value = classOf[Errors], names = Array("INVALID_TXN_STATE", "INVALID_PRODUCER_ID_MAPPING"))
def testTransactionalCommitOffsetTransactionalErrors(error: Errors): Unit = {
val memberId = ""
val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val offset = 37
val producerId = 232L
val producerEpoch = 0.toShort
val transactionalId = "txnId"
groupMetadataManager.addOwnedPartition(groupPartitionId)
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
}
groupMetadataManager.storeOffsets(group, memberId, offsets, callback, transactionalId, producerId, producerEpoch)
assertTrue(group.hasOffsets)
assertTrue(group.allOffsets.isEmpty)
val capturedResponseCallback = verifyAppendAndCaptureCallback()
capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
assertFalse(group.hasOffsets)
assertTrue(group.allOffsets.isEmpty)
group.completePendingTxnOffsetCommit(producerId, isCommit = false)
assertFalse(group.hasOffsets)
assertTrue(group.allOffsets.isEmpty)
assertFalse(commitErrors.contains(topicIdPartition))
assertEquals(error, commitErrors.get(topicIdPartition))
verify(replicaManager).appendRecords(anyLong(),
anyShort(),
internalTopicsAllowed = ArgumentMatchers.eq(true),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any[Map[TopicPartition, MemoryRecords]],
any(),
any[Option[ReentrantLock]],
any(),
any(),
ArgumentMatchers.eq(transactionalId),
any())
ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
verify(replicaManager).getMagic(any())
}
@ -1526,6 +1450,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
val offsets = immutable.Map(topicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
@ -1533,7 +1458,7 @@ class GroupMetadataManagerTest {
commitErrors = Some(errors)
}
groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
assertFalse(commitErrors.isEmpty)
val maybeError = commitErrors.get.get(topicIdPartition)
@ -1565,6 +1490,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
val offsets = immutable.Map(topicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
@ -1575,7 +1501,7 @@ class GroupMetadataManagerTest {
}
assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
assertTrue(group.hasOffsets)
val capturedResponseCallback = verifyAppendAndCaptureCallback()
capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
@ -1613,6 +1539,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
val offsets = immutable.Map(
topicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds()),
// This will fail
@ -1627,7 +1554,7 @@ class GroupMetadataManagerTest {
}
assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
assertTrue(group.hasOffsets)
val capturedResponseCallback = verifyAppendAndCaptureCallback()
capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
@ -1652,16 +1579,12 @@ class GroupMetadataManagerTest {
cachedOffsets.get(topicIdPartitionFailed.topicPartition).map(_.offset)
)
verify(replicaManager).appendRecords(anyLong(),
verify(replicaManager).appendForGroup(anyLong(),
anyShort(),
internalTopicsAllowed = ArgumentMatchers.eq(true),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any[Map[TopicPartition, MemoryRecords]],
any(),
any[Option[ReentrantLock]],
any(),
any(),
any(),
any())
verify(replicaManager).getMagic(any())
assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
@ -1677,6 +1600,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
val offsets = immutable.Map(
topicIdPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1) , time.milliseconds())
)
@ -1687,7 +1611,7 @@ class GroupMetadataManagerTest {
}
assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
assertFalse(group.hasOffsets)
assertFalse(commitErrors.isEmpty)
@ -1720,6 +1644,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
// expire the offset after 1 millisecond
val startMs = time.milliseconds
val offsets = immutable.Map(
@ -1733,7 +1658,7 @@ class GroupMetadataManagerTest {
commitErrors = Some(errors)
}
groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
assertTrue(group.hasOffsets)
assertFalse(commitErrors.isEmpty)
@ -1759,16 +1684,12 @@ class GroupMetadataManagerTest {
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset))
assertEquals(Some(offset), cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset))
verify(replicaManager).appendRecords(anyLong(),
verify(replicaManager).appendForGroup(anyLong(),
anyShort(),
internalTopicsAllowed = ArgumentMatchers.eq(true),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any(),
any(),
any[Option[ReentrantLock]],
any(),
any(),
any(),
any())
verify(replicaManager, times(2)).getMagic(any())
}
@ -1875,6 +1796,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
// expire the offset after 1 millisecond
val startMs = time.milliseconds
val offsets = immutable.Map(
@ -1888,7 +1810,7 @@ class GroupMetadataManagerTest {
commitErrors = Some(errors)
}
groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
assertTrue(group.hasOffsets)
assertFalse(commitErrors.isEmpty)
@ -1958,6 +1880,7 @@ class GroupMetadataManagerTest {
group.transitionTo(PreparingRebalance)
group.initNextGeneration()
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
val startMs = time.milliseconds
// old clients, expiry timestamp is explicitly set
val tp1OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs, startMs + 1)
@ -1976,7 +1899,7 @@ class GroupMetadataManagerTest {
commitErrors = Some(errors)
}
groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
assertTrue(group.hasOffsets)
assertFalse(commitErrors.isEmpty)
@ -2136,6 +2059,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
// expire the offset after 1 and 3 milliseconds (old clients) and after default retention (new clients)
val startMs = time.milliseconds
// old clients, expiry timestamp is explicitly set
@ -2151,7 +2075,7 @@ class GroupMetadataManagerTest {
commitErrors = Some(errors)
}
groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
assertTrue(group.hasOffsets)
assertFalse(commitErrors.isEmpty)
@ -2244,6 +2168,7 @@ class GroupMetadataManagerTest {
group.initNextGeneration()
group.transitionTo(Stable)
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
val startMs = time.milliseconds
val t1p0OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
@ -2265,7 +2190,7 @@ class GroupMetadataManagerTest {
commitErrors = Some(errors)
}
groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
assertTrue(group.hasOffsets)
assertFalse(commitErrors.isEmpty)
@ -2866,16 +2791,12 @@ class GroupMetadataManagerTest {
private def verifyAppendAndCaptureCallback(): ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = {
val capturedArgument: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
verify(replicaManager).appendRecords(anyLong(),
verify(replicaManager).appendForGroup(anyLong(),
anyShort(),
internalTopicsAllowed = ArgumentMatchers.eq(true),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any[Map[TopicPartition, MemoryRecords]],
capturedArgument.capture(),
any[Option[ReentrantLock]],
any(),
any(),
any(),
any())
capturedArgument
}
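A short usage sketch of the helper above, mirroring the tests earlier in this file; the PartitionResponse arguments copy the pattern already used there rather than introducing anything new.
  val capturedResponseCallback = verifyAppendAndCaptureCallback()
  // Drive the captured callback to simulate the replica manager acknowledging the offsets-topic append.
  capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
    new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))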
@ -2883,16 +2804,12 @@ class GroupMetadataManagerTest {
private def expectAppendMessage(error: Errors): ArgumentCaptor[Map[TopicPartition, MemoryRecords]] = {
val capturedCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
val capturedRecords: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
when(replicaManager.appendRecords(anyLong(),
when(replicaManager.appendForGroup(anyLong(),
anyShort(),
internalTopicsAllowed = ArgumentMatchers.eq(true),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
capturedRecords.capture(),
capturedCallback.capture(),
any[Option[ReentrantLock]],
any(),
any(),
any(),
any()
)).thenAnswer(_ => {
capturedCallback.getValue.apply(

View File

@ -2709,7 +2709,7 @@ class KafkaApisTest {
val request = buildRequest(produceRequest)
val kafkaApis = createKafkaApis()
kafkaApis.handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
verify(replicaManager).appendRecords(anyLong,

View File

@ -2548,6 +2548,66 @@ class ReplicaManagerTest {
}
}
@ParameterizedTest
@EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", "CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", "COORDINATOR_NOT_AVAILABLE"))
def testMaybeVerificationErrorConversions(error: Errors): Unit = {
val tp0 = new TopicPartition(topic, 0)
val transactionalId = "txn-id"
val producerId = 24L
val producerEpoch = 0.toShort
val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0))
try {
replicaManager.becomeLeaderOrFollower(1,
makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
(_, _) => ())
// Start verification and return the coordinator related errors.
val result = maybeStartTransactionVerificationForPartition(replicaManager, tp0, transactionalId, producerId, producerEpoch)
val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
verify(addPartitionsToTxnManager, times(1)).verifyTransaction(
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch),
ArgumentMatchers.eq(Seq(tp0)),
appendCallback.capture()
)
// Confirm we did not write to the log and instead returned the converted error with the correct error message.
val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue()
callback(Map(tp0 -> error).toMap)
assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.left.getOrElse(Errors.NONE))
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
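The conversion this test pins down can be summarized with an illustrative mapping; the method name below is hypothetical and not the production code, but the inputs are exactly the @EnumSource values above and the output matches the assertion.
  def illustrativeVerificationErrorConversion(error: Errors): Errors = error match {
    // Coordinator-availability style errors from verification are reported as retriable NOT_ENOUGH_REPLICAS.
    case Errors.NOT_COORDINATOR | Errors.CONCURRENT_TRANSACTIONS |
         Errors.COORDINATOR_LOAD_IN_PROGRESS | Errors.COORDINATOR_NOT_AVAILABLE =>
      Errors.NOT_ENOUGH_REPLICAS
    case other => other
  }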
@Test
def testPreVerificationError(): Unit = {
val tp0 = new TopicPartition(topic, 0)
val transactionalId = "txn-id"
val producerId = 24L
val producerEpoch = 0.toShort
val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0))
try {
val result = maybeStartTransactionVerificationForPartition(replicaManager, tp0, transactionalId, producerId, producerEpoch)
val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
verify(addPartitionsToTxnManager, times(0)).verifyTransaction(
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch),
ArgumentMatchers.eq(Seq(tp0)),
appendCallback.capture()
)
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, result.assertFired.left.getOrElse(Errors.NONE))
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testFullLeaderAndIsrStrayPartitions(zkMigrationEnabled: Boolean): Unit = {
@ -2962,6 +3022,32 @@ class ReplicaManagerTest {
result
}
private def maybeStartTransactionVerificationForPartition(replicaManager: ReplicaManager,
topicPartition: TopicPartition,
transactionalId: String,
producerId: Long,
producerEpoch: Short,
baseSequence: Int = 0): CallbackResult[Either[Errors, VerificationGuard]] = {
val result = new CallbackResult[Either[Errors, VerificationGuard]]()
def postVerificationCallback(error: Errors,
requestLocal: RequestLocal,
verificationGuard: VerificationGuard): Unit = {
val errorOrGuard = if (error != Errors.NONE) Left(error) else Right(verificationGuard)
result.fire(errorOrGuard)
}
replicaManager.maybeStartTransactionVerificationForPartition(
topicPartition,
transactionalId,
producerId,
producerEpoch,
baseSequence,
RequestLocal.NoCaching,
postVerificationCallback
)
result
}
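A hedged usage example of the helper above, following the two tests earlier in this hunk; tp0, the literal producer values, and the assertNotNull import are illustrative assumptions.
  val outcome = maybeStartTransactionVerificationForPartition(
    replicaManager, tp0, transactionalId = "txn-id", producerId = 24L, producerEpoch = 0.toShort)
  // Left(error) means verification failed before any append was attempted;
  // Right(guard) is what the caller threads into the subsequent write.
  outcome.assertFired match {
    case Left(error) => assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, error)
    case Right(verificationGuard) => assertNotNull(verificationGuard)
  }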
private def fetchPartitionAsConsumer(
replicaManager: ReplicaManager,
partition: TopicIdPartition,