add batch flush interval time ms metric

This commit is contained in:
Jeff Kim 2025-01-21 14:38:25 -05:00
parent faff2de6a5
commit 4831bc829b
5 changed files with 49 additions and 4 deletions

View File

@ -780,6 +780,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
}
long flushStartMs = time.milliseconds();
runtimeMetrics.recordFlushIntervalTime(flushStartMs - currentBatch.appendTimeMs);
// Write the records to the log and update the last written offset.
long offset = partitionWriter.append(
tp,

View File

@ -67,6 +67,13 @@ public interface CoordinatorRuntimeMetrics extends AutoCloseable {
*/
void recordFlushTime(long durationMs);
/**
* Record the flush time.
*
* @param durationMs The flush time in milliseconds.
*/
void recordFlushIntervalTime(long durationMs);
/**
* Record the thread idle time.
* @param idleTimeMs The idle time in milliseconds.

View File

@ -65,6 +65,11 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
*/
public static final String BATCH_FLUSH_TIME_METRIC_NAME = "batch-flush-time-ms";
/**
* The flush time metric name.
*/
public static final String BATCH_FLUSH_INTERVAL_METRIC_NAME = "batch-flush-interval-time-ms";
/**
* Metric to count the number of partitions in Loading state.
*/
@ -123,6 +128,11 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
*/
private final Sensor flushTimeSensor;
/**
* Sensor to measure the flush interval time.
*/
private final Sensor flushIntervalTimeSensor;
public CoordinatorRuntimeMetricsImpl(Metrics metrics, String metricsGroup) {
this.metrics = Objects.requireNonNull(metrics);
this.metricsGroup = Objects.requireNonNull(metricsGroup);
@ -209,6 +219,15 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
);
this.flushTimeSensor = metrics.sensor("FlushTime");
this.flushTimeSensor.add(flushTimeHistogram);
KafkaMetricHistogram flushIntervalHistogram = KafkaMetricHistogram.newLatencyHistogram(
suffix -> kafkaMetricName(
BATCH_FLUSH_INTERVAL_METRIC_NAME + "-" + suffix,
"The " + suffix + " flush interval time in milliseconds"
)
);
this.flushIntervalTimeSensor = metrics.sensor("FlushIntervalTime");
this.flushIntervalTimeSensor.add(flushIntervalHistogram);
}
/**
@ -237,6 +256,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
metrics.removeSensor(eventProcessingTimeSensor.name());
metrics.removeSensor(eventPurgatoryTimeSensor.name());
metrics.removeSensor(flushTimeSensor.name());
metrics.removeSensor(flushIntervalTimeSensor.name());
}
/**
@ -293,7 +313,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
@Override
public void recordEventPurgatoryTime(long purgatoryTimeMs) {
eventPurgatoryTimeSensor.record(Math.min(MAX_LATENCY_MS, purgatoryTimeMs));
eventPurgatoryTimeSensor.record(purgatoryTimeMs);
}
@Override
@ -301,6 +321,11 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
flushTimeSensor.record(durationMs);
}
@Override
public void recordFlushIntervalTime(long durationMs) {
flushIntervalTimeSensor.record(durationMs);
}
@Override
public void recordThreadIdleTime(long idleTimeMs) {
threadIdleSensor.record(idleTimeMs);

View File

@ -67,6 +67,7 @@ public final class KafkaMetricHistogram implements CompoundStat {
*/
private final Function<String, MetricName> metricNameFactory;
private final HdrHistogram hdrHistogram;
private final long highestTrackableValue;
/**
* Creates a new histogram with the purpose of tracking latency values. As such, the histogram
@ -105,6 +106,7 @@ public final class KafkaMetricHistogram implements CompoundStat {
) {
this.metricNameFactory = metricNameFactory;
this.hdrHistogram = new HdrHistogram(highestTrackableValue, numberOfSignificantValueDigits);
this.highestTrackableValue = highestTrackableValue;
}
@Override
@ -125,6 +127,6 @@ public final class KafkaMetricHistogram implements CompoundStat {
@Override
public void record(MetricConfig config, double value, long timeMs) {
hdrHistogram.record((long) value);
hdrHistogram.record(Math.min(highestTrackableValue, (long) value));
}
}

View File

@ -31,6 +31,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.stream.IntStream;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_FLUSH_INTERVAL_METRIC_NAME;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_FLUSH_TIME_METRIC_NAME;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_PROCESSING_TIME_METRIC_NAME;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_PURGATORY_TIME_METRIC_NAME;
@ -76,7 +77,12 @@ public class CoordinatorRuntimeMetricsImplTest {
kafkaMetricName(metrics, "batch-flush-time-ms-p50"),
kafkaMetricName(metrics, "batch-flush-time-ms-p95"),
kafkaMetricName(metrics, "batch-flush-time-ms-p99"),
kafkaMetricName(metrics, "batch-flush-time-ms-p999")
kafkaMetricName(metrics, "batch-flush-time-ms-p999"),
kafkaMetricName(metrics, "batch-flush-interval-time-ms-max"),
kafkaMetricName(metrics, "batch-flush-interval-time-ms-p50"),
kafkaMetricName(metrics, "batch-flush-interval-time-ms-p95"),
kafkaMetricName(metrics, "batch-flush-interval-time-ms-p99"),
kafkaMetricName(metrics, "batch-flush-interval-time-ms-p999")
));
try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
@ -160,7 +166,8 @@ public class CoordinatorRuntimeMetricsImplTest {
EVENT_QUEUE_TIME_METRIC_NAME,
EVENT_PROCESSING_TIME_METRIC_NAME,
EVENT_PURGATORY_TIME_METRIC_NAME,
BATCH_FLUSH_TIME_METRIC_NAME
BATCH_FLUSH_TIME_METRIC_NAME,
BATCH_FLUSH_INTERVAL_METRIC_NAME
})
public void testHistogramMetrics(String metricNamePrefix) {
Time time = new MockTime();
@ -181,6 +188,9 @@ public class CoordinatorRuntimeMetricsImplTest {
break;
case BATCH_FLUSH_TIME_METRIC_NAME:
runtimeMetrics.recordFlushTime(i);
break;
case BATCH_FLUSH_INTERVAL_METRIC_NAME:
runtimeMetrics.recordFlushIntervalTime(i);
}
});