KAFKA-17153 KafkaMetricsGroup#newGauge should accept functional interface instead of `com.yammer.metrics.core.Gague` (#16618)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Kuan-Po Tseng 2024-07-21 18:17:04 +08:00 committed by GitHub
parent 28f6fbdd55
commit e9a8c3c455
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 25 additions and 40 deletions

View File

@ -82,7 +82,6 @@ import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;
import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.storage.internals.log.TxnIndexSearchResult;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Timer;
import org.slf4j.Logger;
@ -253,12 +252,7 @@ public class RemoteLogManager implements Closeable {
followerThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
"RLMFollowerScheduledThreadPool", "kafka-rlm-follower-thread-pool-");
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC, new Gauge<Double>() {
@Override
public Double value() {
return rlmCopyThreadPool.getIdlePercent();
}
});
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC, rlmCopyThreadPool::getIdlePercent);
remoteReadTimer = metricsGroup.newTimer(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC,
TimeUnit.MILLISECONDS, TimeUnit.SECONDS);

View File

@ -24,6 +24,7 @@ import java.util.{Optional, OptionalInt}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Supplier
import com.yammer.metrics.core.Gauge
import kafka.common.OffsetAndMetadata
import kafka.coordinator.group.GroupMetadataManager.maybeConvertOffsetCommitError
@ -126,9 +127,9 @@ class GroupMetadataManager(brokerId: Int,
this.logIdent = s"[GroupMetadataManager brokerId=$brokerId] "
private def recreateGauge[T](name: String, gauge: Gauge[T]): Gauge[T] = {
private def recreateGauge[T](name: String, metric: Supplier[T]): Gauge[T] = {
metricsGroup.removeMetric(name)
metricsGroup.newGauge(name, gauge)
metricsGroup.newGauge(name, metric)
}
recreateGauge("NumOffsets",

View File

@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class KafkaMetricsGroup {
@ -70,16 +71,21 @@ public class KafkaMetricsGroup {
return new MetricName(group, typeName, name, scope, nameBuilder.toString());
}
public final <T> Gauge<T> newGauge(String name, Gauge<T> metric, Map<String, String> tags) {
return KafkaYammerMetrics.defaultRegistry().newGauge(metricName(name, tags), metric);
public <T> Gauge<T> newGauge(String name, Supplier<T> metric, Map<String, String> tags) {
return newGauge(metricName(name, tags), metric);
}
public final <T> Gauge<T> newGauge(String name, Gauge<T> metric) {
public <T> Gauge<T> newGauge(String name, Supplier<T> metric) {
return newGauge(name, metric, Collections.emptyMap());
}
public final <T> Gauge<T> newGauge(MetricName metricName, Gauge<T> metric) {
return KafkaYammerMetrics.defaultRegistry().newGauge(metricName, metric);
public <T> Gauge<T> newGauge(MetricName name, Supplier<T> metric) {
return KafkaYammerMetrics.defaultRegistry().newGauge(name, new Gauge<T>() {
@Override
public T value() {
return metric.get();
}
});
}
public final Meter newMeter(String name, String eventType,

View File

@ -36,8 +36,6 @@ import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import com.yammer.metrics.core.Gauge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -106,16 +104,7 @@ public class AssignmentsManager {
"broker-" + brokerId + "-directory-assignments-manager-",
new ShutdownEvent());
channelManager.start();
this.metricsGroup.newGauge(QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME, new Gauge<Integer>() {
@Override
public Integer value() {
return getMapSize(inflight) + getMapSize(pending);
}
private int getMapSize(Map<TopicIdPartition, AssignmentEvent> map) {
return map == null ? 0 : map.size();
}
});
this.metricsGroup.newGauge(QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME, () -> getMapSize(inflight) + getMapSize(pending));
if (dirIdToPath == null) dirIdToPath = id -> Optional.empty();
this.dirIdToPath = dirIdToPath;
if (topicIdToName == null) topicIdToName = id -> Optional.empty();
@ -492,4 +481,8 @@ public class AssignmentsManager {
.setBrokerEpoch(brokerEpoch)
.setDirectories(new ArrayList<>(directoryMap.values()));
}
private static int getMapSize(Map<TopicIdPartition, AssignmentEvent> map) {
return map == null ? 0 : map.size();
}
}

View File

@ -21,8 +21,6 @@ import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import com.yammer.metrics.core.Gauge;
import org.slf4j.Logger;
import java.util.concurrent.LinkedBlockingQueue;
@ -39,6 +37,7 @@ public class RemoteStorageThreadPool extends ThreadPoolExecutor {
private final Logger logger;
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());
@SuppressWarnings("this-escape")
public RemoteStorageThreadPool(String threadNamePrefix,
int numThreads,
int maxPendingTasks) {
@ -51,18 +50,10 @@ public class RemoteStorageThreadPool extends ThreadPoolExecutor {
}
}.logger(RemoteStorageThreadPool.class);
metricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(), new Gauge<Integer>() {
@Override
public Integer value() {
return RemoteStorageThreadPool.this.getQueue().size();
}
});
metricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(), new Gauge<Double>() {
@Override
public Double value() {
return 1 - (double) RemoteStorageThreadPool.this.getActiveCount() / (double) RemoteStorageThreadPool.this.getCorePoolSize();
}
});
metricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
() -> getQueue().size());
metricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
() -> 1 - (double) getActiveCount() / (double) getCorePoolSize());
}
@Override