KAFKA-17248: Add reporter for adding thread metrics to telemetry pipeline and a test [2/N] (#17376)

This PR adds a MetricsReporter implementation that forwards Kafka Streams thread metrics to the telemetry pipeline. The PR also adds a unit test covering the new reporter.

Reviewers: Matthias Sax <mjsax@apache.org>
Bill Bejeck 2024-10-05 18:28:31 -04:00 committed by GitHub
parent 3bb408c4de
commit 930f165546
7 changed files with 268 additions and 0 deletions
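For context, below is a minimal sketch (not part of this commit) of how the new StreamsThreadMetricsDelegatingReporter can be attached to a Metrics registry so that metrics tagged with the stream thread's or state updater's thread-id are forwarded to the consumer that carries them on the telemetry pipeline. The class name ReporterWiringSketch, the thread ids, and the metric names are illustrative assumptions; the actual wiring inside StreamThread is not part of this PR.

import java.util.Collections;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter;

public class ReporterWiringSketch {
    public static void main(String[] args) {
        // Stand-in for the stream thread's main consumer; a real StreamThread would pass its own consumer.
        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.NONE);

        // The reporter only forwards metrics whose "thread-id" tag matches one of these two ids (illustrative values).
        final StreamsThreadMetricsDelegatingReporter reporter =
            new StreamsThreadMetricsDelegatingReporter(consumer, "app-StreamThread-1", "app-StateUpdater-1");

        final Metrics metrics = new Metrics();
        metrics.addReporter(reporter); // already-registered metrics reach the reporter via init(), later ones via metricChange()

        // A metric tagged with the stream thread's id is registered with the consumer ...
        final MetricName matching = metrics.metricName("poll-latency-avg", "stream-thread-metrics",
            Collections.singletonMap("thread-id", "app-StreamThread-1"));
        metrics.addMetric(matching, (Measurable) (config, now) -> 42.0);

        // ... while a metric without a matching thread-id tag is ignored by the reporter.
        final MetricName other = metrics.metricName("some-other-metric", "other-group",
            Collections.singletonMap("client-id", "foo"));
        metrics.addMetric(other, (Measurable) (config, now) -> 0.0);

        System.out.println(consumer.addedMetrics()); // contains only the thread-tagged metric
        metrics.close();
    }
}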

clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java

@@ -21,6 +21,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.KafkaMetric;
import java.io.Closeable;
import java.time.Duration;
@@ -122,6 +123,15 @@ public interface Consumer<K, V> extends Closeable {
*/
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
/**
* @see KafkaConsumer#registerMetricForSubscription(KafkaMetric)
*/
void registerMetricForSubscription(KafkaMetric metric);
/**
* @see KafkaConsumer#unregisterMetricFromSubscription(KafkaMetric)
*/
void unregisterMetricFromSubscription(KafkaMetric metric);
/**
* @see KafkaConsumer#seek(TopicPartition, long)
*/

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@@ -29,6 +29,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.LogContext;
@@ -1744,4 +1745,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
return delegate.updateAssignmentMetadataIfNeeded(timer);
}
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
}
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
}
}

clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

@@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.utils.LogContext;
import java.time.Duration;
@@ -76,6 +77,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
private Uuid clientInstanceId;
private int injectTimeoutExceptionCounter;
private final List<KafkaMetric> addedMetrics = new ArrayList<>();
public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy);
this.partitions = new HashMap<>();
@@ -176,6 +179,16 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
subscriptions.assignFromSubscribed(assignedPartitions);
}
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
addedMetrics.add(metric);
}
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
addedMetrics.remove(metric);
}
@Override
public synchronized void assign(Collection<TopicPartition> partitions) {
ensureNotClosed();
@@ -632,4 +645,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
public Duration lastPollTimeout() {
return lastPollTimeout;
}
public List<KafkaMetric> addedMetrics() {
return Collections.unmodifiableList(addedMetrics);
}
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

@@ -77,6 +77,7 @@ import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -642,6 +643,16 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
);
}
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
}
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
}
/**
* poll implementation using {@link ApplicationEventHandler}.
* 1. Poll for background events. If there's a fetch response event, process the record and return it. If it is

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java

@@ -48,6 +48,7 @@ import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.Deserializer;
@@ -426,6 +427,17 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
subscribeInternal(topics, Optional.of(listener));
}
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
}
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
}
@Override
public void subscribe(Collection<String> topics) {
subscribeInternal(topics, Optional.empty());

streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java

@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals.metrics;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter {
private static final Logger log = LoggerFactory.getLogger(StreamsThreadMetricsDelegatingReporter.class);
private static final String THREAD_ID_TAG = "thread-id";
private final Consumer<byte[], byte[]> consumer;
private final String threadId;
private final String stateUpdaterThreadId;
public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final String stateUpdaterThreadId) {
this.consumer = Objects.requireNonNull(consumer);
this.threadId = Objects.requireNonNull(threadId);
this.stateUpdaterThreadId = Objects.requireNonNull(stateUpdaterThreadId);
log.debug("Creating MetricsReporter for threadId {} and stateUpdaterId {}", threadId, stateUpdaterThreadId);
}
@Override
public void init(final List<KafkaMetric> metrics) {
metrics.forEach(this::metricChange);
}
@Override
public void metricChange(final KafkaMetric metric) {
if (tagMatchStreamOrStateUpdaterThreadId(metric)) {
log.debug("Registering metric {}", metric.metricName());
consumer.registerMetricForSubscription(metric);
}
}
private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric metric) {
final Map<String, String> tags = metric.metricName().tags();
final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && (tags.get(THREAD_ID_TAG).equals(threadId) || tags.get(THREAD_ID_TAG).equals(stateUpdaterThreadId));
if (!shouldInclude) {
log.trace("Rejecting metric {}", metric.metricName());
}
return shouldInclude;
}
@Override
public void metricRemoval(final KafkaMetric metric) {
if (tagMatchStreamOrStateUpdaterThreadId(metric)) {
log.debug("Unregistering metric {}", metric.metricName());
consumer.unregisterMetricFromSubscription(metric);
}
}
@Override
public void close() {
// No op
}
@Override
public void configure(final Map<String, ?> configs) {
// No op
}
}

streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java

@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals.metrics;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
class StreamsThreadMetricsDelegatingReporterTest {
private MockConsumer<byte[], byte[]> mockConsumer;
private StreamsThreadMetricsDelegatingReporter streamsThreadMetricsDelegatingReporter;
private KafkaMetric kafkaMetricOneHasThreadIdTag;
private KafkaMetric kafkaMetricTwoHasThreadIdTag;
private KafkaMetric kafkaMetricThreeHasThreadIdTag;
private KafkaMetric kafkaMetricWithoutThreadIdTag;
private final Object lock = new Object();
private final MetricConfig metricConfig = new MetricConfig();
@BeforeEach
public void setUp() {
final Map<String, String> threadIdTagMap = new HashMap<>();
final String threadId = "abcxyz-StreamThread-1";
threadIdTagMap.put("thread-id", threadId);
final Map<String, String> threadIdWithStateUpdaterTagMap = new HashMap<>();
final String stateUpdaterId = "deftuv-StateUpdater-1";
threadIdWithStateUpdaterTagMap.put("thread-id", stateUpdaterId);
final Map<String, String> noThreadIdTagMap = new HashMap<>();
noThreadIdTagMap.put("client-id", "foo");
mockConsumer = new MockConsumer<>(OffsetResetStrategy.NONE);
streamsThreadMetricsDelegatingReporter = new StreamsThreadMetricsDelegatingReporter(mockConsumer, threadId, stateUpdaterId);
final MetricName metricNameOne = new MetricName("metric-one", "test-group-one", "foo bar baz", threadIdTagMap);
final MetricName metricNameTwo = new MetricName("metric-two", "test-group-two", "description two", threadIdWithStateUpdaterTagMap);
final MetricName metricNameThree = new MetricName("metric-three", "test-group-three", "description three", threadIdTagMap);
final MetricName metricNameFour = new MetricName("metric-four", "test-group-three", "description three", noThreadIdTagMap);
kafkaMetricOneHasThreadIdTag = new KafkaMetric(lock, metricNameOne, (Measurable) (m, now) -> 1.0, metricConfig, Time.SYSTEM);
kafkaMetricTwoHasThreadIdTag = new KafkaMetric(lock, metricNameTwo, (Measurable) (m, now) -> 2.0, metricConfig, Time.SYSTEM);
kafkaMetricThreeHasThreadIdTag = new KafkaMetric(lock, metricNameThree, (Measurable) (m, now) -> 3.0, metricConfig, Time.SYSTEM);
kafkaMetricWithoutThreadIdTag = new KafkaMetric(lock, metricNameFour, (Measurable) (m, now) -> 4.0, metricConfig, Time.SYSTEM);
}
@AfterEach
public void tearDown() {
mockConsumer.close();
}
@Test
@DisplayName("Init method should register metrics it receives as parameters")
public void shouldInitMetrics() {
final List<KafkaMetric> allMetrics = Arrays.asList(kafkaMetricOneHasThreadIdTag, kafkaMetricTwoHasThreadIdTag, kafkaMetricThreeHasThreadIdTag);
final List<KafkaMetric> expectedMetrics = Arrays.asList(kafkaMetricOneHasThreadIdTag, kafkaMetricTwoHasThreadIdTag, kafkaMetricThreeHasThreadIdTag);
streamsThreadMetricsDelegatingReporter.init(allMetrics);
assertEquals(expectedMetrics, mockConsumer.addedMetrics());
}
@Test
@DisplayName("Should register metrics with thread-id in tag map")
public void shouldRegisterMetrics() {
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricOneHasThreadIdTag);
assertEquals(kafkaMetricOneHasThreadIdTag, mockConsumer.addedMetrics().get(0));
}
@Test
@DisplayName("Should remove metrics")
public void shouldRemoveMetrics() {
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricOneHasThreadIdTag);
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricTwoHasThreadIdTag);
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricThreeHasThreadIdTag);
List<KafkaMetric> expected = Arrays.asList(kafkaMetricOneHasThreadIdTag, kafkaMetricTwoHasThreadIdTag, kafkaMetricThreeHasThreadIdTag);
assertEquals(expected, mockConsumer.addedMetrics());
streamsThreadMetricsDelegatingReporter.metricRemoval(kafkaMetricOneHasThreadIdTag);
expected = Arrays.asList(kafkaMetricTwoHasThreadIdTag, kafkaMetricThreeHasThreadIdTag);
assertEquals(expected, mockConsumer.addedMetrics());
}
@Test
@DisplayName("Should not register metrics without thread-id tag")
public void shouldNotRegisterMetricsWithoutThreadIdTag() {
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricWithoutThreadIdTag);
assertEquals(0, mockConsumer.addedMetrics().size());
}
}