KAFKA-14372: Choose replicas only from ISR for preferred read replica (#12877)

The default replica selector chooses a replica on whether the broker.rack matches the client.rack in the fetch request and whether the offset exists in the follower. If the follower is not in the ISR, we know it's lagging behind which will also lag the consumer behind. there are two cases:
1. the follower recovers and joins the isr. the consumer will no longer fall behind.
2. the follower continues to lag behind. after 5 minutes, the consumer will refresh its preferred read replica and the leader will return the same lagging follower since the offset the consumer fetched up to is capped by the follower's HWM. this can go on indefinitely.

If the replica selector chooses a broker in the ISR then we can ensure that at least every 5 minutes the consumer will consume from an up-to-date replica. 

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Jeff Kim 2022-11-24 06:04:43 -08:00 committed by David Jacot
parent 9bc80ab118
commit fd3bf0ca65
4 changed files with 146 additions and 10 deletions

View File

@ -1244,8 +1244,14 @@ class ReplicaManager(val config: KafkaConfig,
partition.remoteReplicas.foreach { replica =>
val replicaState = replica.stateSnapshot
// Exclude replicas that don't have the requested offset (whether or not if they're in the ISR)
if (replicaState.logEndOffset >= fetchOffset && replicaState.logStartOffset <= fetchOffset) {
// Exclude replicas that are not in the ISR as the follower may lag behind. Worst case, the follower
// will continue to lag and the consumer will fall behind the produce. The leader will
// continuously pick the lagging follower when the consumer refreshes its preferred read replica.
// This can go on indefinitely.
if (partition.inSyncReplicaIds.contains(replica.brokerId) &&
replicaState.logEndOffset >= fetchOffset &&
replicaState.logStartOffset <= fetchOffset) {
replicaInfoSet.add(new DefaultReplicaView(
replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()),
replicaState.logEndOffset,

View File

@ -22,7 +22,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.FetchResponse
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@ -84,9 +84,63 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
val response = receive[FetchResponse](socket, ApiKeys.FETCH, version)
assertEquals(Errors.NONE, response.error)
assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts())
assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
} finally {
socket.close()
}
}
@Test
def testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(): Unit = {
// Create a topic with 2 replicas where broker 0 is the leader and 1 is the follower.
val admin = createAdminClient()
TestUtils.createTopicWithAdmin(
admin,
topic,
brokers,
replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
)
TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
val topicPartition = new TopicPartition(topic, 0)
val offsetMap = Map(topicPartition -> 10L)
val request = createConsumerFetchRequest(
maxResponseBytes = 1000,
maxPartitionBytes = 1000,
Seq(topicPartition),
offsetMap,
ApiKeys.FETCH.latestVersion,
maxWaitMs = 20000,
minBytes = 1,
rackId = followerBrokerId.toString
)
var response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
assertEquals(Errors.NONE, response.error)
assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
validatePreferredReadReplica(response, preferredReadReplica = 1)
// Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms
brokers(followerBrokerId).shutdown()
TestUtils.waitUntilTrue(() => {
val endpoints = brokers(leaderBrokerId).metadataCache.getPartitionReplicaEndpoints(topicPartition, listenerName)
!endpoints.contains(followerBrokerId)
}, "follower is still reachable.")
response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
assertEquals(Errors.NONE, response.error)
assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
validatePreferredReadReplica(response, preferredReadReplica = -1)
}
private def validatePreferredReadReplica(response: FetchResponse, preferredReadReplica: Int): Unit = {
assertEquals(1, response.data.responses.size)
response.data.responses.forEach { topicResponse =>
assertEquals(1, topicResponse.partitions.size)
topicResponse.partitions.forEach { partitionResponse =>
assertEquals(preferredReadReplica, partitionResponse.preferredReadReplica)
}
}
}
}

View File

@ -53,10 +53,13 @@ class BaseFetchRequestTest extends BaseRequestTest {
offsetMap: Map[TopicPartition, Long],
version: Short,
maxWaitMs: Int = Int.MaxValue,
minBytes: Int = 0
minBytes: Int = 0,
rackId: String = ""
): FetchRequest = {
FetchRequest.Builder.forConsumer(version, maxWaitMs, minBytes, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap))
.setMaxBytes(maxResponseBytes).build()
.setMaxBytes(maxResponseBytes)
.rackId(rackId)
.build()
}
protected def createPartitionMap(maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],

View File

@ -47,6 +47,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.{ClientMetadata, PartitionView, ReplicaSelector, ReplicaView}
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
@ -1189,7 +1190,7 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0)
// Make this replica the follower
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
.setTopicName(topic)
.setPartitionIndex(0)
@ -1202,7 +1203,7 @@ class ReplicaManagerTest {
.setIsNew(false)).asJava,
Collections.singletonMap(topic, topicId),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ())
val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
@ -1245,7 +1246,7 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0)
// Make this replica the leader
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
.setTopicName(topic)
.setPartitionIndex(0)
@ -1258,7 +1259,7 @@ class ReplicaManagerTest {
.setIsNew(false)).asJava,
Collections.singletonMap(topic, topicId),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ())
val metadata = new DefaultClientMetadata("rack-a", "client-id",
InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
@ -1279,6 +1280,75 @@ class ReplicaManagerTest {
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
def testPreferredReplicaAsLeaderWhenSameRackFollowerIsOutOfIsr(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, classOf[MockReplicaSelector].getName))
try {
val leaderBrokerId = 0
val followerBrokerId = 1
val leaderNode = new Node(leaderBrokerId, "host1", 0, "rack-a")
val followerNode = new Node(followerBrokerId, "host2", 1, "rack-b")
val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
val topicId = Uuid.randomUuid()
val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0)
when(replicaManager.metadataCache.getPartitionReplicaEndpoints(
tp0,
new ListenerName("default")
)).thenReturn(Map(
leaderBrokerId -> leaderNode,
followerBrokerId -> followerNode
).toMap)
// Make this replica the leader and remove follower from ISR.
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
ApiKeys.LEADER_AND_ISR.latestVersion,
0,
0,
brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
.setTopicName(topic)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(leaderBrokerId)
.setLeaderEpoch(1)
.setIsr(Seq[Integer](leaderBrokerId).asJava)
.setPartitionEpoch(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
Collections.singletonMap(topic, topicId),
Set(leaderNode, followerNode).asJava).build()
replicaManager.becomeLeaderOrFollower(2, leaderAndIsrRequest, (_, _) => ())
val metadata = new DefaultClientMetadata("rack-b", "client-id",
InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
val consumerResult = fetchPartitionAsConsumer(
replicaManager,
tidp0,
new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()),
clientMetadata = Some(metadata)
)
// Fetch from leader succeeds
assertTrue(consumerResult.hasFired)
// PartitionView passed to ReplicaSelector should not contain the follower as it's not in the ISR
val expectedReplicaViews = Set(new DefaultReplicaView(leaderNode, 0, 0))
val partitionView = replicaManager.replicaSelectorOpt.get
.asInstanceOf[MockReplicaSelector].getPartitionViewArgument
assertTrue(partitionView.isDefined)
assertEquals(expectedReplicaViews.asJava, partitionView.get.replicas)
} finally {
replicaManager.shutdown()
}
}
@Test
def testFetchFromFollowerShouldNotRunPreferLeaderSelect(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
@ -4174,11 +4244,14 @@ class ReplicaManagerTest {
class MockReplicaSelector extends ReplicaSelector {
private val selectionCount = new AtomicLong()
private var partitionViewArgument: Option[PartitionView] = None
def getSelectionCount: Long = selectionCount.get
def getPartitionViewArgument: Option[PartitionView] = partitionViewArgument
override def select(topicPartition: TopicPartition, clientMetadata: ClientMetadata, partitionView: PartitionView): Optional[ReplicaView] = {
selectionCount.incrementAndGet()
partitionViewArgument = Some(partitionView)
Optional.of(partitionView.leader)
}
}