From fd3bf0ca65bd64ea7c71829fc31e39efbcb0fec7 Mon Sep 17 00:00:00 2001
From: Jeff Kim
Date: Thu, 24 Nov 2022 06:04:43 -0800
Subject: [PATCH] KAFKA-14372: Choose replicas only from ISR for preferred read
 replica (#12877)

The default replica selector chooses a replica based on whether the
broker.rack matches the client.rack in the fetch request, and on whether the
requested offset exists on the follower. If the follower is not in the ISR,
we know it is lagging behind, which will leave the consumer lagging behind
as well. There are two cases:

1. The follower recovers and rejoins the ISR. The consumer will no longer
   fall behind.
2. The follower continues to lag. After five minutes (the default
   metadata.max.age.ms), the consumer will refresh its preferred read
   replica, and the leader will return the same lagging follower, since the
   offset the consumer has fetched up to is capped by the follower's HWM.
   This can go on indefinitely.

If the replica selector chooses only brokers in the ISR, we can ensure that
at least every five minutes the consumer will consume from an up-to-date
replica.

Reviewers: David Jacot
---
 .../scala/kafka/server/ReplicaManager.scala   | 10 ++-
 .../FetchFromFollowerIntegrationTest.scala    | 58 ++++++++++++-
 .../kafka/server/BaseFetchRequestTest.scala   |  7 +-
 .../kafka/server/ReplicaManagerTest.scala     | 81 ++++++++++++++++++-
 4 files changed, 146 insertions(+), 10 deletions(-)
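Before the diffs, a minimal, self-contained Scala sketch of the selection rule this patch enforces. `ReplicaInfo` and `IsrAwareSelection` are hypothetical simplifications for illustration only, not types from the Kafka codebase:

```scala
// Hypothetical, simplified model of a replica's state; not Kafka's real types.
case class ReplicaInfo(brokerId: Int, rack: String, logStartOffset: Long, logEndOffset: Long)

object IsrAwareSelection {
  // After this patch a follower is a candidate only if it is in the ISR *and*
  // its log covers the fetch offset; before, only the offset check applied.
  def candidates(replicas: Seq[ReplicaInfo], isr: Set[Int], fetchOffset: Long): Seq[ReplicaInfo] =
    replicas.filter { r =>
      isr.contains(r.brokerId) &&
        r.logEndOffset >= fetchOffset &&
        r.logStartOffset <= fetchOffset
    }

  // Rack-aware choice among the candidates, in the spirit of
  // RackAwareReplicaSelector: prefer a replica in the client's rack.
  // None means "fetch from the leader".
  def select(replicas: Seq[ReplicaInfo], isr: Set[Int], fetchOffset: Long, clientRack: String): Option[ReplicaInfo] =
    candidates(replicas, isr, fetchOffset).find(_.rack == clientRack)
}
```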
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d811611600c..744e3780d8d 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1244,8 +1244,14 @@ class ReplicaManager(val config: KafkaConfig,
         partition.remoteReplicas.foreach { replica =>
           val replicaState = replica.stateSnapshot
-          // Exclude replicas that don't have the requested offset (whether or not if they're in the ISR)
-          if (replicaState.logEndOffset >= fetchOffset && replicaState.logStartOffset <= fetchOffset) {
+          // Exclude replicas that are not in the ISR as the follower may lag behind. Worst case, the follower
+          // will continue to lag and the consumer will fall behind the producer. The leader will
+          // continuously pick the lagging follower when the consumer refreshes its preferred read replica.
+          // This can go on indefinitely.
+          if (partition.inSyncReplicaIds.contains(replica.brokerId) &&
+            replicaState.logEndOffset >= fetchOffset &&
+            replicaState.logStartOffset <= fetchOffset) {
+
             replicaInfoSet.add(new DefaultReplicaView(
               replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()),
               replicaState.logEndOffset,
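A note on scope: the ISR filter above runs in ReplicaManager before the pluggable selector is consulted, so any custom `ReplicaSelector` now only ever sees in-sync, offset-covering replicas in its `PartitionView`. As a rough illustration (a hypothetical selector, not part of this patch; assumes Scala 2.13 for `maxByOption`), such a selector can pick the most caught-up replica without its own lag check:

```scala
import java.util.Optional
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.replica.{ClientMetadata, PartitionView, ReplicaSelector, ReplicaView}
import scala.jdk.CollectionConverters._

// Illustrative only: with this patch, partitionView.replicas never contains an
// out-of-ISR follower, so no additional lag check is needed here.
class MostCaughtUpReplicaSelector extends ReplicaSelector {
  override def select(topicPartition: TopicPartition,
                      clientMetadata: ClientMetadata,
                      partitionView: PartitionView): Optional[ReplicaView] = {
    // Prefer the replica with the highest log end offset; fall back to the leader.
    val best = partitionView.replicas.asScala.maxByOption(_.logEndOffset)
    Optional.of(best.getOrElse(partitionView.leader))
  }
}
```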
diff --git a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
index b418fc72d63..a1e0d20b4e8 100644
--- a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.FetchResponse
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.{Test, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -84,9 +84,63 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
       TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
       val response = receive[FetchResponse](socket, ApiKeys.FETCH, version)
       assertEquals(Errors.NONE, response.error)
-      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts())
+      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
     } finally {
       socket.close()
     }
   }
+
+  @Test
+  def testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(): Unit = {
+    // Create a topic with two replicas, where broker 0 is the leader and broker 1 is the follower.
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin,
+      topic,
+      brokers,
+      replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+    )
+
+    TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
+
+    val topicPartition = new TopicPartition(topic, 0)
+    val offsetMap = Map(topicPartition -> 10L)
+
+    val request = createConsumerFetchRequest(
+      maxResponseBytes = 1000,
+      maxPartitionBytes = 1000,
+      Seq(topicPartition),
+      offsetMap,
+      ApiKeys.FETCH.latestVersion,
+      maxWaitMs = 20000,
+      minBytes = 1,
+      rackId = followerBrokerId.toString
+    )
+    var response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validatePreferredReadReplica(response, preferredReadReplica = 1)
+
+    // Shut down the follower broker. The consumer will reach out to the leader after metadata.max.age.ms.
+    brokers(followerBrokerId).shutdown()
+    TestUtils.waitUntilTrue(() => {
+      val endpoints = brokers(leaderBrokerId).metadataCache.getPartitionReplicaEndpoints(topicPartition, listenerName)
+      !endpoints.contains(followerBrokerId)
+    }, "follower is still reachable.")
+
+    response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validatePreferredReadReplica(response, preferredReadReplica = -1)
+  }
+
+  private def validatePreferredReadReplica(response: FetchResponse, preferredReadReplica: Int): Unit = {
+    assertEquals(1, response.data.responses.size)
+    response.data.responses.forEach { topicResponse =>
+      assertEquals(1, topicResponse.partitions.size)
+      topicResponse.partitions.forEach { partitionResponse =>
+        assertEquals(preferredReadReplica, partitionResponse.preferredReadReplica)
+      }
+    }
+  }
 }
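The integration test above drives the FETCH protocol directly via `createConsumerFetchRequest`. For reference, a real consumer opts into the same broker-side selection path with the `client.rack` config, paired with `replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector` on the brokers. A minimal sketch; the bootstrap address, group id, topic, and rack are placeholders:

```scala
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

// client.rack is what opts the consumer into fetching from a same-rack
// follower (KIP-392); all other values below are placeholders.
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "fetch-from-follower-example")
props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-b")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(java.util.Collections.singletonList("example-topic"))
val records = consumer.poll(Duration.ofSeconds(1))
println(s"fetched ${records.count()} records")
```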
diff --git a/core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala
index 22b213f8cf8..458973700bd 100644
--- a/core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala
@@ -53,10 +53,13 @@ class BaseFetchRequestTest extends BaseRequestTest {
     offsetMap: Map[TopicPartition, Long],
     version: Short,
     maxWaitMs: Int = Int.MaxValue,
-    minBytes: Int = 0
+    minBytes: Int = 0,
+    rackId: String = ""
   ): FetchRequest = {
     FetchRequest.Builder.forConsumer(version, maxWaitMs, minBytes,
       createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap))
-      .setMaxBytes(maxResponseBytes).build()
+      .setMaxBytes(maxResponseBytes)
+      .rackId(rackId)
+      .build()
   }

   protected def createPartitionMap(maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 2a5961b8c15..8c66c28103e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -47,6 +47,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.replica.{ClientMetadata, PartitionView, ReplicaSelector, ReplicaView}
 import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
+import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
@@ -1189,7 +1190,7 @@ class ReplicaManagerTest {
     val tidp0 = new TopicIdPartition(topicId, tp0)

     // Make this replica the follower
-    val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+    val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
       Seq(new LeaderAndIsrPartitionState()
         .setTopicName(topic)
         .setPartitionIndex(0)
@@ -1202,7 +1203,7 @@ class ReplicaManagerTest {
         .setIsNew(false)).asJava,
       Collections.singletonMap(topic, topicId),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-    replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+    replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ())

     val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
       InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
@@ -1245,7 +1246,7 @@ class ReplicaManagerTest {
     val tidp0 = new TopicIdPartition(topicId, tp0)

     // Make this replica the leader
-    val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+    val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
       Seq(new LeaderAndIsrPartitionState()
         .setTopicName(topic)
         .setPartitionIndex(0)
@@ -1258,7 +1259,7 @@ class ReplicaManagerTest {
         .setIsNew(false)).asJava,
       Collections.singletonMap(topic, topicId),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-    replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+    replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ())

     val metadata = new DefaultClientMetadata("rack-a", "client-id",
       InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
@@ -1279,6 +1280,75 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }

+  @Test
+  def testPreferredReplicaAsLeaderWhenSameRackFollowerIsOutOfIsr(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+      propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, classOf[MockReplicaSelector].getName))
+
+    try {
+      val leaderBrokerId = 0
+      val followerBrokerId = 1
+      val leaderNode = new Node(leaderBrokerId, "host1", 0, "rack-a")
+      val followerNode = new Node(followerBrokerId, "host2", 1, "rack-b")
+      val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
+      val topicId = Uuid.randomUuid()
+      val tp0 = new TopicPartition(topic, 0)
+      val tidp0 = new TopicIdPartition(topicId, tp0)
+
+      when(replicaManager.metadataCache.getPartitionReplicaEndpoints(
+        tp0,
+        new ListenerName("default")
+      )).thenReturn(Map(
+        leaderBrokerId -> leaderNode,
+        followerBrokerId -> followerNode
+      ).toMap)
+
+      // Make this replica the leader and remove the follower from the ISR.
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+        ApiKeys.LEADER_AND_ISR.latestVersion,
+        0,
+        0,
+        brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(leaderBrokerId)
+          .setLeaderEpoch(1)
+          .setIsr(Seq[Integer](leaderBrokerId).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(leaderNode, followerNode).asJava).build()
+
+      replicaManager.becomeLeaderOrFollower(2, leaderAndIsrRequest, (_, _) => ())
+
+      val metadata = new DefaultClientMetadata("rack-b", "client-id",
+        InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
+
+      val consumerResult = fetchPartitionAsConsumer(
+        replicaManager,
+        tidp0,
+        new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()),
+        clientMetadata = Some(metadata)
+      )
+
+      // Fetch from the leader succeeds.
+      assertTrue(consumerResult.hasFired)
+
+      // The PartitionView passed to the ReplicaSelector should not contain the follower, as it is not in the ISR.
+      val expectedReplicaViews = Set(new DefaultReplicaView(leaderNode, 0, 0))
+      val partitionView = replicaManager.replicaSelectorOpt.get
+        .asInstanceOf[MockReplicaSelector].getPartitionViewArgument
+
+      assertTrue(partitionView.isDefined)
+      assertEquals(expectedReplicaViews.asJava, partitionView.get.replicas)
+    } finally {
+      replicaManager.shutdown()
+    }
+  }
+
   @Test
   def testFetchFromFollowerShouldNotRunPreferLeaderSelect(): Unit = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
@@ -4174,11 +4244,14 @@ class ReplicaManagerTest {
 class MockReplicaSelector extends ReplicaSelector {

   private val selectionCount = new AtomicLong()
+  private var partitionViewArgument: Option[PartitionView] = None

   def getSelectionCount: Long = selectionCount.get
+  def getPartitionViewArgument: Option[PartitionView] = partitionViewArgument

   override def select(topicPartition: TopicPartition, clientMetadata: ClientMetadata, partitionView: PartitionView): Optional[ReplicaView] = {
     selectionCount.incrementAndGet()
+    partitionViewArgument = Some(partitionView)
     Optional.of(partitionView.leader)
   }
 }
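To close the loop with the hypothetical `IsrAwareSelection` sketch shown after the commit message: the scenario these tests pin down — a same-rack follower that covers the fetch offset but has dropped out of the ISR — now falls back to the leader rather than pinning the consumer to the laggard:

```scala
// Reuses the hypothetical ReplicaInfo/IsrAwareSelection definitions from the
// earlier sketch. The follower in the client's rack covers offset 3 but is not
// in the ISR, so select returns None, i.e. the consumer reads from the leader.
val replicas = Seq(
  ReplicaInfo(brokerId = 0, rack = "rack-a", logStartOffset = 0L, logEndOffset = 10L), // leader, in ISR
  ReplicaInfo(brokerId = 1, rack = "rack-b", logStartOffset = 0L, logEndOffset = 4L)   // lagging follower
)
val isr = Set(0)
assert(IsrAwareSelection.select(replicas, isr, fetchOffset = 3L, clientRack = "rack-b").isEmpty)
```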