mirror of https://github.com/apache/kafka.git
KAFKA-17320 Move SensorAccess to server-common module (#16864)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
cb835d0d6d
commit
6836fa259e
|
@ -16,8 +16,6 @@
|
|||
*/
|
||||
package kafka.log.remote.quota;
|
||||
|
||||
import kafka.server.SensorAccess;
|
||||
|
||||
import org.apache.kafka.common.MetricName;
|
||||
import org.apache.kafka.common.metrics.KafkaMetric;
|
||||
import org.apache.kafka.common.metrics.MetricConfig;
|
||||
|
@ -29,6 +27,7 @@ import org.apache.kafka.common.metrics.stats.SimpleRate;
|
|||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.server.quota.QuotaType;
|
||||
import org.apache.kafka.server.quota.QuotaUtils;
|
||||
import org.apache.kafka.server.quota.SensorAccess;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -38,8 +37,6 @@ import java.util.Map;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import scala.runtime.BoxedUnit;
|
||||
|
||||
public class RLMQuotaManager {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(RLMQuotaManager.class);
|
||||
|
||||
|
@ -112,10 +109,7 @@ public class RLMQuotaManager {
|
|||
return sensorAccess.getOrCreate(
|
||||
quotaType.toString(),
|
||||
RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS,
|
||||
sensor -> {
|
||||
sensor.add(metricName(), new SimpleRate(), getQuotaMetricConfig(quota));
|
||||
return BoxedUnit.UNIT;
|
||||
}
|
||||
sensor -> sensor.add(metricName(), new SimpleRate(), getQuotaMetricConfig(quota))
|
||||
);
|
||||
}
|
||||
}
|
|
@ -16,17 +16,14 @@
|
|||
*/
|
||||
package kafka.log.remote.quota;
|
||||
|
||||
import kafka.server.SensorAccess;
|
||||
|
||||
import org.apache.kafka.common.metrics.Metrics;
|
||||
import org.apache.kafka.common.metrics.Sensor;
|
||||
import org.apache.kafka.common.metrics.stats.Avg;
|
||||
import org.apache.kafka.common.metrics.stats.Max;
|
||||
import org.apache.kafka.server.quota.SensorAccess;
|
||||
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import scala.runtime.BoxedUnit;
|
||||
|
||||
public class RLMQuotaMetrics {
|
||||
|
||||
private final Sensor sensor;
|
||||
|
@ -37,7 +34,6 @@ public class RLMQuotaMetrics {
|
|||
this.sensor = sensorAccess.getOrCreate(name, expirationTime, s -> {
|
||||
s.add(metrics.metricName(name + "-avg", group, String.format(descriptionFormat, "average")), new Avg());
|
||||
s.add(metrics.metricName(name + "-max", group, String.format(descriptionFormat, "maximum")), new Max());
|
||||
return BoxedUnit.UNIT;
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ package kafka.server
|
|||
import java.{lang, util}
|
||||
import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock
|
||||
import java.util.function.Consumer
|
||||
import kafka.network.RequestChannel
|
||||
import kafka.server.ClientQuotaManager._
|
||||
import kafka.utils.Logging
|
||||
|
@ -29,7 +30,7 @@ import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate}
|
|||
import org.apache.kafka.common.security.auth.KafkaPrincipal
|
||||
import org.apache.kafka.common.utils.{Sanitizer, Time}
|
||||
import org.apache.kafka.server.config.{ClientQuotaManagerConfig, ZooKeeperInternals}
|
||||
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType, QuotaType, QuotaUtils, ThrottleCallback, ThrottledChannel}
|
||||
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType, QuotaType, QuotaUtils, SensorAccess, ThrottleCallback, ThrottledChannel}
|
||||
import org.apache.kafka.server.util.ShutdownableThread
|
||||
import org.apache.kafka.network.Session
|
||||
|
||||
|
@ -350,7 +351,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
|
|||
sensorAccessor.getOrCreate(
|
||||
getQuotaSensorName(metricTags),
|
||||
ClientQuotaManager.InactiveSensorExpirationTimeSeconds,
|
||||
registerQuotaMetrics(metricTags)
|
||||
sensor => registerQuotaMetrics(metricTags)(sensor)
|
||||
),
|
||||
sensorAccessor.getOrCreate(
|
||||
getThrottleTimeSensorName(metricTags),
|
||||
|
@ -391,7 +392,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
|
|||
.quota(new Quota(quotaLimit, true))
|
||||
}
|
||||
|
||||
protected def getOrCreateSensor(sensorName: String, expirationTimeSeconds: Long, registerMetrics: Sensor => Unit): Sensor = {
|
||||
protected def getOrCreateSensor(sensorName: String, expirationTimeSeconds: Long, registerMetrics: Consumer[Sensor]): Sensor = {
|
||||
sensorAccessor.getOrCreate(
|
||||
sensorName,
|
||||
expirationTimeSeconds,
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition
|
|||
import org.apache.kafka.common.metrics.stats.SimpleRate
|
||||
import org.apache.kafka.common.utils.Time
|
||||
import org.apache.kafka.server.config.ReplicationQuotaManagerConfig
|
||||
import org.apache.kafka.server.quota.QuotaType
|
||||
import org.apache.kafka.server.quota.{QuotaType, SensorAccess}
|
||||
|
||||
trait ReplicaQuota {
|
||||
def record(value: Long): Unit
|
||||
|
|
|
@ -1,71 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package kafka.server
|
||||
|
||||
import java.util.concurrent.locks.ReadWriteLock
|
||||
|
||||
import org.apache.kafka.common.metrics.{Metrics, Sensor}
|
||||
|
||||
/**
|
||||
* Class which centralises the logic for creating/accessing sensors.
|
||||
* The quota can be updated by wrapping it in the passed MetricConfig
|
||||
*
|
||||
* The later arguments are passed as methods as they are only called when the sensor is instantiated.
|
||||
*/
|
||||
class SensorAccess(lock: ReadWriteLock, metrics: Metrics) {
|
||||
|
||||
def getOrCreate(sensorName: String, expirationTime: Long, registerMetrics: Sensor => Unit): Sensor = {
|
||||
var sensor: Sensor = null
|
||||
|
||||
/* Acquire the read lock to fetch the sensor. It is safe to call getSensor from multiple threads.
|
||||
* The read lock allows a thread to create a sensor in isolation. The thread creating the sensor
|
||||
* will acquire the write lock and prevent the sensors from being read while they are being created.
|
||||
* It should be sufficient to simply check if the sensor is null without acquiring a read lock but the
|
||||
* sensor being present doesn't mean that it is fully initialized i.e. all the Metrics may not have been added.
|
||||
* This read lock waits until the writer thread has released its lock i.e. fully initialized the sensor
|
||||
* at which point it is safe to read
|
||||
*/
|
||||
lock.readLock().lock()
|
||||
try sensor = metrics.getSensor(sensorName)
|
||||
finally lock.readLock().unlock()
|
||||
|
||||
/* If the sensor is null, try to create it else return the existing sensor
|
||||
* The sensor can be null, hence the null checks
|
||||
*/
|
||||
if (sensor == null) {
|
||||
/* Acquire a write lock because the sensor may not have been created and we only want one thread to create it.
|
||||
* Note that multiple threads may acquire the write lock if they all see a null sensor initially
|
||||
* In this case, the writer checks the sensor after acquiring the lock again.
|
||||
* This is safe from Double Checked Locking because the references are read
|
||||
* after acquiring read locks and hence they cannot see a partially published reference
|
||||
*/
|
||||
lock.writeLock().lock()
|
||||
try {
|
||||
// Set the var for both sensors in case another thread has won the race to acquire the write lock. This will
|
||||
// ensure that we initialise `ClientSensors` with non-null parameters.
|
||||
sensor = metrics.getSensor(sensorName)
|
||||
if (sensor == null) {
|
||||
sensor = metrics.sensor(sensorName, null, expirationTime)
|
||||
registerMetrics(sensor)
|
||||
}
|
||||
} finally {
|
||||
lock.writeLock().unlock()
|
||||
}
|
||||
}
|
||||
sensor
|
||||
}
|
||||
}
|
|
@ -0,0 +1,83 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.server.quota;
|
||||
|
||||
import org.apache.kafka.common.metrics.Metrics;
|
||||
import org.apache.kafka.common.metrics.Sensor;
|
||||
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* Class which centralises the logic for creating/accessing sensors.
|
||||
* The quota can be updated by wrapping it in the passed MetricConfig.
|
||||
* The later arguments are passed as methods as they are only called when the sensor is instantiated.
|
||||
*/
|
||||
public class SensorAccess {
|
||||
private final ReadWriteLock lock;
|
||||
private final Metrics metrics;
|
||||
|
||||
public SensorAccess(ReadWriteLock lock, Metrics metrics) {
|
||||
this.lock = lock;
|
||||
this.metrics = metrics;
|
||||
}
|
||||
|
||||
public Sensor getOrCreate(String sensorName, long expirationTime, Consumer<Sensor> registerMetrics) {
|
||||
Sensor sensor;
|
||||
|
||||
/* Acquire the read lock to fetch the sensor. It is safe to call getSensor from multiple threads.
|
||||
* The read lock allows a thread to create a sensor in isolation. The thread creating the sensor
|
||||
* will acquire the write lock and prevent the sensors from being read while they are being created.
|
||||
* It should be sufficient to simply check if the sensor is null without acquiring a read lock but the
|
||||
* sensor being present doesn't mean that it is fully initialized i.e. all the Metrics may not have been added.
|
||||
* This read lock waits until the writer thread has released its lock i.e. fully initialized the sensor
|
||||
* at which point it is safe to read
|
||||
*/
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
sensor = metrics.getSensor(sensorName);
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
|
||||
/* If the sensor is null, try to create it else return the existing sensor
|
||||
* The sensor can be null, hence the null checks
|
||||
*/
|
||||
if (sensor == null) {
|
||||
/* Acquire a write lock because the sensor may not have been created and we only want one thread to create it.
|
||||
* Note that multiple threads may acquire the write lock if they all see a null sensor initially
|
||||
* In this case, the writer checks the sensor after acquiring the lock again.
|
||||
* This is safe from Double-Checked Locking because the references are read
|
||||
* after acquiring read locks and hence they cannot see a partially published reference
|
||||
*/
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
// Set the var for both sensors in case another thread has won the race to acquire the write lock. This will
|
||||
// ensure that we initialise `ClientSensors` with non-null parameters.
|
||||
sensor = metrics.getSensor(sensorName);
|
||||
if (sensor == null) {
|
||||
sensor = metrics.sensor(sensorName, null, expirationTime);
|
||||
registerMetrics.accept(sensor);
|
||||
}
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
return sensor;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue