From deb58910c8b573a8ed354cad3a76350fbafbe4d1 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Fri, 1 Aug 2025 14:59:15 +0200 Subject: [PATCH] KAFKA-19529: State updater sensor names should be unique (#20262) (#20273) All state updater threads use the same metrics instance, but do not use unique names for their sensors. This can have the following symptoms: 1) Data inserted into one sensor by one thread can affect the metrics of all state updater threads. 2) If one state updater thread is shutdown, the metrics associated to all state updater threads are removed. 3) If one state updater thread is started, while another one is removed, it can happen that a metric is registered with the `Metrics` instance, but not associated to any `Sensor` (because it is concurrently removed), which means that the metric will not be removed upon shutdown. If a thread with the same name later tries to register the same metric, we may run into a `java.lang.IllegalArgumentException: A metric named ... already exists`, as described in the ticket. This change fixes the bug giving unique names to the sensors. A test is added that there is no interference of the removal of sensors and metrics during shutdown. Reviewers: Matthias J. Sax --- .../internals/DefaultStateUpdater.java | 54 +++++++++--------- .../processor/internals/StreamThread.java | 2 +- .../internals/DefaultStateUpdaterTest.java | 56 ++++++++++++++++++- 3 files changed, 83 insertions(+), 29 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index bcb76966f6e..48d42590c1e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -36,6 +36,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.Task.State; import org.apache.kafka.streams.processor.internals.TaskAndAction.Action; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.slf4j.Logger; @@ -89,7 +90,7 @@ public class DefaultStateUpdater implements StateUpdater { private volatile KafkaFutureImpl clientInstanceIdFuture = new KafkaFutureImpl<>(); public StateUpdaterThread(final String name, - final Metrics metrics, + final StreamsMetricsImpl metrics, final ChangelogReader changelogReader) { super(name); this.changelogReader = changelogReader; @@ -745,7 +746,7 @@ public class DefaultStateUpdater implements StateUpdater { private final Time time; private final Logger log; private final String name; - private final Metrics metrics; + private final StreamsMetricsImpl metrics; private final Consumer restoreConsumer; private final ChangelogReader changelogReader; private final TopologyMetadata topologyMetadata; @@ -766,7 +767,7 @@ public class DefaultStateUpdater implements StateUpdater { private StateUpdaterThread stateUpdaterThread = null; public DefaultStateUpdater(final String name, - final Metrics metrics, + final StreamsMetricsImpl metrics, final StreamsConfig config, final Consumer restoreConsumer, final ChangelogReader changelogReader, @@ -1059,74 +1060,75 @@ public class DefaultStateUpdater implements StateUpdater { private final Sensor standbyRestoreRatioSensor; private final Sensor checkpointRatioSensor; - private final Deque allSensorNames = new LinkedList<>(); + private final Deque allSensors = new LinkedList<>(); private final Deque allMetricNames = new LinkedList<>(); - private StateUpdaterMetrics(final Metrics metrics, final String threadId) { + private StateUpdaterMetrics(final StreamsMetricsImpl metrics, final String threadId) { final Map threadLevelTags = new LinkedHashMap<>(); threadLevelTags.put(THREAD_ID_TAG, threadId); + final Metrics metricsRegistry = metrics.metricsRegistry(); - MetricName metricName = metrics.metricName("active-restoring-tasks", + MetricName metricName = metricsRegistry.metricName("active-restoring-tasks", STATE_LEVEL_GROUP, "The number of active tasks currently undergoing restoration", threadLevelTags); - metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? stateUpdaterThread.numRestoringActiveTasks() : 0); allMetricNames.push(metricName); - metricName = metrics.metricName("standby-updating-tasks", + metricName = metricsRegistry.metricName("standby-updating-tasks", STATE_LEVEL_GROUP, "The number of standby tasks currently undergoing state update", threadLevelTags); - metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? stateUpdaterThread.numUpdatingStandbyTasks() : 0); allMetricNames.push(metricName); - metricName = metrics.metricName("active-paused-tasks", + metricName = metricsRegistry.metricName("active-paused-tasks", STATE_LEVEL_GROUP, "The number of active tasks paused restoring", threadLevelTags); - metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? stateUpdaterThread.numPausedActiveTasks() : 0); allMetricNames.push(metricName); - metricName = metrics.metricName("standby-paused-tasks", + metricName = metricsRegistry.metricName("standby-paused-tasks", STATE_LEVEL_GROUP, "The number of standby tasks paused state update", threadLevelTags); - metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? stateUpdaterThread.numPausedStandbyTasks() : 0); allMetricNames.push(metricName); - this.idleRatioSensor = metrics.sensor("idle-ratio", RecordingLevel.INFO); + this.idleRatioSensor = metrics.threadLevelSensor(threadId, "idle-ratio", RecordingLevel.INFO); this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); - allSensorNames.add("idle-ratio"); + allSensors.add(this.idleRatioSensor); - this.activeRestoreRatioSensor = metrics.sensor("active-restore-ratio", RecordingLevel.INFO); + this.activeRestoreRatioSensor = metrics.threadLevelSensor(threadId, "active-restore-ratio", RecordingLevel.INFO); this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); - allSensorNames.add("active-restore-ratio"); + allSensors.add(this.activeRestoreRatioSensor); - this.standbyRestoreRatioSensor = metrics.sensor("standby-update-ratio", RecordingLevel.INFO); + this.standbyRestoreRatioSensor = metrics.threadLevelSensor(threadId, "standby-update-ratio", RecordingLevel.INFO); this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); - allSensorNames.add("standby-update-ratio"); + allSensors.add(this.standbyRestoreRatioSensor); - this.checkpointRatioSensor = metrics.sensor("checkpoint-ratio", RecordingLevel.INFO); + this.checkpointRatioSensor = metrics.threadLevelSensor(threadId, "checkpoint-ratio", RecordingLevel.INFO); this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg()); - allSensorNames.add("checkpoint-ratio"); + allSensors.add(this.checkpointRatioSensor); - this.restoreSensor = metrics.sensor("restore-records", RecordingLevel.INFO); + this.restoreSensor = metrics.threadLevelSensor(threadId, "restore-records", RecordingLevel.INFO); this.restoreSensor.add(new MetricName("restore-records-rate", STATE_LEVEL_GROUP, RESTORE_RECORDS_RATE_DESCRIPTION, threadLevelTags), new Rate()); this.restoreSensor.add(new MetricName("restore-call-rate", STATE_LEVEL_GROUP, RESTORE_RATE_DESCRIPTION, threadLevelTags), new Rate(new WindowedCount())); - allSensorNames.add("restore-records"); + allSensors.add(this.restoreSensor); } void clear() { - while (!allSensorNames.isEmpty()) { - metrics.removeSensor(allSensorNames.pop()); + while (!allSensors.isEmpty()) { + metrics.removeSensor(allSensors.pop()); } while (!allMetricNames.isEmpty()) { - metrics.removeMetric(allMetricNames.pop()); + metrics.metricsRegistry().removeMetric(allMetricNames.pop()); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 20d49b44dbd..782775657e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -550,7 +550,7 @@ public class StreamThread extends Thread implements ProcessingThread { final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx; final StateUpdater stateUpdater = new DefaultStateUpdater( name, - streamsMetrics.metricsRegistry(), + streamsMetrics, streamsConfig, restoreConsumer, changelogReader, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index 25f3f0e587f..9f2a4c9356d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTask; import org.apache.kafka.streams.processor.internals.Task.State; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.hamcrest.Matcher; import org.junit.jupiter.api.AfterEach; @@ -105,7 +106,7 @@ class DefaultStateUpdaterTest { // need an auto-tick timer to work for draining with timeout private final Time time = new MockTime(1L); - private final Metrics metrics = new Metrics(time); + private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", time); private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL)); private final ChangelogReader changelogReader = mock(ChangelogReader.class); private final TopologyMetadata topologyMetadata = unnamedTopology().build(); @@ -1672,8 +1673,59 @@ class DefaultStateUpdaterTest { assertThat(metrics.metrics().size(), is(1)); } + @Test + public void shouldRemoveMetricsWithoutInterference() { + final DefaultStateUpdater stateUpdater2 = + new DefaultStateUpdater("test-state-updater2", metrics, config, null, changelogReader, topologyMetadata, time); + final List threadMetrics = getMetricNames("test-state-updater"); + final List threadMetrics2 = getMetricNames("test-state-updater2"); + + stateUpdater.start(); + stateUpdater2.start(); + + for (final MetricName metricName : threadMetrics) { + assertTrue(metrics.metrics().containsKey(metricName)); + } + for (final MetricName metricName : threadMetrics2) { + assertTrue(metrics.metrics().containsKey(metricName)); + } + + stateUpdater2.shutdown(Duration.ofMinutes(1)); + + for (final MetricName metricName : threadMetrics) { + assertTrue(metrics.metrics().containsKey(metricName)); + } + for (final MetricName metricName : threadMetrics2) { + assertFalse(metrics.metrics().containsKey(metricName)); + } + + stateUpdater.shutdown(Duration.ofMinutes(1)); + + for (final MetricName metricName : threadMetrics) { + assertFalse(metrics.metrics().containsKey(metricName)); + } + for (final MetricName metricName : threadMetrics2) { + assertFalse(metrics.metrics().containsKey(metricName)); + } + } + + private static List getMetricNames(final String threadId) { + final Map tagMap = Map.of("thread-id", threadId); + return List.of( + new MetricName("active-restoring-tasks", "stream-state-updater-metrics", "", tagMap), + new MetricName("standby-updating-tasks", "stream-state-updater-metrics", "", tagMap), + new MetricName("active-paused-tasks", "stream-state-updater-metrics", "", tagMap), + new MetricName("standby-paused-tasks", "stream-state-updater-metrics", "", tagMap), + new MetricName("idle-ratio", "stream-state-updater-metrics", "", tagMap), + new MetricName("standby-update-ratio", "stream-state-updater-metrics", "", tagMap), + new MetricName("checkpoint-ratio", "stream-state-updater-metrics", "", tagMap), + new MetricName("restore-records-rate", "stream-state-updater-metrics", "", tagMap), + new MetricName("restore-call-rate", "stream-state-updater-metrics", "", tagMap) + ); + } + @SuppressWarnings("unchecked") - private static void verifyMetric(final Metrics metrics, + private static void verifyMetric(final StreamsMetricsImpl metrics, final MetricName metricName, final Matcher matcher) { assertThat(metrics.metrics().get(metricName).metricName().description(), is(metricName.description()));