KAFKA-19529: State updater sensor names should be unique (#20262)
CI / build (push) Waiting to run Details

All state updater threads use the same metrics instance, but do not use
unique names for their sensors. This can have the following symptoms:

1) Data inserted into one sensor by one thread can affect the metrics of
all state updater threads.
2) If one state updater thread is shutdown, the metrics associated to
all state updater threads are removed.
3) If one state updater thread is started, while another one is removed,
it can happen that a metric is registered with the `Metrics` instance,
but not associated to any `Sensor` (because it is concurrently removed),
which means that the metric will not be removed upon shutdown. If a
thread with the same name later tries to register the same metric, we
may run into a `java.lang.IllegalArgumentException: A metric named ...
already exists`, as described in the ticket.

This change fixes the bug giving unique names to the sensors. A test is
added that there is no interference of the removal of sensors and
metrics during shutdown.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Lucas Brutschy 2025-07-31 08:58:52 +02:00 committed by GitHub
parent a058123cd8
commit 44c6e956ed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 82 additions and 28 deletions

View File

@ -36,6 +36,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task.State;
import org.apache.kafka.streams.processor.internals.TaskAndAction.Action;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
@ -89,7 +90,7 @@ public class DefaultStateUpdater implements StateUpdater {
private volatile KafkaFutureImpl<Uuid> clientInstanceIdFuture = new KafkaFutureImpl<>();
public StateUpdaterThread(final String name,
final Metrics metrics,
final StreamsMetricsImpl metrics,
final ChangelogReader changelogReader) {
super(name);
this.changelogReader = changelogReader;
@ -745,7 +746,7 @@ public class DefaultStateUpdater implements StateUpdater {
private final Time time;
private final Logger log;
private final String name;
private final Metrics metrics;
private final StreamsMetricsImpl metrics;
private final Consumer<byte[], byte[]> restoreConsumer;
private final ChangelogReader changelogReader;
private final TopologyMetadata topologyMetadata;
@ -766,7 +767,7 @@ public class DefaultStateUpdater implements StateUpdater {
private StateUpdaterThread stateUpdaterThread = null;
public DefaultStateUpdater(final String name,
final Metrics metrics,
final StreamsMetricsImpl metrics,
final StreamsConfig config,
final Consumer<byte[], byte[]> restoreConsumer,
final ChangelogReader changelogReader,
@ -1062,70 +1063,71 @@ public class DefaultStateUpdater implements StateUpdater {
private final Sensor standbyRestoreRatioSensor;
private final Sensor checkpointRatioSensor;
private final Deque<String> allSensorNames = new LinkedList<>();
private final Deque<Sensor> allSensors = new LinkedList<>();
private final Deque<MetricName> allMetricNames = new LinkedList<>();
private StateUpdaterMetrics(final Metrics metrics, final String threadId) {
private StateUpdaterMetrics(final StreamsMetricsImpl metrics, final String threadId) {
final Map<String, String> threadLevelTags = new LinkedHashMap<>();
threadLevelTags.put(THREAD_ID_TAG, threadId);
final Metrics metricsRegistry = metrics.metricsRegistry();
MetricName metricName = metrics.metricName("active-restoring-tasks",
MetricName metricName = metricsRegistry.metricName("active-restoring-tasks",
STATE_LEVEL_GROUP,
"The number of active tasks currently undergoing restoration",
threadLevelTags);
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
stateUpdaterThread.numRestoringActiveTasks() : 0);
allMetricNames.push(metricName);
metricName = metrics.metricName("standby-updating-tasks",
metricName = metricsRegistry.metricName("standby-updating-tasks",
STATE_LEVEL_GROUP,
"The number of standby tasks currently undergoing state update",
threadLevelTags);
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
stateUpdaterThread.numUpdatingStandbyTasks() : 0);
allMetricNames.push(metricName);
metricName = metrics.metricName("active-paused-tasks",
metricName = metricsRegistry.metricName("active-paused-tasks",
STATE_LEVEL_GROUP,
"The number of active tasks paused restoring",
threadLevelTags);
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
stateUpdaterThread.numPausedActiveTasks() : 0);
allMetricNames.push(metricName);
metricName = metrics.metricName("standby-paused-tasks",
metricName = metricsRegistry.metricName("standby-paused-tasks",
STATE_LEVEL_GROUP,
"The number of standby tasks paused state update",
threadLevelTags);
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
stateUpdaterThread.numPausedStandbyTasks() : 0);
allMetricNames.push(metricName);
this.idleRatioSensor = metrics.sensor("idle-ratio", RecordingLevel.INFO);
this.idleRatioSensor = metrics.threadLevelSensor(threadId, "idle-ratio", RecordingLevel.INFO);
this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
allSensorNames.add("idle-ratio");
allSensors.add(this.idleRatioSensor);
this.activeRestoreRatioSensor = metrics.sensor("active-restore-ratio", RecordingLevel.INFO);
this.activeRestoreRatioSensor = metrics.threadLevelSensor(threadId, "active-restore-ratio", RecordingLevel.INFO);
this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
allSensorNames.add("active-restore-ratio");
allSensors.add(this.activeRestoreRatioSensor);
this.standbyRestoreRatioSensor = metrics.sensor("standby-update-ratio", RecordingLevel.INFO);
this.standbyRestoreRatioSensor = metrics.threadLevelSensor(threadId, "standby-update-ratio", RecordingLevel.INFO);
this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
allSensorNames.add("standby-update-ratio");
allSensors.add(this.standbyRestoreRatioSensor);
this.checkpointRatioSensor = metrics.sensor("checkpoint-ratio", RecordingLevel.INFO);
this.checkpointRatioSensor = metrics.threadLevelSensor(threadId, "checkpoint-ratio", RecordingLevel.INFO);
this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg());
allSensorNames.add("checkpoint-ratio");
allSensors.add(this.checkpointRatioSensor);
this.restoreSensor = metrics.sensor("restore-records", RecordingLevel.INFO);
this.restoreSensor = metrics.threadLevelSensor(threadId, "restore-records", RecordingLevel.INFO);
this.restoreSensor.add(new MetricName("restore-records-rate", STATE_LEVEL_GROUP, RESTORE_RECORDS_RATE_DESCRIPTION, threadLevelTags), new Rate());
this.restoreSensor.add(new MetricName("restore-call-rate", STATE_LEVEL_GROUP, RESTORE_RATE_DESCRIPTION, threadLevelTags), new Rate(new WindowedCount()));
allSensorNames.add("restore-records");
allSensors.add(this.restoreSensor);
}
void clear() {
while (!allSensorNames.isEmpty()) {
metrics.removeSensor(allSensorNames.pop());
while (!allSensors.isEmpty()) {
metrics.removeSensor(allSensors.pop());
}
while (!allMetricNames.isEmpty()) {

View File

@ -649,7 +649,7 @@ public class StreamThread extends Thread implements ProcessingThread {
final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx;
return new DefaultStateUpdater(
name,
streamsMetrics.metricsRegistry(),
streamsMetrics,
streamsConfig,
restoreConsumer,
changelogReader,

View File

@ -28,6 +28,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTask;
import org.apache.kafka.streams.processor.internals.Task.State;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.AfterEach;
@ -105,7 +106,7 @@ class DefaultStateUpdaterTest {
// need an auto-tick timer to work for draining with timeout
private final Time time = new MockTime(1L);
private final Metrics metrics = new Metrics(time);
private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", time);
private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
private final TopologyMetadata topologyMetadata = unnamedTopology().build();
@ -1680,8 +1681,59 @@ class DefaultStateUpdaterTest {
assertThat(metrics.metrics().size(), is(1));
}
@Test
public void shouldRemoveMetricsWithoutInterference() {
final DefaultStateUpdater stateUpdater2 =
new DefaultStateUpdater("test-state-updater2", metrics, config, null, changelogReader, topologyMetadata, time);
final List<MetricName> threadMetrics = getMetricNames("test-state-updater");
final List<MetricName> threadMetrics2 = getMetricNames("test-state-updater2");
stateUpdater.start();
stateUpdater2.start();
for (final MetricName metricName : threadMetrics) {
assertTrue(metrics.metrics().containsKey(metricName));
}
for (final MetricName metricName : threadMetrics2) {
assertTrue(metrics.metrics().containsKey(metricName));
}
stateUpdater2.shutdown(Duration.ofMinutes(1));
for (final MetricName metricName : threadMetrics) {
assertTrue(metrics.metrics().containsKey(metricName));
}
for (final MetricName metricName : threadMetrics2) {
assertFalse(metrics.metrics().containsKey(metricName));
}
stateUpdater.shutdown(Duration.ofMinutes(1));
for (final MetricName metricName : threadMetrics) {
assertFalse(metrics.metrics().containsKey(metricName));
}
for (final MetricName metricName : threadMetrics2) {
assertFalse(metrics.metrics().containsKey(metricName));
}
}
private static List<MetricName> getMetricNames(final String threadId) {
final Map<String, String> tagMap = Map.of("thread-id", threadId);
return List.of(
new MetricName("active-restoring-tasks", "stream-state-updater-metrics", "", tagMap),
new MetricName("standby-updating-tasks", "stream-state-updater-metrics", "", tagMap),
new MetricName("active-paused-tasks", "stream-state-updater-metrics", "", tagMap),
new MetricName("standby-paused-tasks", "stream-state-updater-metrics", "", tagMap),
new MetricName("idle-ratio", "stream-state-updater-metrics", "", tagMap),
new MetricName("standby-update-ratio", "stream-state-updater-metrics", "", tagMap),
new MetricName("checkpoint-ratio", "stream-state-updater-metrics", "", tagMap),
new MetricName("restore-records-rate", "stream-state-updater-metrics", "", tagMap),
new MetricName("restore-call-rate", "stream-state-updater-metrics", "", tagMap)
);
}
@SuppressWarnings("unchecked")
private static <T> void verifyMetric(final Metrics metrics,
private static <T> void verifyMetric(final StreamsMetricsImpl metrics,
final MetricName metricName,
final Matcher<T> matcher) {
assertThat(metrics.metrics().get(metricName).metricName().description(), is(metricName.description()));