KAFKA-2721; Avoid handling duplicate LeaderAndISR requests

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #436 from lindong28/KAFKA-2721
Authored by Dong Lin on 2015-11-16 15:50:46 -08:00; committed by Jun Rao
parent 6cbd97597c
commit 6df9e7ff2c
2 changed files with 43 additions and 27 deletions
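
In short, the patch makes Partition.makeLeader report whether a LeaderAndIsr request actually changed the local leader, so ReplicaManager can skip duplicate become-leader work. A minimal, self-contained sketch of that guard (simplified, with hypothetical names; not the real kafka.cluster.Partition):

// Sketch of the duplicate-request guard this patch introduces. The first
// makeLeader call performs the transition; a duplicate call returns false
// so the caller can skip fetcher shutdown, HW re-initialization and other
// become-leader work.
object DuplicateLeaderAndIsrSketch {
  private var leaderReplicaIdOpt: Option[Int] = None
  private val localBrokerId = 0

  def makeLeader(): Boolean = {
    val isNewLeader =
      if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId)
        false // already the leader: this LeaderAndIsr request is a duplicate
      else {
        leaderReplicaIdOpt = Some(localBrokerId)
        true // genuine transition: (re)initialize leader state here
      }
    isNewLeader
  }

  def main(args: Array[String]): Unit = {
    println(makeLeader()) // true  -- first LeaderAndIsr request
    println(makeLeader()) // false -- duplicate request is a no-op
  }
}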

core/src/main/scala/kafka/cluster/Partition.scala

@@ -157,12 +157,12 @@ class Partition(val topic: String,
   }
   /**
-   * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
-   * and setting the new leader and ISR
+   * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset
+   * from the time when this broker was the leader last time) and setting the new leader and ISR.
+   * If the leader replica id does not change, return false to indicate the replica manager.
    */
-  def makeLeader(controllerId: Int,
-                 partitionStateInfo: PartitionStateInfo, correlationId: Int) {
-    val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
+  def makeLeader(controllerId: Int, partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = {
+    val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -177,29 +177,34 @@ class Partition(val topic: String,
       inSyncReplicas = newInSyncReplicas
       leaderEpoch = leaderAndIsr.leaderEpoch
       zkVersion = leaderAndIsr.zkVersion
-      leaderReplicaIdOpt = Some(localBrokerId)
-      // construct the high watermark metadata for the new leader replica
-      val newLeaderReplica = getReplica().get
-      newLeaderReplica.convertHWToLocalOffsetMetadata()
-      // reset log end offset for remote replicas
-      assignedReplicas.foreach(r =>
-        if (r.brokerId != localBrokerId) r.updateLogReadResult(LogReadResult.UnknownLogReadResult))
+      val isNewLeader =
+        if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
+          false
+        } else {
+          leaderReplicaIdOpt = Some(localBrokerId)
+          true
+        }
+      val leaderReplica = getReplica().get
       // we may need to increment high watermark since ISR could be down to 1
-      maybeIncrementLeaderHW(newLeaderReplica)
+      if (isNewLeader) {
+        // construct the high watermark metadata for the new leader replica
+        leaderReplica.convertHWToLocalOffsetMetadata()
+        // reset log end offset for remote replicas
+        assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
+      }
+      (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
     }
     // some delayed operations may be unblocked after HW changed
     if (leaderHWIncremented)
       tryCompleteDelayedRequests()
+    isNewLeader
   }
   /**
    * Make the local replica the follower by setting the new leader and ISR to empty
    * If the leader replica id does not change, return false to indicate the replica manager
    */
-  def makeFollower(controllerId: Int,
-                   partitionStateInfo: PartitionStateInfo,
-                   correlationId: Int): Boolean = {
+  def makeFollower(controllerId: Int, partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
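
One detail worth noting in the hunk above: both decisions (leaderHWIncremented and isNewLeader) are computed inside the leaderIsrUpdateLock write lock and returned as a tuple, while tryCompleteDelayedRequests() runs only after the lock is released. A hedged sketch of that shape, assuming a plain ReentrantReadWriteLock as a stand-in for Kafka's inWriteLock helper:

import java.util.concurrent.locks.ReentrantReadWriteLock

// Sketch: mutate state and compute decisions under the write lock, then do
// follow-up work outside it (illustrative stand-in, not Kafka's helper).
object LockThenActSketch {
  private val lock = new ReentrantReadWriteLock()

  private def inWriteLock[T](body: => T): T = {
    lock.writeLock().lock()
    try body finally lock.writeLock().unlock()
  }

  def makeLeaderLike(): Boolean = {
    val (hwIncremented, isNewLeader) = inWriteLock {
      // ...update leader epoch, ISR and replica offsets here...
      (true, true) // placeholder decisions, returned together as one tuple
    }
    if (hwIncremented) {
      // the analogue of tryCompleteDelayedRequests() runs here, lock-free,
      // so completing delayed fetch/produce operations cannot block other
      // threads waiting on the write lock
    }
    isNewLeader
  }
}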

core/src/main/scala/kafka/server/ReplicaManager.scala

@@ -618,6 +618,7 @@ class ReplicaManager(val config: KafkaConfig,
               "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
               .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                 topic, partition.partitionId, partitionStateInfo.allReplicas.mkString(",")))
+            responseMap.put((topic, partitionId), ErrorMapping.UnknownTopicOrPartitionCode)
           }
         } else {
           // Otherwise record the error code in response
@@ -663,7 +664,9 @@ class ReplicaManager(val config: KafkaConfig,
   * 3. Add these partitions to the leader partitions set
   *
   * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
-  * the error message will be set on each partition since we do not know which partition caused it
+  * the error message will be set on each partition since we do not know which partition caused it. Otherwise,
+  * return the set of partitions that are made leader due to this method
   *
   * TODO: the above may need to be fixed later
   */
  private def makeLeaders(controllerId: Int,
@@ -679,18 +682,25 @@ class ReplicaManager(val config: KafkaConfig,
    for (partition <- partitionState.keys)
      responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
+    val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()
    try {
      // First stop fetchers for all the partitions
      replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
-      partitionState.foreach { state =>
-        stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
-          "%d epoch %d with correlation id %d for partition %s")
-          .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId)))
-      }
      // Update the partition information to be the leader
      partitionState.foreach{ case (partition, partitionStateInfo) =>
-        partition.makeLeader(controllerId, partitionStateInfo, correlationId)}
+        if (partition.makeLeader(controllerId, partitionStateInfo, correlationId))
+          partitionsToMakeLeaders += partition
+        else
+          stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " +
+            "controller %d epoch %d for partition %s since it is already the leader for the partition.")
+            .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(partition.topic, partition.partitionId)));
+      }
+      partitionsToMakeLeaders.foreach { partition =>
+        stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
+          "%d epoch %d with correlation id %d for partition %s")
+          .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId)))
+      }
    } catch {
      case e: Throwable =>
        partitionState.foreach { state =>
@@ -709,7 +719,7 @@ class ReplicaManager(val config: KafkaConfig,
          .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
      }
-    partitionState.keySet
+    partitionsToMakeLeaders
  }
  /*
@@ -726,7 +736,8 @@ class ReplicaManager(val config: KafkaConfig,
   *    are guaranteed to be flushed to disks
   *
   * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
-  * the error message will be set on each partition since we do not know which partition caused it
+  * the error message will be set on each partition since we do not know which partition caused it. Otherwise,
+  * return the set of partitions that are made follower due to this method
   */
  private def makeFollowers(controllerId: Int,
                            epoch: Int,
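
Taken together with the makeFollowers change (truncated above), makeLeaders now returns only the partitions that actually transitioned instead of partitionState.keySet. A small sketch of that collect-by-result pattern (hypothetical types; the real code collects kafka.cluster.Partition objects):

import scala.collection.mutable

// Sketch: run the transition per partition and keep only the ones that
// report a real change, logging the duplicates that were skipped.
object CollectTransitionedSketch {
  def makeLeadersLike(partitions: Seq[String])(makeLeader: String => Boolean): mutable.Set[String] = {
    val transitioned = mutable.Set[String]()
    for (p <- partitions) {
      if (makeLeader(p)) transitioned += p
      else println(s"skipped become-leader state change for $p: already the leader")
    }
    transitioned // the caller sees only partitions that genuinely became leader
  }

  def main(args: Array[String]): Unit = {
    var leaders = Set("t-0") // pretend t-0 transitioned on an earlier request
    val result = makeLeadersLike(Seq("t-0", "t-1")) { p =>
      val isNew = !leaders.contains(p)
      leaders += p
      isNew
    }
    println(result) // Set(t-1)
  }
}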