From 930f165546a83bd3a67567b95249dd485df375fe Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Sat, 5 Oct 2024 18:28:31 -0400 Subject: [PATCH] KAFKA-17248: Add reporter for adding thread metrics to telemetry pipeline and a test [2/N] (#17376) This PR adds a Reporter instance that will add streams thread metrics to the telemetry pipeline. For testing, the PR adds a unit test. Reviewers: Matthias Sax --- .../kafka/clients/consumer/Consumer.java | 10 ++ .../kafka/clients/consumer/KafkaConsumer.java | 11 ++ .../kafka/clients/consumer/MockConsumer.java | 17 +++ .../internals/AsyncKafkaConsumer.java | 11 ++ .../internals/ClassicKafkaConsumer.java | 12 ++ ...treamsThreadMetricsDelegatingReporter.java | 86 +++++++++++++ ...msThreadMetricsDelegatingReporterTest.java | 121 ++++++++++++++++++ 7 files changed, 268 insertions(+) create mode 100644 streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index bfef86aec54..055fcfb1b4f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metrics.KafkaMetric; import java.io.Closeable; import java.time.Duration; @@ -122,6 +123,15 @@ public interface Consumer extends Closeable { */ void commitAsync(Map offsets, OffsetCommitCallback callback); + /** + * @see KafkaConsumer#registerMetricForSubscription(KafkaMetric) + */ + void registerMetricForSubscription(KafkaMetric metric); + + /** + * @see KafkaConsumer#unregisterMetricFromSubscription(KafkaMetric) + */ + void unregisterMetricFromSubscription(KafkaMetric metric); /** * @see KafkaConsumer#seek(TopicPartition, long) */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index bd4f66904a7..6710212c566 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.LogContext; @@ -1744,4 +1745,14 @@ public class KafkaConsumer implements Consumer { boolean updateAssignmentMetadataIfNeeded(final Timer timer) { return delegate.updateAssignmentMetadataIfNeeded(timer); } + + @Override + public void registerMetricForSubscription(KafkaMetric metric) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public void unregisterMetricFromSubscription(KafkaMetric metric) { + throw new UnsupportedOperationException("not implemented"); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 6ae28406cf5..8acdfdca4bf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.utils.LogContext; import java.time.Duration; @@ -76,6 +77,8 @@ public class MockConsumer implements Consumer { private Uuid clientInstanceId; private int injectTimeoutExceptionCounter; + private final List addedMetrics = new ArrayList<>(); + public MockConsumer(OffsetResetStrategy offsetResetStrategy) { this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy); this.partitions = new HashMap<>(); @@ -176,6 +179,16 @@ public class MockConsumer implements Consumer { subscriptions.assignFromSubscribed(assignedPartitions); } + @Override + public void registerMetricForSubscription(KafkaMetric metric) { + addedMetrics.add(metric); + } + + @Override + public void unregisterMetricFromSubscription(KafkaMetric metric) { + addedMetrics.remove(metric); + } + @Override public synchronized void assign(Collection partitions) { ensureNotClosed(); @@ -632,4 +645,8 @@ public class MockConsumer implements Consumer { public Duration lastPollTimeout() { return lastPollTimeout; } + + public List addedMetrics() { + return Collections.unmodifiableList(addedMetrics); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 325524e9eae..01a4d29471e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -77,6 +77,7 @@ import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.requests.JoinGroupRequest; @@ -642,6 +643,16 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { ); } + @Override + public void registerMetricForSubscription(KafkaMetric metric) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public void unregisterMetricFromSubscription(KafkaMetric metric) { + throw new UnsupportedOperationException("not implemented"); + } + /** * poll implementation using {@link ApplicationEventHandler}. * 1. Poll for background events. If there's a fetch response event, process the record and return it. If it is diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index b6cf39c0c78..65cff05bce3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -48,6 +48,7 @@ import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.serialization.Deserializer; @@ -426,6 +427,17 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { subscribeInternal(topics, Optional.of(listener)); } + + @Override + public void registerMetricForSubscription(KafkaMetric metric) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public void unregisterMetricFromSubscription(KafkaMetric metric) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public void subscribe(Collection topics) { subscribeInternal(topics, Optional.empty()); diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java new file mode 100644 index 00000000000..0e2a238a29a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.internals.metrics; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsReporter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter { + + private static final Logger log = LoggerFactory.getLogger(StreamsThreadMetricsDelegatingReporter.class); + private static final String THREAD_ID_TAG = "thread-id"; + private final Consumer consumer; + private final String threadId; + private final String stateUpdaterThreadId; + + + public StreamsThreadMetricsDelegatingReporter(final Consumer consumer, final String threadId, final String stateUpdaterThreadId) { + this.consumer = Objects.requireNonNull(consumer); + this.threadId = Objects.requireNonNull(threadId); + this.stateUpdaterThreadId = Objects.requireNonNull(stateUpdaterThreadId); + log.debug("Creating MetricsReporter for threadId {} and stateUpdaterId {}", threadId, stateUpdaterThreadId); + } + + @Override + public void init(final List metrics) { + metrics.forEach(this::metricChange); + } + + @Override + public void metricChange(final KafkaMetric metric) { + if (tagMatchStreamOrStateUpdaterThreadId(metric)) { + log.debug("Registering metric {}", metric.metricName()); + consumer.registerMetricForSubscription(metric); + } + } + + private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric metric) { + final Map tags = metric.metricName().tags(); + final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && (tags.get(THREAD_ID_TAG).equals(threadId) || tags.get(THREAD_ID_TAG).equals(stateUpdaterThreadId)); + if (!shouldInclude) { + log.trace("Rejecting metric {}", metric.metricName()); + } + return shouldInclude; + } + + @Override + public void metricRemoval(final KafkaMetric metric) { + if (tagMatchStreamOrStateUpdaterThreadId(metric)) { + log.debug("Unregistering metric {}", metric.metricName()); + consumer.unregisterMetricFromSubscription(metric); + } + } + + @Override + public void close() { + // No op + } + + @Override + public void configure(final Map configs) { + // No op + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java new file mode 100644 index 00000000000..03dbacb82b7 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.internals.metrics; + +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.utils.Time; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class StreamsThreadMetricsDelegatingReporterTest { + + private MockConsumer mockConsumer; + private StreamsThreadMetricsDelegatingReporter streamsThreadMetricsDelegatingReporter; + + private KafkaMetric kafkaMetricOneHasThreadIdTag; + private KafkaMetric kafkaMetricTwoHasThreadIdTag; + private KafkaMetric kafkaMetricThreeHasThreadIdTag; + private KafkaMetric kafkaMetricWithoutThreadIdTag; + private final Object lock = new Object(); + private final MetricConfig metricConfig = new MetricConfig(); + + + @BeforeEach + public void setUp() { + final Map threadIdTagMap = new HashMap<>(); + final String threadId = "abcxyz-StreamThread-1"; + threadIdTagMap.put("thread-id", threadId); + + final Map threadIdWithStateUpdaterTagMap = new HashMap<>(); + final String stateUpdaterId = "deftuv-StateUpdater-1"; + threadIdWithStateUpdaterTagMap.put("thread-id", stateUpdaterId); + + final Map noThreadIdTagMap = new HashMap<>(); + noThreadIdTagMap.put("client-id", "foo"); + + mockConsumer = new MockConsumer<>(OffsetResetStrategy.NONE); + streamsThreadMetricsDelegatingReporter = new StreamsThreadMetricsDelegatingReporter(mockConsumer, threadId, stateUpdaterId); + + final MetricName metricNameOne = new MetricName("metric-one", "test-group-one", "foo bar baz", threadIdTagMap); + final MetricName metricNameTwo = new MetricName("metric-two", "test-group-two", "description two", threadIdWithStateUpdaterTagMap); + final MetricName metricNameThree = new MetricName("metric-three", "test-group-three", "description three", threadIdTagMap); + final MetricName metricNameFour = new MetricName("metric-four", "test-group-three", "description three", noThreadIdTagMap); + + kafkaMetricOneHasThreadIdTag = new KafkaMetric(lock, metricNameOne, (Measurable) (m, now) -> 1.0, metricConfig, Time.SYSTEM); + kafkaMetricTwoHasThreadIdTag = new KafkaMetric(lock, metricNameTwo, (Measurable) (m, now) -> 2.0, metricConfig, Time.SYSTEM); + kafkaMetricThreeHasThreadIdTag = new KafkaMetric(lock, metricNameThree, (Measurable) (m, now) -> 3.0, metricConfig, Time.SYSTEM); + kafkaMetricWithoutThreadIdTag = new KafkaMetric(lock, metricNameFour, (Measurable) (m, now) -> 4.0, metricConfig, Time.SYSTEM); + } + + @AfterEach + public void tearDown() { + mockConsumer.close(); + } + + + @Test + @DisplayName("Init method should register metrics it receives as parameters") + public void shouldInitMetrics() { + final List allMetrics = Arrays.asList(kafkaMetricOneHasThreadIdTag, kafkaMetricTwoHasThreadIdTag, kafkaMetricThreeHasThreadIdTag); + final List expectedMetrics = Arrays.asList(kafkaMetricOneHasThreadIdTag, kafkaMetricTwoHasThreadIdTag, kafkaMetricThreeHasThreadIdTag); + streamsThreadMetricsDelegatingReporter.init(allMetrics); + assertEquals(expectedMetrics, mockConsumer.addedMetrics()); + } + + @Test + @DisplayName("Should register metrics with thread-id in tag map") + public void shouldRegisterMetrics() { + streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricOneHasThreadIdTag); + assertEquals(kafkaMetricOneHasThreadIdTag, mockConsumer.addedMetrics().get(0)); + } + + @Test + @DisplayName("Should remove metrics") + public void shouldRemoveMetrics() { + streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricOneHasThreadIdTag); + streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricTwoHasThreadIdTag); + streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricThreeHasThreadIdTag); + List expected = Arrays.asList(kafkaMetricOneHasThreadIdTag, kafkaMetricTwoHasThreadIdTag, kafkaMetricThreeHasThreadIdTag); + assertEquals(expected, mockConsumer.addedMetrics()); + streamsThreadMetricsDelegatingReporter.metricRemoval(kafkaMetricOneHasThreadIdTag); + expected = Arrays.asList(kafkaMetricTwoHasThreadIdTag, kafkaMetricThreeHasThreadIdTag); + assertEquals(expected, mockConsumer.addedMetrics()); + } + + @Test + @DisplayName("Should not register metrics without thread-id tag") + public void shouldNotRegisterMetricsWithoutThreadIdTag() { + streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricWithoutThreadIdTag); + assertEquals(0, mockConsumer.addedMetrics().size()); + } +} \ No newline at end of file