KAFKA-13778: Fetch from follower should never run the preferred read replica selection (#11965)

The current preferred read replica selection logic relies on `partition.leaderReplicaIdOpt` to determine if the selection must be run. The issue is that `partition.leaderReplicaIdOpt` is defined for both the leader and the followers thus the logic is ran all the time. The impact is not too bad as the leader is selected most of the time when the logic is ran by the follower and the leader is filtered out. However there are cases where the selection on a follower could redirect the consumer to another follower under certain rare conditions. For instance with the `RackAwareReplicaSelector `, the follower must have stale replica states from a previous leadership and must have other followers in the same rack for instance. Other implementation of the selection logic could be more impacted.

This patch ensures that the preferred read replica selection is only ran by the leader.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
bozhao12 2022-04-06 00:56:23 +08:00 committed by GitHub
parent 3ceedac79e
commit 4218fc61fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 68 additions and 3 deletions

View File

@ -256,6 +256,7 @@ class Partition(val topicPartition: TopicPartition,
// start offset for 'leaderEpoch' above (leader epoch of the current leader for this partition),
// defined when this broker is leader for partition
@volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
// Replica ID of the leader, defined when this broker is leader or follower for the partition.
@volatile var leaderReplicaIdOpt: Option[Int] = None
@volatile private[cluster] var partitionState: PartitionState = CommittedPartitionState(Set.empty, LeaderRecoveryState.RECOVERED)
@volatile var assignmentState: AssignmentState = SimpleAssignmentState(Seq.empty)
@ -433,6 +434,10 @@ class Partition(val topicPartition: TopicPartition,
*/
def isLeader: Boolean = leaderReplicaIdOpt.contains(localBrokerId)
def leaderIdIfLocal: Option[Int] = {
leaderReplicaIdOpt.filter(_ == localBrokerId)
}
private def localLogWithEpochOrException(currentLeaderEpoch: Optional[Integer],
requireLeader: Boolean): UnifiedLog = {
getLocalLog(currentLeaderEpoch, requireLeader) match {

View File

@ -1233,7 +1233,7 @@ class ReplicaManager(val config: KafkaConfig,
replicaId: Int,
fetchOffset: Long,
currentTimeMs: Long): Option[Int] = {
partition.leaderReplicaIdOpt.flatMap { leaderReplicaId =>
partition.leaderIdIfLocal.flatMap { leaderReplicaId =>
// Don't look up preferred for follower fetches via normal replication
if (Request.isValidBrokerId(replicaId))
None

View File

@ -21,7 +21,7 @@ import java.io.File
import java.net.InetAddress
import java.nio.file.Files
import java.util
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.stream.IntStream
import java.util.{Collections, Optional, Properties}
@ -44,7 +44,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.replica.{ClientMetadata, PartitionView, ReplicaSelector, ReplicaView}
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@ -1300,6 +1300,54 @@ class ReplicaManagerTest {
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
def testFetchFromFollowerShouldNotRunPreferLeaderSelect(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, classOf[MockReplicaSelector].getName))
try {
val leaderBrokerId = 0
val followerBrokerId = 1
val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
val topicId = Uuid.randomUuid()
val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0)
initializeLogAndTopicId(replicaManager, tp0, topicId)
// Make this replica the follower
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
.setTopicName(topic)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(1)
.setLeaderEpoch(1)
.setIsr(brokerList)
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
Collections.singletonMap(topic, topicId),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ())
val metadata = new DefaultClientMetadata("rack-a", "client-id",
InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, "default")
val consumerResult = fetchAsConsumer(replicaManager, tidp0,
new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000,
Optional.empty()), clientMetadata = Some(metadata))
// Fetch from follower succeeds
assertTrue(consumerResult.isFired)
// Expect not run the preferred read replica selection
assertEquals(0, replicaManager.replicaSelectorOpt.get.asInstanceOf[MockReplicaSelector].getSelectionCount)
// Only leader will compute preferred replica
assertTrue(consumerResult.assertFired.preferredReadReplica.isEmpty)
} finally replicaManager.shutdown(checkpointHW = false)
}
@Test
def testFetchShouldReturnImmediatelyWhenPreferredReadReplicaIsDefined(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
@ -3719,3 +3767,15 @@ class ReplicaManagerTest {
}
}
}
class MockReplicaSelector extends ReplicaSelector {
private val selectionCount = new AtomicLong()
def getSelectionCount: Long = selectionCount.get
override def select(topicPartition: TopicPartition, clientMetadata: ClientMetadata, partitionView: PartitionView): Optional[ReplicaView] = {
selectionCount.incrementAndGet()
Optional.of(partitionView.leader)
}
}