diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index bcb76966f6e..48d42590c1e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -36,6 +36,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.Task.State; import org.apache.kafka.streams.processor.internals.TaskAndAction.Action; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.slf4j.Logger; @@ -89,7 +90,7 @@ public class DefaultStateUpdater implements StateUpdater { private volatile KafkaFutureImpl clientInstanceIdFuture = new KafkaFutureImpl<>(); public StateUpdaterThread(final String name, - final Metrics metrics, + final StreamsMetricsImpl metrics, final ChangelogReader changelogReader) { super(name); this.changelogReader = changelogReader; @@ -745,7 +746,7 @@ public class DefaultStateUpdater implements StateUpdater { private final Time time; private final Logger log; private final String name; - private final Metrics metrics; + private final StreamsMetricsImpl metrics; private final Consumer restoreConsumer; private final ChangelogReader changelogReader; private final TopologyMetadata topologyMetadata; @@ -766,7 +767,7 @@ public class DefaultStateUpdater implements StateUpdater { private StateUpdaterThread stateUpdaterThread = null; public DefaultStateUpdater(final String name, - final Metrics metrics, + final StreamsMetricsImpl metrics, final StreamsConfig config, final Consumer restoreConsumer, final ChangelogReader changelogReader, @@ -1059,74 +1060,75 @@ public class DefaultStateUpdater implements StateUpdater { private final Sensor standbyRestoreRatioSensor; private final Sensor checkpointRatioSensor; - private final Deque allSensorNames = new LinkedList<>(); + private final Deque allSensors = new LinkedList<>(); private final Deque allMetricNames = new LinkedList<>(); - private StateUpdaterMetrics(final Metrics metrics, final String threadId) { + private StateUpdaterMetrics(final StreamsMetricsImpl metrics, final String threadId) { final Map threadLevelTags = new LinkedHashMap<>(); threadLevelTags.put(THREAD_ID_TAG, threadId); + final Metrics metricsRegistry = metrics.metricsRegistry(); - MetricName metricName = metrics.metricName("active-restoring-tasks", + MetricName metricName = metricsRegistry.metricName("active-restoring-tasks", STATE_LEVEL_GROUP, "The number of active tasks currently undergoing restoration", threadLevelTags); - metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? stateUpdaterThread.numRestoringActiveTasks() : 0); allMetricNames.push(metricName); - metricName = metrics.metricName("standby-updating-tasks", + metricName = metricsRegistry.metricName("standby-updating-tasks", STATE_LEVEL_GROUP, "The number of standby tasks currently undergoing state update", threadLevelTags); - metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? stateUpdaterThread.numUpdatingStandbyTasks() : 0); allMetricNames.push(metricName); - metricName = metrics.metricName("active-paused-tasks", + metricName = metricsRegistry.metricName("active-paused-tasks", STATE_LEVEL_GROUP, "The number of active tasks paused restoring", threadLevelTags); - metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? stateUpdaterThread.numPausedActiveTasks() : 0); allMetricNames.push(metricName); - metricName = metrics.metricName("standby-paused-tasks", + metricName = metricsRegistry.metricName("standby-paused-tasks", STATE_LEVEL_GROUP, "The number of standby tasks paused state update", threadLevelTags); - metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? stateUpdaterThread.numPausedStandbyTasks() : 0); allMetricNames.push(metricName); - this.idleRatioSensor = metrics.sensor("idle-ratio", RecordingLevel.INFO); + this.idleRatioSensor = metrics.threadLevelSensor(threadId, "idle-ratio", RecordingLevel.INFO); this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); - allSensorNames.add("idle-ratio"); + allSensors.add(this.idleRatioSensor); - this.activeRestoreRatioSensor = metrics.sensor("active-restore-ratio", RecordingLevel.INFO); + this.activeRestoreRatioSensor = metrics.threadLevelSensor(threadId, "active-restore-ratio", RecordingLevel.INFO); this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); - allSensorNames.add("active-restore-ratio"); + allSensors.add(this.activeRestoreRatioSensor); - this.standbyRestoreRatioSensor = metrics.sensor("standby-update-ratio", RecordingLevel.INFO); + this.standbyRestoreRatioSensor = metrics.threadLevelSensor(threadId, "standby-update-ratio", RecordingLevel.INFO); this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); - allSensorNames.add("standby-update-ratio"); + allSensors.add(this.standbyRestoreRatioSensor); - this.checkpointRatioSensor = metrics.sensor("checkpoint-ratio", RecordingLevel.INFO); + this.checkpointRatioSensor = metrics.threadLevelSensor(threadId, "checkpoint-ratio", RecordingLevel.INFO); this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg()); - allSensorNames.add("checkpoint-ratio"); + allSensors.add(this.checkpointRatioSensor); - this.restoreSensor = metrics.sensor("restore-records", RecordingLevel.INFO); + this.restoreSensor = metrics.threadLevelSensor(threadId, "restore-records", RecordingLevel.INFO); this.restoreSensor.add(new MetricName("restore-records-rate", STATE_LEVEL_GROUP, RESTORE_RECORDS_RATE_DESCRIPTION, threadLevelTags), new Rate()); this.restoreSensor.add(new MetricName("restore-call-rate", STATE_LEVEL_GROUP, RESTORE_RATE_DESCRIPTION, threadLevelTags), new Rate(new WindowedCount())); - allSensorNames.add("restore-records"); + allSensors.add(this.restoreSensor); } void clear() { - while (!allSensorNames.isEmpty()) { - metrics.removeSensor(allSensorNames.pop()); + while (!allSensors.isEmpty()) { + metrics.removeSensor(allSensors.pop()); } while (!allMetricNames.isEmpty()) { - metrics.removeMetric(allMetricNames.pop()); + metrics.metricsRegistry().removeMetric(allMetricNames.pop()); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 20d49b44dbd..782775657e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -550,7 +550,7 @@ public class StreamThread extends Thread implements ProcessingThread { final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx; final StateUpdater stateUpdater = new DefaultStateUpdater( name, - streamsMetrics.metricsRegistry(), + streamsMetrics, streamsConfig, restoreConsumer, changelogReader, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index 25f3f0e587f..9f2a4c9356d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTask; import org.apache.kafka.streams.processor.internals.Task.State; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.hamcrest.Matcher; import org.junit.jupiter.api.AfterEach; @@ -105,7 +106,7 @@ class DefaultStateUpdaterTest { // need an auto-tick timer to work for draining with timeout private final Time time = new MockTime(1L); - private final Metrics metrics = new Metrics(time); + private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", time); private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL)); private final ChangelogReader changelogReader = mock(ChangelogReader.class); private final TopologyMetadata topologyMetadata = unnamedTopology().build(); @@ -1672,8 +1673,59 @@ class DefaultStateUpdaterTest { assertThat(metrics.metrics().size(), is(1)); } + @Test + public void shouldRemoveMetricsWithoutInterference() { + final DefaultStateUpdater stateUpdater2 = + new DefaultStateUpdater("test-state-updater2", metrics, config, null, changelogReader, topologyMetadata, time); + final List threadMetrics = getMetricNames("test-state-updater"); + final List threadMetrics2 = getMetricNames("test-state-updater2"); + + stateUpdater.start(); + stateUpdater2.start(); + + for (final MetricName metricName : threadMetrics) { + assertTrue(metrics.metrics().containsKey(metricName)); + } + for (final MetricName metricName : threadMetrics2) { + assertTrue(metrics.metrics().containsKey(metricName)); + } + + stateUpdater2.shutdown(Duration.ofMinutes(1)); + + for (final MetricName metricName : threadMetrics) { + assertTrue(metrics.metrics().containsKey(metricName)); + } + for (final MetricName metricName : threadMetrics2) { + assertFalse(metrics.metrics().containsKey(metricName)); + } + + stateUpdater.shutdown(Duration.ofMinutes(1)); + + for (final MetricName metricName : threadMetrics) { + assertFalse(metrics.metrics().containsKey(metricName)); + } + for (final MetricName metricName : threadMetrics2) { + assertFalse(metrics.metrics().containsKey(metricName)); + } + } + + private static List getMetricNames(final String threadId) { + final Map tagMap = Map.of("thread-id", threadId); + return List.of( + new MetricName("active-restoring-tasks", "stream-state-updater-metrics", "", tagMap), + new MetricName("standby-updating-tasks", "stream-state-updater-metrics", "", tagMap), + new MetricName("active-paused-tasks", "stream-state-updater-metrics", "", tagMap), + new MetricName("standby-paused-tasks", "stream-state-updater-metrics", "", tagMap), + new MetricName("idle-ratio", "stream-state-updater-metrics", "", tagMap), + new MetricName("standby-update-ratio", "stream-state-updater-metrics", "", tagMap), + new MetricName("checkpoint-ratio", "stream-state-updater-metrics", "", tagMap), + new MetricName("restore-records-rate", "stream-state-updater-metrics", "", tagMap), + new MetricName("restore-call-rate", "stream-state-updater-metrics", "", tagMap) + ); + } + @SuppressWarnings("unchecked") - private static void verifyMetric(final Metrics metrics, + private static void verifyMetric(final StreamsMetricsImpl metrics, final MetricName metricName, final Matcher matcher) { assertThat(metrics.metrics().get(metricName).metricName().description(), is(metricName.description()));