KAFKA-9731: Disable immediate fetch response for hw propagation if replica selector is not defined (#8607)

In the case described in the JIRA, the immediate fetch response introduced for high watermark
propagation caused a 50%+ increase in the total fetch request rate in 2.4.0.
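
The core of the fix (visible in the `ReplicaManager.scala` hunk further down) is to make the
"follower needs HW update" check conditional on a replica selector actually being configured.
A minimal, self-contained sketch of that decision, using illustrative names rather than the
real `ReplicaManager` types:

```scala
object HwPropagationSketch extends App {
  // Illustrative stand-in for the follower state the leader tracks.
  final case class FollowerState(lastSentHighWatermark: Long)

  // Only force an immediate fetch response for high watermark propagation when a
  // replica selector is configured, i.e. when follower fetching is actually in use.
  def followerNeedsHwUpdate(replicaSelectorConfigured: Boolean,
                            follower: Option[FollowerState],
                            leaderHighWatermark: Long): Boolean =
    replicaSelectorConfigured &&
      follower.exists(_.lastSentHighWatermark < leaderHighWatermark)

  // Without the selector check, a lagging last-sent HW always completed the
  // follower fetch immediately, which is what inflated the fetch request rate.
  println(followerNeedsHwUpdate(replicaSelectorConfigured = false, Some(FollowerState(5L)), 10L)) // false
  println(followerNeedsHwUpdate(replicaSelectorConfigured = true, Some(FollowerState(5L)), 10L))  // true
}
```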

I included a few additional clean-ups:
* Simplify `findPreferredReadReplica` and avoid unnecessary collection copies.
* Use `LongSupplier` instead of `Supplier<Long>` in `SubscriptionState` to avoid unnecessary boxing.
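
The boxing point is mechanical but easy to overlook: `Supplier<Long>` forces a `java.lang.Long`
wrapper on every call, while `LongSupplier` stays on the primitive. A small illustrative sketch
(not the `SubscriptionState` code itself):

```scala
import java.util.function.{LongSupplier, Supplier}

object BoxingSketch extends App {
  // Supplier[java.lang.Long] must return a wrapper object, so each call allocates
  // (or looks up in the cache) a java.lang.Long and the caller unboxes it again.
  val boxed: Supplier[java.lang.Long] = () => java.lang.Long.valueOf(System.currentTimeMillis())
  val t1: Long = boxed.get()

  // LongSupplier is specialized for the primitive long; no wrapper is ever created.
  val unboxed: LongSupplier = () => System.currentTimeMillis()
  val t2: Long = unboxed.getAsLong

  println(s"boxed=$t1 unboxed=$t2")
}
```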

Added a unit test to `ReplicaManagerTest` and cleaned up the test class a bit, including
consistent usage of `Time` in `MockTimer` and other components.
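
The `Time` clean-up shows up in the `MockTimer.scala` hunk at the end: the timer now accepts the
test class's `MockTime` instead of constructing its own, so the purgatories and everything else
reading the clock agree. A rough usage sketch against Kafka's test utilities (the
`kafka.utils.timer` package for `MockTimer` is assumed; this needs the Kafka test classpath):

```scala
import kafka.utils.MockTime
import kafka.utils.timer.MockTimer // assumed location of the test-scope MockTimer

object SharedClockSketch extends App {
  val time  = new MockTime()
  val timer = new MockTimer(time) // previously `new MockTimer`, which built a private MockTime
  // Delayed operations parked in a purgatory backed by `timer` complete once the clock is
  // advanced, which is how the new ReplicaManagerTest case below unblocks the follower fetch.
  timer.advanceClock(1001)
}
```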

Reviewers: Gwen Shapira <gwen@confluent.io>, David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
Ismael Juma 2020-05-04 21:38:53 -07:00 committed by GitHub
parent 69586fb604
commit fbfda2c4ad
5 changed files with 135 additions and 71 deletions

SubscriptionState.java

@ -39,8 +39,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collector;
import java.util.stream.Collectors;
@ -516,7 +516,7 @@ public class SubscriptionState {
* @param preferredReadReplicaId The preferred read replica
* @param timeMs The time at which this preferred replica is no longer valid
*/
public synchronized void updatePreferredReadReplica(TopicPartition tp, int preferredReadReplicaId, Supplier<Long> timeMs) {
public synchronized void updatePreferredReadReplica(TopicPartition tp, int preferredReadReplicaId, LongSupplier timeMs) {
assignedState(tp).updatePreferredReadReplica(preferredReadReplicaId, timeMs);
}
@ -721,10 +721,10 @@ public class SubscriptionState {
}
}
private void updatePreferredReadReplica(int preferredReadReplica, Supplier<Long> timeMs) {
private void updatePreferredReadReplica(int preferredReadReplica, LongSupplier timeMs) {
if (this.preferredReadReplica == null || preferredReadReplica != this.preferredReadReplica) {
this.preferredReadReplica = preferredReadReplica;
this.preferredReadReplicaExpireTimeMs = timeMs.get();
this.preferredReadReplicaExpireTimeMs = timeMs.getAsLong();
}
}

ReplicaManager.scala

@ -1053,7 +1053,7 @@ class ReplicaManager(val config: KafkaConfig,
metadata => findPreferredReadReplica(partition, metadata, replicaId, fetchInfo.fetchOffset, fetchTimeMs))
if (preferredReadReplica.isDefined) {
replicaSelectorOpt.foreach{ selector =>
replicaSelectorOpt.foreach { selector =>
debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " +
s"${preferredReadReplica.get} for $clientMetadata")
}
@ -1079,9 +1079,9 @@ class ReplicaManager(val config: KafkaConfig,
fetchOnlyFromLeader = fetchOnlyFromLeader,
minOneMessage = minOneMessage)
// Check if the HW known to the follower is behind the actual HW
val followerNeedsHwUpdate: Boolean = partition.getReplica(replicaId)
.exists(replica => replica.lastSentHighWatermark < readInfo.highWatermark)
// Check if the HW known to the follower is behind the actual HW if a replica selector is defined
val followerNeedsHwUpdate = replicaSelectorOpt.isDefined &&
partition.getReplica(replicaId).exists(replica => replica.lastSentHighWatermark < readInfo.highWatermark)
val fetchDataInfo = if (shouldLeaderThrottle(quota, partition, replicaId)) {
// If the partition is being throttled, simply return an empty set.
@ -1170,44 +1170,35 @@ class ReplicaManager(val config: KafkaConfig,
replicaId: Int,
fetchOffset: Long,
currentTimeMs: Long): Option[Int] = {
if (partition.isLeader) {
if (Request.isValidBrokerId(replicaId)) {
// Don't look up preferred for follower fetches via normal replication
Option.empty
} else {
partition.leaderReplicaIdOpt.flatMap { leaderReplicaId =>
// Don't look up preferred for follower fetches via normal replication
if (Request.isValidBrokerId(replicaId))
None
else {
replicaSelectorOpt.flatMap { replicaSelector =>
val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(partition.topicPartition, new ListenerName(clientMetadata.listenerName))
var replicaInfoSet: Set[ReplicaView] = partition.remoteReplicas
val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(partition.topicPartition,
new ListenerName(clientMetadata.listenerName))
val replicaInfos = partition.remoteReplicas
// Exclude replicas that don't have the requested offset (whether or not they're in the ISR)
.filter(replica => replica.logEndOffset >= fetchOffset)
.filter(replica => replica.logStartOffset <= fetchOffset)
.filter(replica => replica.logEndOffset >= fetchOffset && replica.logStartOffset <= fetchOffset)
.map(replica => new DefaultReplicaView(
replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()),
replica.logEndOffset,
currentTimeMs - replica.lastCaughtUpTimeMs))
.toSet
if (partition.leaderReplicaIdOpt.isDefined) {
val leaderReplica: ReplicaView = partition.leaderReplicaIdOpt
.map(replicaId => replicaEndpoints.getOrElse(replicaId, Node.noNode()))
.map(leaderNode => new DefaultReplicaView(leaderNode, partition.localLogOrException.logEndOffset, 0L))
.get
replicaInfoSet ++= Set(leaderReplica)
val leaderReplica = new DefaultReplicaView(
replicaEndpoints.getOrElse(leaderReplicaId, Node.noNode()),
partition.localLogOrException.logEndOffset, 0L)
val replicaInfoSet = mutable.Set[ReplicaView]() ++= replicaInfos += leaderReplica
val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)
replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala
.filter(!_.endpoint.isEmpty)
// Even though the replica selector can return the leader, we don't want to send it out with the
// FetchResponse, so we exclude it here
.filter(!_.equals(leaderReplica))
.map(_.endpoint.id)
} else {
None
val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)
replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala.collect {
// Even though the replica selector can return the leader, we don't want to send it out with the
// FetchResponse, so we exclude it here
case selected if !selected.endpoint.isEmpty && selected != leaderReplica => selected.endpoint.id
}
}
}
} else {
None
}
}

ReplicaManagerTest.scala

@ -221,7 +221,7 @@ class ReplicaManagerTest {
}
private[this] def testFencedErrorCausedByBecomeLeader(loopEpochChange: Int): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
try {
val brokerList = Seq[Integer](0, 1).asJava
val topicPartition = new TopicPartition(topic, 0)
@ -277,7 +277,7 @@ class ReplicaManagerTest {
@Test
def testReceiveOutOfOrderSequenceExceptionWithLogStartOffset(): Unit = {
val timer = new MockTimer
val timer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
try {
@ -337,7 +337,7 @@ class ReplicaManagerTest {
@Test
def testReadCommittedFetchLimitedAtLSO(): Unit = {
val timer = new MockTimer
val timer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
try {
@ -444,7 +444,7 @@ class ReplicaManagerTest {
@Test
def testDelayedFetchIncludesAbortedTransactions(): Unit = {
val timer = new MockTimer
val timer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
try {
@ -521,7 +521,7 @@ class ReplicaManagerTest {
@Test
def testFetchBeyondHighWatermark(): Unit = {
val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1, 2))
val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2))
try {
val brokerList = Seq[Integer](0, 1, 2).asJava
@ -579,7 +579,7 @@ class ReplicaManagerTest {
val maxFetchBytes = 1024 * 1024
val aliveBrokersIds = Seq(0, 1)
val leaderEpoch = 5
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokersIds)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokersIds)
try {
val tp = new TopicPartition(topic, 0)
val replicas = aliveBrokersIds.toList.map(Int.box).asJava
@ -677,7 +677,7 @@ class ReplicaManagerTest {
*/
@Test
def testFetchMessagesWhenNotFollowerForOnePartition(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1, 2))
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2))
try {
// Create 2 partitions, assign replica 0 as the leader for both, and a different follower (1 and 2) for each
@ -791,8 +791,9 @@ class ReplicaManagerTest {
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true)
val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new MockTimer(time),
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch,
expectTruncation = true, localLogOffset = Some(10))
// Initialize partition state to follower, with leader = 1, leaderEpoch = 1
val tp = new TopicPartition(topic, topicPartition)
@ -830,7 +831,7 @@ class ReplicaManagerTest {
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
val (replicaManager, _) = prepareReplicaManagerAndLogManager(
val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
@ -863,7 +864,7 @@ class ReplicaManagerTest {
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
val (replicaManager, _) = prepareReplicaManagerAndLogManager(
val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
@ -912,7 +913,7 @@ class ReplicaManagerTest {
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
val (replicaManager, _) = prepareReplicaManagerAndLogManager(
val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
@ -951,6 +952,70 @@ class ReplicaManagerTest {
assertFalse(consumerResult.assertFired.preferredReadReplica.isDefined)
}
@Test
def testFollowerFetchWithDefaultSelectorNoForcedHwPropagation(): Unit = {
val topicPartition = 0
val followerBrokerId = 0
val leaderBrokerId = 1
val leaderEpoch = 1
val leaderEpochIncrement = 2
val countDownLatch = new CountDownLatch(1)
val timer = new MockTimer(time)
// Prepare the mocked components for the test
val (replicaManager, _) = prepareReplicaManagerAndLogManager(timer,
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
val brokerList = Seq[Integer](0, 1).asJava
val tp0 = new TopicPartition(topic, 0)
replicaManager.createPartition(new TopicPartition(topic, 0))
// Make this replica the follower
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
.setTopicName(topic)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(1)
.setIsr(brokerList)
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
val simpleRecords = Seq(new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))
val appendResult = appendRecords(replicaManager, tp0,
MemoryRecords.withRecords(CompressionType.NONE, simpleRecords.toSeq: _*), AppendOrigin.Client)
// Increment the hw in the leader by fetching from the last offset
val fetchOffset = simpleRecords.size
var followerResult = fetchAsFollower(replicaManager, tp0,
new PartitionData(fetchOffset, 0, 100000, Optional.empty()),
clientMetadata = None)
assertTrue(followerResult.isFired)
assertEquals(0, followerResult.assertFired.highWatermark)
assertTrue("Expected producer request to be acked", appendResult.isFired)
// Fetch from the same offset, no new data is expected and hence the fetch request should
// go to the purgatory
followerResult = fetchAsFollower(replicaManager, tp0,
new PartitionData(fetchOffset, 0, 100000, Optional.empty()),
clientMetadata = None, minBytes = 1000)
assertFalse("Request completed immediately unexpectedly", followerResult.isFired)
// Complete the request in the purgatory by advancing the clock
timer.advanceClock(1001)
assertTrue(followerResult.isFired)
assertEquals(fetchOffset, followerResult.assertFired.highWatermark)
}
@Test(expected = classOf[ClassNotFoundException])
def testUnknownReplicaSelector(): Unit = {
val topicPartition = 0
@ -962,7 +1027,7 @@ class ReplicaManagerTest {
val props = new Properties()
props.put(KafkaConfig.ReplicaSelectorClassProp, "non-a-class")
prepareReplicaManagerAndLogManager(
prepareReplicaManagerAndLogManager(new MockTimer(time),
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true, extraProps = props)
}
@ -976,7 +1041,7 @@ class ReplicaManagerTest {
val leaderEpochIncrement = 2
val countDownLatch = new CountDownLatch(1)
val (replicaManager, _) = prepareReplicaManagerAndLogManager(
val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
assertFalse(replicaManager.replicaSelectorOpt.isDefined)
@ -984,7 +1049,7 @@ class ReplicaManagerTest {
@Test
def testFetchFollowerNotAllowedForOlderClients(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1))
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
@ -1022,7 +1087,7 @@ class ReplicaManagerTest {
@Test
def testFetchRequestRateMetrics(): Unit = {
val mockTimer = new MockTimer
val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
@ -1067,7 +1132,7 @@ class ReplicaManagerTest {
@Test
def testBecomeFollowerWhileOldClientFetchInPurgatory(): Unit = {
val mockTimer = new MockTimer
val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
@ -1115,7 +1180,7 @@ class ReplicaManagerTest {
@Test
def testBecomeFollowerWhileNewClientFetchInPurgatory(): Unit = {
val mockTimer = new MockTimer
val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
@ -1164,7 +1229,7 @@ class ReplicaManagerTest {
@Test
def testFetchFromLeaderAlwaysAllowed(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1))
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
@ -1205,7 +1270,7 @@ class ReplicaManagerTest {
// In this case, we should ensure that pending purgatory operations are cancelled
// immediately rather than sitting around to timeout.
val mockTimer = new MockTimer
val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
@ -1244,7 +1309,7 @@ class ReplicaManagerTest {
@Test
def testClearProducePurgatoryOnStopReplica(): Unit = {
val mockTimer = new MockTimer
val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
@ -1329,22 +1394,22 @@ class ReplicaManagerTest {
* ReplicaManager.becomeLeaderOrFollower() once with LeaderAndIsrRequest containing
* 'leaderEpochInLeaderAndIsr' leader epoch for partition 'topicPartition'.
*/
private def prepareReplicaManagerAndLogManager(topicPartition: Int,
private def prepareReplicaManagerAndLogManager(timer: MockTimer,
topicPartition: Int,
leaderEpochInLeaderAndIsr: Int,
followerBrokerId: Int,
leaderBrokerId: Int,
countDownLatch: CountDownLatch,
expectTruncation: Boolean,
localLogOffset: Option[Long] = None,
offsetFromLeader: Long = 5,
leaderEpochFromLeader: Int = 3,
extraProps: Properties = new Properties()) : (ReplicaManager, LogManager) = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
props.asScala ++= extraProps.asScala
val config = KafkaConfig.fromProps(props)
// Setup mock local log to have leader epoch of 3 and offset of 10
val localLogOffset = 10
val offsetFromLeader = 5
val leaderEpochFromLeader = 3
val mockScheduler = new MockScheduler(time)
val mockBrokerTopicStats = new BrokerTopicStats
val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
@ -1365,14 +1430,17 @@ class ReplicaManagerTest {
override def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = {
assertEquals(leaderEpoch, leaderEpochFromLeader)
Some(OffsetAndEpoch(localLogOffset, leaderEpochFromLeader))
localLogOffset.map { logOffset =>
Some(OffsetAndEpoch(logOffset, leaderEpochFromLeader))
}.getOrElse(super.endOffsetForEpoch(leaderEpoch))
}
override def latestEpoch: Option[Int] = Some(leaderEpochFromLeader)
override def logEndOffsetMetadata = LogOffsetMetadata(localLogOffset)
override def logEndOffsetMetadata: LogOffsetMetadata =
localLogOffset.map(LogOffsetMetadata(_)).getOrElse(super.logEndOffsetMetadata)
override def logEndOffset: Long = localLogOffset
override def logEndOffset: Long = localLogOffset.getOrElse(super.logEndOffset)
}
// Expect to call LogManager.truncateTo exactly once
@ -1414,7 +1482,6 @@ class ReplicaManagerTest {
.anyTimes()
EasyMock.replay(metadataCache)
val timer = new MockTimer
val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", timer, reaperEnabled = false)
val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
@ -1822,7 +1889,7 @@ class ReplicaManagerTest {
@Test
def testStopReplicaWithStaleControllerEpoch(): Unit = {
val mockTimer = new MockTimer
val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
@ -1848,7 +1915,7 @@ class ReplicaManagerTest {
@Test
def testStopReplicaWithOfflinePartition(): Unit = {
val mockTimer = new MockTimer
val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
@ -1890,7 +1957,7 @@ class ReplicaManagerTest {
}
private def testStopReplicaWithInexistentPartition(deletePartitions: Boolean, throwIOException: Boolean): Unit = {
val mockTimer = new MockTimer
val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
@ -1983,7 +2050,7 @@ class ReplicaManagerTest {
deletePartition: Boolean,
throwIOException: Boolean,
expectedOutput: Errors): Unit = {
val mockTimer = new MockTimer
val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)

MockTimer.scala

@ -20,9 +20,8 @@ import kafka.utils.MockTime
import scala.collection.mutable
class MockTimer extends Timer {
class MockTimer(val time: MockTime = new MockTime) extends Timer {
val time = new MockTime
private val taskQueue = mutable.PriorityQueue[TimerTaskEntry]()(Ordering[TimerTaskEntry].reverse)
def add(timerTask: TimerTask): Unit = {

spotbugs-exclude.xml

@ -154,6 +154,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match>
<Match>
<!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 -->
<Source name="ReplicaManager.scala"/>
<Package name="kafka.server"/>
<Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
</Match>
<Match>
<!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 -->
<Source name="LogManager.scala"/>