mirror of https://github.com/apache/kafka.git
MINOR Re-add action queue parameter removed from appendRecords (#14753)
In91fa196, I accidentally removed the action queue paramater that was added in7d147cf. I also renamed the actionQueue as to not confuse this in the future. I don't think this broke anything since we don't use verification for group coordinator commits, but I should fix it to be as it was before. Reviewers: Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
parent
bd030f3499
commit
83b7c9a053
|
|
@ -740,9 +740,9 @@ class ReplicaManager(val config: KafkaConfig,
|
|||
/**
|
||||
* TODO: move this action queue to handle thread so we can simplify concurrency handling
|
||||
*/
|
||||
private val actionQueue = new DelayedActionQueue
|
||||
private val defaultActionQueue = new DelayedActionQueue
|
||||
|
||||
def tryCompleteActions(): Unit = actionQueue.tryCompleteActions()
|
||||
def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions()
|
||||
|
||||
/**
|
||||
* Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
|
||||
|
|
@ -763,7 +763,7 @@ class ReplicaManager(val config: KafkaConfig,
|
|||
* @param recordConversionStatsCallback callback for updating stats on record conversions
|
||||
* @param requestLocal container for the stateful instances scoped to this request
|
||||
* @param transactionalId transactional ID if the request is from a producer and the producer is transactional
|
||||
* @param actionQueue the action queue to use. ReplicaManager#actionQueue is used by default.
|
||||
* @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default.
|
||||
*/
|
||||
def appendRecords(timeout: Long,
|
||||
requiredAcks: Short,
|
||||
|
|
@ -775,7 +775,7 @@ class ReplicaManager(val config: KafkaConfig,
|
|||
recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (),
|
||||
requestLocal: RequestLocal = RequestLocal.NoCaching,
|
||||
transactionalId: String = null,
|
||||
actionQueue: ActionQueue = this.actionQueue): Unit = {
|
||||
actionQueue: ActionQueue = this.defaultActionQueue): Unit = {
|
||||
if (isValidRequiredAcks(requiredAcks)) {
|
||||
|
||||
val verificationGuards: mutable.Map[TopicPartition, VerificationGuard] = mutable.Map[TopicPartition, VerificationGuard]()
|
||||
|
|
@ -792,7 +792,7 @@ class ReplicaManager(val config: KafkaConfig,
|
|||
|
||||
if (notYetVerifiedEntriesPerPartition.isEmpty || addPartitionsToTxnManager.isEmpty) {
|
||||
appendEntries(verifiedEntriesPerPartition, internalTopicsAllowed, origin, requiredAcks, verificationGuards.toMap,
|
||||
errorsPerPartition, recordConversionStatsCallback, timeout, responseCallback, delayedProduceLock)(requestLocal, Map.empty)
|
||||
errorsPerPartition, recordConversionStatsCallback, timeout, responseCallback, delayedProduceLock, actionQueue)(requestLocal, Map.empty)
|
||||
} else {
|
||||
// For unverified entries, send a request to verify. When verified, the append process will proceed via the callback.
|
||||
// We verify above that all partitions use the same producer ID.
|
||||
|
|
@ -813,7 +813,8 @@ class ReplicaManager(val config: KafkaConfig,
|
|||
recordConversionStatsCallback,
|
||||
timeout,
|
||||
responseCallback,
|
||||
delayedProduceLock
|
||||
delayedProduceLock,
|
||||
actionQueue
|
||||
),
|
||||
requestLocal)
|
||||
))
|
||||
|
|
@ -846,7 +847,8 @@ class ReplicaManager(val config: KafkaConfig,
|
|||
recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit,
|
||||
timeout: Long,
|
||||
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
|
||||
delayedProduceLock: Option[Lock])
|
||||
delayedProduceLock: Option[Lock],
|
||||
actionQueue: ActionQueue)
|
||||
(requestLocal: RequestLocal, unverifiedEntries: Map[TopicPartition, Errors]): Unit = {
|
||||
val sTime = time.milliseconds
|
||||
val verifiedEntries =
|
||||
|
|
@ -898,19 +900,18 @@ class ReplicaManager(val config: KafkaConfig,
|
|||
}
|
||||
|
||||
actionQueue.add {
|
||||
() =>
|
||||
allResults.foreach { case (topicPartition, result) =>
|
||||
val requestKey = TopicPartitionOperationKey(topicPartition)
|
||||
result.info.leaderHwChange match {
|
||||
case LeaderHwChange.INCREASED =>
|
||||
// some delayed operations may be unblocked after HW changed
|
||||
delayedProducePurgatory.checkAndComplete(requestKey)
|
||||
delayedFetchPurgatory.checkAndComplete(requestKey)
|
||||
delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
|
||||
case LeaderHwChange.SAME =>
|
||||
// probably unblock some follower fetch requests since log end offset has been updated
|
||||
delayedFetchPurgatory.checkAndComplete(requestKey)
|
||||
case LeaderHwChange.NONE =>
|
||||
() => allResults.foreach { case (topicPartition, result) =>
|
||||
val requestKey = TopicPartitionOperationKey(topicPartition)
|
||||
result.info.leaderHwChange match {
|
||||
case LeaderHwChange.INCREASED =>
|
||||
// some delayed operations may be unblocked after HW changed
|
||||
delayedProducePurgatory.checkAndComplete(requestKey)
|
||||
delayedFetchPurgatory.checkAndComplete(requestKey)
|
||||
delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
|
||||
case LeaderHwChange.SAME =>
|
||||
// probably unblock some follower fetch requests since log end offset has been updated
|
||||
delayedFetchPurgatory.checkAndComplete(requestKey)
|
||||
case LeaderHwChange.NONE =>
|
||||
// nothing
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue