KAFKA-15265: Remote fetch throttle metrics (#16087)

As part of [KIP-956](https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas), we have added a quota for fetches from remote storage. This PR adds the following metrics for remote fetch throttling:

- `remote-fetch-throttle-time-avg`: The average time in millis remote fetches was throttled by a broker
- `remote-fetch-throttle-time-max`: The max time in millis remote fetches was throttled by a broker
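
Both metrics are registered in the broker's Kafka `Metrics` registry under the `RemoteLogManager` group, so they can be looked up the same way the updated `ReplicaManagerTest` below does. A minimal sketch (assumes you hold a reference to the broker's `Metrics` instance):

```java
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;

public class ThrottleMetricsLookup {
    // Illustrative lookup of the new metrics; mirrors the assertions in
    // ReplicaManagerTest below. Reports Double.NaN until at least one
    // throttle event has been recorded on the sensor.
    static double avgThrottleTimeMs(Metrics metrics) {
        KafkaMetric avg = metrics.metrics()
                .get(metrics.metricName("remote-fetch-throttle-time-avg", "RemoteLogManager"));
        return (double) avg.metricValue();
    }
}
```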

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>
Abhijeet Kumar 2024-07-03 14:11:18 +05:30 committed by GitHub
parent f2dbc55d24
commit b2da186f21
7 changed files with 121 additions and 25 deletions

kafka/log/remote/RemoteLogManager.java

@@ -21,6 +21,7 @@ import kafka.cluster.Partition;
import kafka.log.UnifiedLog;
import kafka.log.remote.quota.RLMQuotaManager;
import kafka.log.remote.quota.RLMQuotaManagerConfig;
+import kafka.log.remote.quota.RLMQuotaMetrics;
import kafka.server.BrokerTopicStats;
import kafka.server.QuotaType;
import kafka.server.StopPartition;
@@ -34,6 +35,7 @@ import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
@@ -137,6 +139,7 @@ import java.util.stream.Stream;
import scala.Option;
import scala.collection.JavaConverters;
+import static kafka.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS;
import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
@@ -171,6 +174,7 @@ public class RemoteLogManager implements Closeable {
private final Condition copyQuotaManagerLockCondition = copyQuotaManagerLock.newCondition();
private final RLMQuotaManager rlmCopyQuotaManager;
private final RLMQuotaManager rlmFetchQuotaManager;
+private final Sensor fetchThrottleTimeSensor;
private final RemoteIndexCache indexCache;
private final RemoteStorageThreadPool remoteStorageReaderThreadPool;
@@ -229,6 +233,9 @@
rlmCopyQuotaManager = createRLMCopyQuotaManager();
rlmFetchQuotaManager = createRLMFetchQuotaManager();
+fetchThrottleTimeSensor = new RLMQuotaMetrics(metrics, "remote-fetch-throttle-time", RemoteLogManager.class.getSimpleName(),
+"The %s time in millis remote fetches was throttled by a broker", INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS).sensor();
indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
@@ -286,8 +293,12 @@
"Tracking fetch byte-rate for Remote Log Manager", time);
}
-public boolean isRemoteLogFetchQuotaExceeded() {
-return rlmFetchQuotaManager.isQuotaExceeded();
+public long getFetchThrottleTimeMs() {
+return rlmFetchQuotaManager.getThrottleTimeMs();
}
+public Sensor fetchThrottleTimeSensor() {
+return fetchThrottleTimeSensor;
+}
static RLMQuotaManagerConfig copyQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) {
@@ -804,7 +815,7 @@
copyQuotaManagerLock.lock();
try {
-while (rlmCopyQuotaManager.isQuotaExceeded()) {
+while (rlmCopyQuotaManager.getThrottleTimeMs() > 0) {
logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
// If the thread gets interrupted while waiting, the InterruptedException is thrown
// back to the caller. It's important to note that the task being executed is already

kafka/log/remote/quota/RLMQuotaManager.java

@@ -18,6 +18,7 @@ package kafka.log.remote.quota;
import kafka.server.QuotaType;
import kafka.server.SensorAccess;
+import kafka.utils.QuotaUtils;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
@@ -80,16 +81,16 @@
}
}
-public boolean isQuotaExceeded() {
+public long getThrottleTimeMs() {
Sensor sensorInstance = sensor();
try {
sensorInstance.checkQuotas();
} catch (QuotaViolationException qve) {
LOGGER.debug("Quota violated for sensor ({}), metric: ({}), metric-value: ({}), bound: ({})",
sensorInstance.name(), qve.metric().metricName(), qve.value(), qve.bound());
-return true;
+return QuotaUtils.throttleTime(qve, time.milliseconds());
}
-return false;
+return 0L;
}
public void record(double value) {
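
For context, `QuotaUtils.throttleTime` converts the violation into a wait time by scaling how far the measured rate overshoots the bound against the rate window. A rough sketch of the idea (assumed shape for illustration, not Kafka's exact code):

```java
// Sketch of the throttle-time idea: how long a caller should wait for the
// observed rate to decay back under the quota bound. Assumed shape only;
// the real computation lives in kafka.utils.QuotaUtils.
static long throttleTimeMs(double observedRate, double bound, long rateWindowMs) {
    double overshoot = observedRate - bound; // how far over quota we are
    return Math.round(overshoot / bound * rateWindowMs);
}
// e.g. ~55.6 bytes/s observed vs a 50 bytes/s bound over a 9s window -> ~1,000 ms,
// consistent with the RLMQuotaManagerTest expectations further down.
```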

kafka/log/remote/quota/RLMQuotaMetrics.java (new file)

@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log.remote.quota;
import kafka.server.SensorAccess;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import scala.runtime.BoxedUnit;
public class RLMQuotaMetrics {
private final Sensor sensor;
public RLMQuotaMetrics(Metrics metrics, String name, String group, String descriptionFormat, long expirationTime) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
SensorAccess sensorAccess = new SensorAccess(lock, metrics);
this.sensor = sensorAccess.getOrCreate(name, expirationTime, s -> {
s.add(metrics.metricName(name + "-avg", group, String.format(descriptionFormat, "average")), new Avg());
s.add(metrics.metricName(name + "-max", group, String.format(descriptionFormat, "maximum")), new Max());
return BoxedUnit.UNIT;
});
}
public Sensor sensor() {
return sensor;
}
}
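
For illustration, constructing this the way `RemoteLogManager` does above registers an `-avg` and a `-max` metric on a single expiring sensor; a minimal sketch (the expiration value here is arbitrary, not the broker's constant):

```java
import kafka.log.remote.quota.RLMQuotaMetrics;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;

public class RLMQuotaMetricsExample {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        // Same wiring as RemoteLogManager above; 3600s expiration is illustrative.
        Sensor sensor = new RLMQuotaMetrics(metrics, "remote-fetch-throttle-time",
                "RemoteLogManager",
                "The %s time in millis remote fetches was throttled by a broker",
                3600).sensor();
        // A single record() feeds both remote-fetch-throttle-time-avg and -max.
        sensor.record(1000);
    }
}
```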

kafka/server/ReplicaManager.scala

@@ -1762,7 +1762,11 @@ class ReplicaManager(val config: KafkaConfig,
createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset,
new OffsetMovedToTieredStorageException("Given offset" + offset + " is moved to tiered storage"))
} else {
-val fetchDataInfo = if (remoteLogManager.get.isRemoteLogFetchQuotaExceeded) {
+val throttleTimeMs = remoteLogManager.get.getFetchThrottleTimeMs()
+val fetchDataInfo = if (throttleTimeMs > 0) {
+// Record the throttle time for the remote log fetches
+remoteLogManager.get.fetchThrottleTimeSensor().record(throttleTimeMs, time.milliseconds())
// We do not want to send an exception in a LogReadResult response (like we do in other cases when we send
// UnknownOffsetMetadata), because it is classified as an error in reading the data, and a response is
// immediately sent back to the client. Instead, we want to serve data for the other topic partitions of the
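
Stepping back, the read-path change boils down to a check-record-fallback pattern. A simplified Java sketch of that flow (`ReplicaManager` itself is Scala; the two helpers at the end are hypothetical placeholders, not Kafka APIs):

```java
FetchDataInfo readRemote(RemoteLogManager rlm, Time time) {
    long throttleTimeMs = rlm.getFetchThrottleTimeMs();
    if (throttleTimeMs > 0) {
        // Over quota: record the computed wait on the throttle-time sensor,
        // then serve empty records so the client simply retries later.
        rlm.fetchThrottleTimeSensor().record(throttleTimeMs, time.milliseconds());
        return emptyFetchResult();       // hypothetical placeholder
    }
    return scheduleRemoteStorageFetch(); // hypothetical placeholder
}
```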

kafka/log/remote/RemoteLogManagerTest.java

@@ -2870,7 +2870,7 @@ public class RemoteLogManagerTest {
assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog));
// Verify quota check was performed
-verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded();
+verify(rlmCopyQuotaManager, times(1)).getThrottleTimeMs();
// Verify bytes to copy was recorded with the quota manager
verify(rlmCopyQuotaManager, times(1)).record(10);
@@ -2893,7 +2893,7 @@
Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
// Ensure the copy operation is waiting for quota to be available
TestUtils.waitForCondition(() -> {
-verify(rlmCopyQuotaManager, atLeast(1)).isQuotaExceeded();
+verify(rlmCopyQuotaManager, atLeast(1)).getThrottleTimeMs();
return true;
}, "Quota exceeded check did not happen");
// Verify RLM is able to shut down
@@ -2958,7 +2958,7 @@
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty());
-when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaExceeded ? 1000L : 0L);
doNothing().when(rlmCopyQuotaManager).record(anyInt());
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
@@ -3029,8 +3029,8 @@
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty());
-// After the first call, isQuotaExceeded should return true
-when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(false, true);
+// After the first call, getThrottleTimeMs should return non-zero throttle time
+when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(0L, 1000L);
doNothing().when(rlmCopyQuotaManager).record(anyInt());
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);

kafka/log/remote/quota/RLMQuotaManagerTest.java

@@ -47,19 +47,19 @@ public class RLMQuotaManagerTest {
RLMQuotaManager quotaManager = new RLMQuotaManager(
new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, DESCRIPTION, time);
-assertFalse(quotaManager.isQuotaExceeded());
+assertEquals(0L, quotaManager.getThrottleTimeMs());
quotaManager.record(500);
// Move clock by 1 sec, quota is violated
moveClock(1);
-assertTrue(quotaManager.isQuotaExceeded());
+assertEquals(9_000L, quotaManager.getThrottleTimeMs());
// Move clock by another 8 secs, quota is still violated for the window
moveClock(8);
-assertTrue(quotaManager.isQuotaExceeded());
+assertEquals(1_000L, quotaManager.getThrottleTimeMs());
// Move clock by 1 sec, quota is no more violated
moveClock(1);
-assertFalse(quotaManager.isQuotaExceeded());
+assertEquals(0L, quotaManager.getThrottleTimeMs());
}
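
One way to read those numbers: 500 recorded bytes against a 50 bytes/sec quota need a 10-second window to fit under quota, so (assuming the throttle time simply counts down as the rate window ages) the expected wait is 10 s minus the time already elapsed:

```java
// Worked arithmetic for the assertions above (quota = 50 bytes/sec,
// 500 bytes recorded); assumes throttle time counts down with the window.
long quotaBytesPerSec = 50, recordedBytes = 500;
long windowNeededMs = recordedBytes * 1000 / quotaBytesPerSec;  // 10_000 ms
long afterOneSec   = windowNeededMs - 1_000;                    // 9_000 ms
long afterNineSecs = windowNeededMs - 9_000;                    // 1_000 ms
long afterTenSecs  = Math.max(0, windowNeededMs - 10_000);      // 0 ms
```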
@Test
@@ -67,9 +67,9 @@
RLMQuotaManager quotaManager = new RLMQuotaManager(
new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, DESCRIPTION, time);
-assertFalse(quotaManager.isQuotaExceeded());
+assertFalse(quotaManager.getThrottleTimeMs() > 0);
quotaManager.record(51);
-assertTrue(quotaManager.isQuotaExceeded());
+assertTrue(quotaManager.getThrottleTimeMs() > 0);
Map<MetricName, KafkaMetric> fetchQuotaMetrics = metrics.metrics().entrySet().stream()
.filter(entry -> entry.getKey().name().equals("byte-rate") && entry.getKey().group().equals(QUOTA_TYPE.toString()))
@@ -88,7 +88,7 @@
// Update quota to 60, quota is no more violated
Quota quota60Bytes = new Quota(60, true);
quotaManager.updateQuota(quota60Bytes);
-assertFalse(quotaManager.isQuotaExceeded());
+assertFalse(quotaManager.getThrottleTimeMs() > 0);
// Verify quota metrics were updated
Map<MetricName, MetricConfig> configForQuotaMetricsAfterFirstUpdate = extractMetricConfig(fetchQuotaMetrics);
@@ -100,7 +100,7 @@
// Update quota to 40, quota is violated again
Quota quota40Bytes = new Quota(40, true);
quotaManager.updateQuota(quota40Bytes);
-assertTrue(quotaManager.isQuotaExceeded());
+assertTrue(quotaManager.getThrottleTimeMs() > 0);
// Verify quota metrics were updated
assertNotEquals(configForQuotaMetricsAfterFirstUpdate, extractMetricConfig(fetchQuotaMetrics));

kafka/server/ReplicaManagerTest.scala

@@ -23,6 +23,8 @@ import kafka.cluster.PartitionTest.MockPartitionListener
import kafka.cluster.{BrokerEndPoint, Partition}
import kafka.log._
import kafka.log.remote.RemoteLogManager
+import kafka.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS
+import kafka.log.remote.quota.RLMQuotaMetrics
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile}
import kafka.server.epoch.util.MockBlockingSender
@@ -116,6 +118,8 @@ class ReplicaManagerTest {
private var addPartitionsToTxnManager: AddPartitionsToTxnManager = _
private var brokerTopicStats: BrokerTopicStats = _
private val transactionSupportedOperation = genericError
+private val quotaExceededThrottleTime = 1000
+private val quotaAvailableThrottleTime = 0
// Constants defined for readability
private val zkVersion = 0
@@ -133,6 +137,13 @@
alterPartitionManager = mock(classOf[AlterPartitionManager])
quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
mockRemoteLogManager = mock(classOf[RemoteLogManager])
+when(mockRemoteLogManager.fetchThrottleTimeSensor()).thenReturn(
+new RLMQuotaMetrics(metrics,
+"remote-fetch-throttle-time",
+classOf[RemoteLogManager].getSimpleName,
+"The %s time in millis remote fetches was throttled by a broker",
+INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS)
+.sensor())
addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
// Anytime we try to verify, just automatically run the callback as though the transaction was verified.
@@ -3357,7 +3368,8 @@
defaultTopicRemoteLogStorageEnable: Boolean = true,
setupLogDirMetaProperties: Boolean = false,
directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP,
-buildRemoteLogAuxState: Boolean = false
+buildRemoteLogAuxState: Boolean = false,
+remoteFetchQuotaExceeded: Option[Boolean] = None
): ReplicaManager = {
val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
@@ -3411,6 +3423,15 @@
when(metadataCache.contains(new TopicPartition(topic, 0))).thenReturn(true)
+if (remoteFetchQuotaExceeded.isDefined) {
+assertFalse(remoteLogManager.isDefined)
+if (remoteFetchQuotaExceeded.get) {
+when(mockRemoteLogManager.getFetchThrottleTimeMs()).thenReturn(quotaExceededThrottleTime)
+} else {
+when(mockRemoteLogManager.getFetchThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime)
+}
+}
// Transactional appends attempt to schedule to the request handler thread using a non request handler thread. Set this to avoid error.
KafkaRequestHandler.setBypassThreadCheck(true)
@@ -3967,7 +3988,7 @@
val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0)
// create a replicaManager with remoteLog enabled
-val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
+val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteFetchQuotaExceeded = Some(false))
try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
@@ -4022,7 +4043,7 @@
val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0)
// create a replicaManager with remoteLog enabled
-val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog= true)
+val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog= true, remoteFetchQuotaExceeded = Some(false))
try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
@@ -6713,7 +6734,7 @@
@Test
def testRemoteReadQuotaExceeded(): Unit = {
-when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(true)
+when(mockRemoteLogManager.getFetchThrottleTimeMs).thenReturn(quotaExceededThrottleTime)
val tp0 = new TopicPartition(topic, 0)
val tpId0 = new TopicIdPartition(topicId, tp0)
@@ -6727,11 +6748,17 @@
assertFalse(fetchInfo.firstEntryIncomplete)
assertFalse(fetchInfo.abortedTransactions.isPresent)
assertFalse(fetchInfo.delayedRemoteStorageFetch.isPresent)
+val allMetrics = metrics.metrics()
+val avgMetric = allMetrics.get(metrics.metricName("remote-fetch-throttle-time-avg", "RemoteLogManager"))
+val maxMetric = allMetrics.get(metrics.metricName("remote-fetch-throttle-time-max", "RemoteLogManager"))
+assertEquals(quotaExceededThrottleTime, avgMetric.metricValue.asInstanceOf[Double].toLong)
+assertEquals(quotaExceededThrottleTime, maxMetric.metricValue.asInstanceOf[Double].toLong)
}
@Test
def testRemoteReadQuotaNotExceeded(): Unit = {
-when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(false)
+when(mockRemoteLogManager.getFetchThrottleTimeMs).thenReturn(quotaAvailableThrottleTime)
val tp0 = new TopicPartition(topic, 0)
val tpId0 = new TopicIdPartition(topicId, tp0)
@@ -6745,6 +6772,12 @@
assertEquals(-1, fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
assertEquals(MemoryRecords.EMPTY, fetchInfo.records)
assertTrue(fetchInfo.delayedRemoteStorageFetch.isPresent)
+val allMetrics = metrics.metrics()
+val avgMetric = allMetrics.get(metrics.metricName("remote-fetch-throttle-time-avg", "RemoteLogManager"))
+val maxMetric = allMetrics.get(metrics.metricName("remote-fetch-throttle-time-max", "RemoteLogManager"))
+assertEquals(Double.NaN, avgMetric.metricValue)
+assertEquals(Double.NaN, maxMetric.metricValue)
}
private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): Seq[(TopicIdPartition, LogReadResult)] = {