MINOR: Move Throttler to storage module (#16023)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Mickael Maison 2024-05-22 18:47:31 +02:00 committed by GitHub
parent a98c9be6b0
commit e4e1116156
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 226 additions and 238 deletions

View File

@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
import kafka.common._
import kafka.log.LogCleaner.{CleanerRecopyPercentMetricName, DeadThreadCountMetricName, MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
import kafka.server.{BrokerReconfigurable, KafkaConfig}
import kafka.utils._
import kafka.utils.{Logging, Pool}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException}
@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, TransactionIndex}
import org.apache.kafka.storage.internals.utils.Throttler
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
@ -109,12 +110,7 @@ class LogCleaner(initialConfig: CleanerConfig,
private[log] val cleanerManager = new LogCleanerManager(logDirs, logs, logDirFailureChannel)
/* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
private[log] val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,
checkIntervalMs = 300,
throttleDown = true,
"cleaner-io",
"bytes",
time = time)
private[log] val throttler = new Throttler(config.maxIoBytesPerSecond, 300, "cleaner-io", "bytes", time)
private[log] val cleaners = mutable.ArrayBuffer[CleanerThread]()

View File

@ -1,111 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import java.util.concurrent.TimeUnit
import java.util.Random
import scala.math._
/**
 * Measures the rate of some process and, when asked, sleeps long enough to pull that rate toward a
 * configured target. What is being counted does not matter (bytes, entries, ...): callers report their
 * progress through maybeThrottle() and the throttler decides whether the calling thread should pause.
 *
 * @param desiredRatePerSec: The target rate in units/sec; can be replaced later via updateDesiredRatePerSec()
 * @param checkIntervalMs: The minimum time, in milliseconds, between two evaluations of the rate
 * @param throttleDown: When true an adjustment is considered if the observed rate exceeds the target,
 *                      when false if the observed rate does not exceed it
 * @param metricName: The name of the meter on which every observation is marked
 * @param units: The name of the unit being counted, handed to the meter
 * @param time: The time implementation used both to read the clock and to sleep
 */
@threadsafe
class Throttler(@volatile var desiredRatePerSec: Double,
                checkIntervalMs: Long = 100L,
                throttleDown: Boolean = true,
                metricName: String = "throttler",
                units: String = "entries",
                time: Time = Time.SYSTEM) extends Logging {

  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
  private val lock = new Object
  private val meter = metricsGroup.newMeter(metricName, units, TimeUnit.SECONDS)
  private val checkIntervalNs = TimeUnit.MILLISECONDS.toNanos(checkIntervalMs)

  // Start of the current measurement period and the amount observed since then;
  // both are read and written under `lock` inside maybeThrottle()
  private var periodStartNs: Long = time.nanoseconds
  private var observedSoFar: Double = 0.0

  /**
   * Records `observed` units of progress and, once a check interval has elapsed, sleeps if that is
   * needed to approach the desired rate. The sleep happens while the internal lock is held, so other
   * threads calling this method wait until it finishes.
   *
   * @param observed: The amount of work done since the previous call
   */
  def maybeThrottle(observed: Double): Unit = {
    val millisPerSecond = TimeUnit.SECONDS.toMillis(1)
    val nanosPerSecond = TimeUnit.SECONDS.toNanos(1)
    // snapshot the volatile target once so the whole evaluation works with a single value
    val targetRatePerSec = desiredRatePerSec
    meter.mark(observed.toLong)
    lock synchronized {
      observedSoFar += observed
      val elapsedNs = time.nanoseconds - periodStartNs
      // only evaluate once a full interval has passed AND something was actually observed
      val intervalCompleted = elapsedNs > checkIntervalNs && observedSoFar > 0
      if (intervalCompleted) {
        val rateInSecs = (observedSoFar * nanosPerSecond) / elapsedNs
        // adjust only when the direction we may correct matches the side of the target we are on
        if (throttleDown == (rateInSecs > targetRatePerSec)) {
          // time the observed amount should have taken at the target rate, minus the time it did take
          val targetRatePerMs = targetRatePerSec / millisPerSecond.toDouble
          val elapsedMs = TimeUnit.NANOSECONDS.toMillis(elapsedNs)
          val sleepTime = round(observedSoFar / targetRatePerMs - elapsedMs)
          if (sleepTime > 0) {
            trace("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, targetRatePerSec, sleepTime))
            time.sleep(sleepTime)
          }
        }
        // open a new measurement period, starting after any sleep
        periodStartNs = time.nanoseconds()
        observedSoFar = 0
      }
    }
  }

  /**
   * Replaces the target rate; calls to maybeThrottle() that read the rate afterwards use the new value.
   *
   * @param updatedDesiredRatePerSec: The new target rate in units/sec
   */
  def updateDesiredRatePerSec(updatedDesiredRatePerSec: Double): Unit = {
    desiredRatePerSec = updatedDesiredRatePerSec
  }
}
/**
 * Manual driver for the throttler: feeds it random observations roughly once per millisecond and
 * prints the achieved rate (units per second) for every 30 second window.
 */
object Throttler {

  def main(args: Array[String]): Unit = {
    val random = new Random()
    val throttler = new Throttler(100000, 100, true, time = Time.SYSTEM)
    val reportIntervalMs = 30000
    var windowStartMs = System.currentTimeMillis
    var windowTotal = 0
    // intentionally endless: the loop only stops when the process is killed
    while (true) {
      val observed = random.nextInt(1000)
      Thread.sleep(1)
      throttler.maybeThrottle(observed)
      windowTotal += observed
      val nowMs = System.currentTimeMillis
      if (nowMs - windowStartMs >= reportIntervalMs) {
        // average rate over the nominal window length, in units per second
        println(windowTotal / (reportIntervalMs / 1000.0))
        windowStartMs = nowMs
        windowTotal = 0
      }
    }
  }
}

View File

@ -19,7 +19,7 @@ package kafka.log
import kafka.common._
import kafka.server.{BrokerTopicStats, KafkaConfig}
import kafka.utils._
import kafka.utils.{CoreUtils, Logging, Pool, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
@ -30,6 +30,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.utils.Throttler
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.ArgumentMatchers
@ -59,7 +60,7 @@ class LogCleanerTest extends Logging {
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
val logConfig = new LogConfig(logProps)
val time = new MockTime()
val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
val throttler = new Throttler(Double.MaxValue, Long.MaxValue, "throttler", "entries", time)
val tombstoneRetentionMs = 86400000
val largeTimestamp = Long.MaxValue - tombstoneRetentionMs - 1
val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false)

View File

@ -20,7 +20,7 @@ package kafka.log
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager
import kafka.server.{BrokerTopicStats, KafkaConfig}
import kafka.utils._
import kafka.utils.TestUtils
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid}
@ -39,6 +39,7 @@ import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard}
import org.apache.kafka.storage.internals.utils.Throttler
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@ -1046,7 +1047,7 @@ class UnifiedLogTest {
ioBufferSize = 64 * 1024,
maxIoBufferSize = 64 * 1024,
dupBufferLoadFactor = 0.75,
throttler = new Throttler(Double.MaxValue, Long.MaxValue, false, time = mockTime),
throttler = new Throttler(Double.MaxValue, Long.MaxValue, "throttler", "entries", mockTime),
time = mockTime,
checkDone = _ => {})

View File

@ -1,108 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import org.apache.kafka.server.util.MockTime
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions.{assertTrue, assertEquals}
/**
 * Tests for Throttler driven by a MockTime, so every sleep requested by the throttler shows up as a
 * jump of the mock clock instead of a real delay.
 */
class ThrottlerTest {

  private val intervalMs = 100
  private val initialRatePerSec = 1000.0

  /** Amount of work that exactly matches `ratePerSec` over one check interval. */
  private def unitsPerInterval(ratePerSec: Double): Double = ratePerSec * intervalMs / 1000.0

  /** Builds a throttler running at the initial rate on top of the given mock clock. */
  private def newThrottler(clock: MockTime): Throttler =
    new Throttler(desiredRatePerSec = initialRatePerSec,
      checkIntervalMs = intervalMs,
      time = clock)

  /** Reports `units` right away and verifies the clock still reads `expectedNowMs` afterwards. */
  private def observeExpectingNoSleep(throttler: Throttler, clock: MockTime, units: Double, expectedNowMs: Long): Unit = {
    throttler.maybeThrottle(units)
    assertEquals(expectedNowMs, clock.milliseconds())
  }

  /** Lets just over one check interval pass, reports `units` and returns the clock reading afterwards. */
  private def observeAfterInterval(throttler: Throttler, clock: MockTime, units: Double): Long = {
    clock.sleep(intervalMs + 1)
    throttler.maybeThrottle(units)
    clock.milliseconds()
  }

  @Test
  def testThrottleDesiredRate(): Unit = {
    val batch = unitsPerInterval(initialRatePerSec)
    val clock = new MockTime()
    val throttler = newThrottler(clock)

    // first batch at t1: the check interval has not elapsed, so nothing happens
    val t1 = clock.milliseconds()
    observeExpectingNoSleep(throttler, clock, batch, t1)

    // second batch just after the interval: two batches in one interval forces a sleep
    val t2 = observeAfterInterval(throttler, clock, batch)
    assertTrue(t2 >= t1 + 2 * intervalMs)

    // third batch at t2: a new period has just started, so again no sleep
    observeExpectingNoSleep(throttler, clock, batch, t2)

    // fourth batch just after the next interval
    val t3 = observeAfterInterval(throttler, clock, batch)
    assertTrue(t3 >= t2 + 2 * intervalMs)

    // across all four batches the achieved rate must not exceed the target
    val elapsedTimeMs = t3 - t1
    val actualCountPerSec = 4 * batch * 1000 / elapsedTimeMs
    assertTrue(actualCountPerSec <= initialRatePerSec)
  }

  @Test
  def testUpdateThrottleDesiredRate(): Unit = {
    val batch = unitsPerInterval(initialRatePerSec)
    val updatedRatePerSec = 1500.0
    val updatedBatch = unitsPerInterval(updatedRatePerSec)
    val clock = new MockTime()
    val throttler = newThrottler(clock)

    // first batch at t1: no sleep expected
    val t1 = clock.milliseconds()
    observeExpectingNoSleep(throttler, clock, batch, t1)

    // second batch just after the interval
    val t2 = observeAfterInterval(throttler, clock, batch)
    assertTrue(t2 >= t1 + 2 * intervalMs)
    val elapsedTimeMs = t2 - t1
    val actualCountPerSec = 2 * batch * 1000 / elapsedTimeMs
    assertTrue(actualCountPerSec <= initialRatePerSec)

    // switch to the higher target rate
    throttler.updateDesiredRatePerSec(updatedRatePerSec)

    // first batch at the new rate, reported at t2: no sleep expected
    observeExpectingNoSleep(throttler, clock, updatedBatch, t2)

    // second batch at the new rate just after the interval
    val t3 = observeAfterInterval(throttler, clock, updatedBatch)
    assertTrue(t3 >= t2 + 2 * intervalMs)
    val updatedElapsedTimeMs = t3 - t2
    val updatedActualCountPerSec = 2 * updatedBatch * 1000 / updatedElapsedTimeMs
    assertTrue(updatedActualCountPerSec <= updatedRatePerSec)
  }
}

View File

@ -490,14 +490,6 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH"/>
</Match>
<Match>
<!-- Suppress a warning about an intentional infinite loop. -->
<Package name="kafka.utils"/>
<Source name="Throttler.scala"/>
<Method name="main"/>
<Bug pattern="IL_INFINITE_LOOP"/>
</Match>
<Match>
<!-- Suppress a spurious warning about calling notify without modifying
other state under the monitor. -->

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.utils;
import com.yammer.metrics.core.Meter;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
 * Measures the rate of some process and slows callers down when that rate exceeds a configured target.
 */
public class Throttler {

    private static final Logger LOG = LoggerFactory.getLogger(Throttler.class);
    private static final long MS_PER_SEC = TimeUnit.SECONDS.toMillis(1);
    private static final long NS_PER_SEC = TimeUnit.SECONDS.toNanos(1);

    private final Object lock = new Object();
    private final long checkIntervalNs;
    private final Meter meter;
    private final Time time;

    private volatile double desiredRatePerSec;

    // Start of the current measurement period and the amount observed since then;
    // maybeThrottle() only touches them while holding lock
    private long periodStartNs;
    private double observedSoFar;

    /**
     * Creates a throttler for the given target rate. The unit of the rate is up to the caller (bytes,
     * a count of some other thing, ...); whenever maybeThrottle() is called the throttler sleeps for
     * as long as is needed to bring the observed rate down to the target.
     *
     * @param desiredRatePerSec The rate we want to hit in units/sec
     * @param checkIntervalMs The interval at which to check our rate
     * @param metricName The name of the metric
     * @param units The name of the unit
     * @param time The time implementation to use
     */
    public Throttler(double desiredRatePerSec,
                     long checkIntervalMs,
                     String metricName,
                     String units,
                     Time time) {
        this.desiredRatePerSec = desiredRatePerSec;
        this.checkIntervalNs = TimeUnit.MILLISECONDS.toNanos(checkIntervalMs);
        this.meter = new KafkaMetricsGroup(Throttler.class).newMeter(metricName, units, TimeUnit.SECONDS);
        this.time = time;
        this.periodStartNs = time.nanoseconds();
    }

    /**
     * Replaces the target rate.
     *
     * @param updatedDesiredRatePerSec The new rate in units/sec
     */
    public void updateDesiredRatePerSec(double updatedDesiredRatePerSec) {
        desiredRatePerSec = updatedDesiredRatePerSec;
    }

    /**
     * @return The target rate currently in effect, in units/sec
     */
    public double desiredRatePerSec() {
        return desiredRatePerSec;
    }

    /**
     * Records {@code observed} units of progress and, once a check interval has elapsed, sleeps if the
     * rate measured over that interval is above the target. The sleep happens while the internal lock
     * is held, so other threads calling this method wait until it finishes.
     *
     * @param observed The amount of work done since the previous call
     */
    public void maybeThrottle(double observed) {
        // snapshot the volatile target once so the whole evaluation works with a single value
        final double targetRatePerSec = desiredRatePerSec;
        meter.mark((long) observed);
        synchronized (lock) {
            observedSoFar += observed;
            final long elapsedNs = time.nanoseconds() - periodStartNs;
            // only evaluate once a full interval has passed AND something was actually observed
            final boolean intervalCompleted = elapsedNs > checkIntervalNs && observedSoFar > 0;
            if (!intervalCompleted) {
                return;
            }
            final double rateInSecs = (observedSoFar * NS_PER_SEC) / elapsedNs;
            if (rateInSecs > targetRatePerSec) {
                final long sleepTime = sleepTimeMs(targetRatePerSec, elapsedNs);
                if (sleepTime > 0) {
                    LOG.trace("Natural rate is {} per second but desired rate is {}, sleeping for {} ms to compensate.", rateInSecs, targetRatePerSec, sleepTime);
                    time.sleep(sleepTime);
                }
            }
            // open a new measurement period, starting after any sleep
            periodStartNs = time.nanoseconds();
            observedSoFar = 0.0;
        }
    }

    /**
     * Solves for the time to sleep so that the amount observed in the current period matches the target
     * rate: the time the work should have taken minus the time it did take. Must be called with
     * lock held because it reads observedSoFar.
     */
    private long sleepTimeMs(double targetRatePerSec, long elapsedNs) {
        double targetRatePerMs = targetRatePerSec / MS_PER_SEC;
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(elapsedNs);
        return Math.round(observedSoFar / targetRatePerMs - elapsedMs);
    }
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.utils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.MockTime;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
 * Tests for Throttler driven by a MockTime, so every sleep requested by the throttler shows up as a
 * jump of the mock clock instead of a real delay.
 */
class ThrottlerTest {

    private static final long CHECK_INTERVAL_MS = 100L;
    private static final double INITIAL_RATE_PER_SEC = 1000.0;

    /** Amount of work that exactly matches the given rate over one check interval. */
    private static double unitsPerInterval(double ratePerSec) {
        return ratePerSec * CHECK_INTERVAL_MS / 1000.0;
    }

    /** Builds a throttler running at the initial rate on top of the given clock. */
    private static Throttler newThrottler(Time clock) {
        return new Throttler(INITIAL_RATE_PER_SEC,
                CHECK_INTERVAL_MS,
                "throttler",
                "entries",
                clock);
    }

    /** Reports the units right away and verifies the clock still reads expectedNowMs afterwards. */
    private static void observeExpectingNoSleep(Throttler throttler, Time clock, double units, long expectedNowMs) {
        throttler.maybeThrottle(units);
        assertEquals(expectedNowMs, clock.milliseconds());
    }

    /** Lets just over one check interval pass, reports the units and returns the clock reading afterwards. */
    private static long observeAfterInterval(Throttler throttler, Time clock, double units) {
        clock.sleep(CHECK_INTERVAL_MS + 1);
        throttler.maybeThrottle(units);
        return clock.milliseconds();
    }

    @Test
    public void testThrottleDesiredRate() {
        double batch = unitsPerInterval(INITIAL_RATE_PER_SEC);
        Time clock = new MockTime();
        Throttler throttler = newThrottler(clock);

        // first batch at t1: the check interval has not elapsed, so nothing happens
        long t1 = clock.milliseconds();
        observeExpectingNoSleep(throttler, clock, batch, t1);

        // second batch just after the interval: two batches in one interval forces a sleep
        long t2 = observeAfterInterval(throttler, clock, batch);
        assertTrue(t2 >= t1 + 2 * CHECK_INTERVAL_MS);

        // third batch at t2: a new period has just started, so again no sleep
        observeExpectingNoSleep(throttler, clock, batch, t2);

        // fourth batch just after the next interval
        long t3 = observeAfterInterval(throttler, clock, batch);
        assertTrue(t3 >= t2 + 2 * CHECK_INTERVAL_MS);

        // across all four batches the achieved rate must not exceed the target
        long elapsedTimeMs = t3 - t1;
        double actualCountPerSec = 4 * batch * 1000 / elapsedTimeMs;
        assertTrue(actualCountPerSec <= INITIAL_RATE_PER_SEC);
    }

    @Test
    public void testUpdateThrottleDesiredRate() {
        double batch = unitsPerInterval(INITIAL_RATE_PER_SEC);
        double updatedRatePerSec = 1500.0;
        double updatedBatch = unitsPerInterval(updatedRatePerSec);
        Time clock = new MockTime();
        Throttler throttler = newThrottler(clock);

        // first batch at t1: no sleep expected
        long t1 = clock.milliseconds();
        observeExpectingNoSleep(throttler, clock, batch, t1);

        // second batch just after the interval
        long t2 = observeAfterInterval(throttler, clock, batch);
        assertTrue(t2 >= t1 + 2 * CHECK_INTERVAL_MS);
        long elapsedTimeMs = t2 - t1;
        double actualCountPerSec = 2 * batch * 1000 / elapsedTimeMs;
        assertTrue(actualCountPerSec <= INITIAL_RATE_PER_SEC);

        // switch to the higher target rate
        throttler.updateDesiredRatePerSec(updatedRatePerSec);

        // first batch at the new rate, reported at t2: no sleep expected
        observeExpectingNoSleep(throttler, clock, updatedBatch, t2);

        // second batch at the new rate just after the interval
        long t3 = observeAfterInterval(throttler, clock, updatedBatch);
        assertTrue(t3 >= t2 + 2 * CHECK_INTERVAL_MS);
        long updatedElapsedTimeMs = t3 - t2;
        double updatedActualCountPerSec = 2 * updatedBatch * 1000 / updatedElapsedTimeMs;
        assertTrue(updatedActualCountPerSec <= updatedRatePerSec);
    }
}