KAFKA-17248 - KIP-1076 implementation (#17021)

Implementation of KIP-1076, which allows client applications to add their own metrics to the KIP-714 client telemetry framework.

Reviewers: Apoorv Mittal <amittal@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Matthias Sax <mjsax@apache.org>
Author: Bill Bejeck, 2024-11-05 11:29:54 -05:00 (committed by GitHub)
parent 36c131ef4a
commit 4ed0a958e5
16 changed files with 748 additions and 81 deletions
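The user-facing surface of this change is small: the clients' registerMetricForSubscription(KafkaMetric) / unregisterMetricFromSubscription(KafkaMetric) methods become functional across Producer, Consumer, and Admin, letting an application hand its own metrics to a client so they are pushed to the broker alongside the client's KIP-714 telemetry. A minimal usage sketch (the metric names and properties below are illustrative, not part of this commit):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.serialization.StringSerializer;

public class AppTelemetryExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("enable.metrics.push", "true"); // the default

        // Application-owned metrics registry, separate from the client's own.
        Metrics appMetrics = new Metrics();
        MetricName processedTotal = appMetrics.metricName(
            "processed-total", "my-app-metrics", "Total records processed");
        Sensor processed = appMetrics.sensor("processed");
        processed.add(processedTotal, new CumulativeSum()); // monotonic counter

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Forward the app metric to the broker with the producer's telemetry.
            producer.registerMetricForSubscription(appMetrics.metric(processedTotal));
            processed.record(); // one unit of work
            producer.unregisterMetricFromSubscription(appMetrics.metric(processedTotal));
        }
    }
}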

View File: build.gradle

@@ -2552,7 +2552,7 @@ project(':streams') {
}
dependencies {
api project(':clients')
api project(path: ':clients', configuration: 'shadow')
// `org.rocksdb.Options` is part of Kafka Streams public api via `RocksDBConfigSetter`
api libs.rocksDBJni
@@ -2572,6 +2572,7 @@ project(':streams') {
testImplementation libs.mockitoCore
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
testImplementation libs.junitPlatformSuiteEngine // supports suite test
testImplementation project(':group-coordinator')
testRuntimeOnly project(':streams:test-utils')
testRuntimeOnly runtimeTestLibs

View File: checkstyle/import-control.xml

@@ -419,6 +419,11 @@
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.tools" />
<allow pkg="org.apache.kafka.server.config" />
<allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
<allow class="org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.MetricsData"/>
<allow class="org.apache.kafka.server.telemetry.ClientTelemetry" />
<allow class="org.apache.kafka.server.telemetry.ClientTelemetryPayload" />
<allow class="org.apache.kafka.server.telemetry.ClientTelemetryReceiver" />
<allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
<allow class="org.apache.kafka.coordinator.transaction.TransactionLogConfig" />
<allow pkg="org.apache.kafka.coordinator.group" />

View File: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@@ -153,7 +153,6 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData;
import org.apache.kafka.common.message.ListGroupsRequestData;
@@ -228,8 +227,6 @@ import org.apache.kafka.common.requests.ElectLeadersRequest;
import org.apache.kafka.common.requests.ElectLeadersResponse;
import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -254,6 +251,8 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
import org.apache.kafka.common.security.token.delegation.TokenInformation;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.KafkaThread;
@@ -409,6 +408,7 @@ public class KafkaAdminClient extends AdminClient {
private final boolean clientTelemetryEnabled;
private final MetadataRecoveryStrategy metadataRecoveryStrategy;
private final AdminFetchMetricsManager adminFetchMetricsManager;
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
/**
* The telemetry requests client instance id.
@@ -529,6 +529,7 @@ public class KafkaAdminClient extends AdminClient {
String clientId = generateClientId(config);
ApiVersions apiVersions = new ApiVersions();
LogContext logContext = createLogContext(clientId);
Optional<ClientTelemetryReporter> clientTelemetryReporter;
try {
// Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it
@@ -540,6 +541,8 @@ public class KafkaAdminClient extends AdminClient {
adminAddresses.usingBootstrapControllers());
metadataManager.update(Cluster.bootstrap(adminAddresses.addresses()), time.milliseconds());
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
clientTelemetryReporter.ifPresent(reporters::add);
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
@@ -557,10 +560,13 @@ public class KafkaAdminClient extends AdminClient {
time,
1,
(int) TimeUnit.HOURS.toMillis(1),
null,
metadataManager.updater(),
(hostResolver == null) ? new DefaultHostResolver() : hostResolver);
(hostResolver == null) ? new DefaultHostResolver() : hostResolver,
null,
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient,
timeoutProcessorFactory, logContext);
timeoutProcessorFactory, logContext, clientTelemetryReporter);
} catch (Throwable exc) {
closeQuietly(metrics, "Metrics");
closeQuietly(networkClient, "NetworkClient");
@@ -575,12 +581,13 @@ public class KafkaAdminClient extends AdminClient {
Time time) {
Metrics metrics = null;
String clientId = generateClientId(config);
Optional<ClientTelemetryReporter> clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
try {
metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time);
LogContext logContext = createLogContext(clientId);
return new KafkaAdminClient(config, clientId, time, metadataManager, metrics,
client, null, logContext);
client, null, logContext, clientTelemetryReporter);
} catch (Throwable exc) {
closeQuietly(metrics, "Metrics");
throw new KafkaException("Failed to create new KafkaAdminClient", exc);
@@ -598,7 +605,8 @@ public class KafkaAdminClient extends AdminClient {
Metrics metrics,
KafkaClient client,
TimeoutProcessorFactory timeoutProcessorFactory,
LogContext logContext) {
LogContext logContext,
Optional<ClientTelemetryReporter> clientTelemetryReporter) {
this.clientId = clientId;
this.log = logContext.logger(KafkaAdminClient.class);
this.logContext = logContext;
@@ -622,6 +630,9 @@ public class KafkaAdminClient extends AdminClient {
retryBackoffMaxMs,
CommonClientConfigs.RETRY_BACKOFF_JITTER);
this.clientTelemetryEnabled = config.getBoolean(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG);
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this.clientId, config);
this.clientTelemetryReporter = clientTelemetryReporter;
this.clientTelemetryReporter.ifPresent(reporters::add);
this.metadataRecoveryStrategy = MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);
config.logUnused();
@@ -664,6 +675,8 @@ public class KafkaAdminClient extends AdminClient {
long now = time.milliseconds();
long newHardShutdownTimeMs = now + waitTimeMs;
long prev = INVALID_SHUTDOWN_TIME;
clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
metrics.close();
while (true) {
if (hardShutdownTimeMs.compareAndSet(prev, newHardShutdownTimeMs)) {
if (prev == INVALID_SHUTDOWN_TIME) {
@@ -4272,12 +4285,18 @@ public class KafkaAdminClient extends AdminClient {
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricChange(metric);
}
}
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricRemoval(metric);
}
}
@Override
@@ -5050,47 +5069,16 @@ public class KafkaAdminClient extends AdminClient {
throw new IllegalArgumentException("The timeout cannot be negative.");
}
if (!clientTelemetryEnabled) {
if (clientTelemetryReporter.isEmpty()) {
throw new IllegalStateException("Telemetry is not enabled. Set config `" + AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
}
if (clientInstanceId != null) {
return clientInstanceId;
}
final long now = time.milliseconds();
final KafkaFutureImpl<Uuid> future = new KafkaFutureImpl<>();
runnable.call(new Call("getTelemetrySubscriptions", calcDeadlineMs(now, (int) timeout.toMillis()),
new LeastLoadedNodeProvider()) {
@Override
GetTelemetrySubscriptionsRequest.Builder createRequest(int timeoutMs) {
return new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true);
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
GetTelemetrySubscriptionsResponse response = (GetTelemetrySubscriptionsResponse) abstractResponse;
if (response.error() != Errors.NONE) {
future.completeExceptionally(response.error().exception());
} else {
future.complete(response.data().clientInstanceId());
}
}
@Override
void handleFailure(Throwable throwable) {
future.completeExceptionally(throwable);
}
}, now);
try {
clientInstanceId = future.get();
} catch (Exception e) {
log.error("Error occurred while fetching client instance id", e);
throw new KafkaException("Error occurred while fetching client instance id", e);
}
clientInstanceId = ClientTelemetryUtils.fetchClientInstanceId(clientTelemetryReporter.get(), timeout);
return clientInstanceId;
}
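With this change, clientInstanceId() no longer issues its own GetTelemetrySubscriptionsRequest; it delegates to the reporter's telemetry sender via ClientTelemetryUtils.fetchClientInstanceId. Caller-side behavior is unchanged; a hedged usage sketch (broker address illustrative):

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true");
try (Admin admin = Admin.create(props)) {
    // Blocks up to the timeout while the telemetry reporter completes its
    // first GetTelemetrySubscriptions exchange with the broker.
    Uuid instanceId = admin.clientInstanceId(Duration.ofSeconds(10));
    System.out.println("client instance id: " + instanceId);
}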

View File: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@@ -1434,6 +1434,46 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
delegate.resume(partitions);
}
/**
* Add the provided application metric for subscription.
* This metric will be added to this client's metrics
* that are available for subscription and sent as
* telemetry data to the broker.
* The provided metric must map to an OTLP metric data point
* type in the OpenTelemetry v1 metrics protobuf message types.
* Specifically, the metric should be one of the following:
* <ul>
* <li>
* `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
* </li>
* <li>
* `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
* </li>
* </ul>
* Metrics not matching these types are silently ignored.
* Executing this method for a previously registered metric is a benign operation and results in updating that metric's entry.
*
* @param metric The application metric to register
*/
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
delegate.registerMetricForSubscription(metric);
}
/**
* Remove the provided application metric from subscription.
* This metric is removed from this client's metrics
* and will not be available for subscription any longer.
* Executing this method with a metric that has not been registered is a
* benign operation and does not result in any action taken (no-op).
*
* @param metric The application metric to remove
*/
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
delegate.unregisterMetricFromSubscription(metric);
}
/**
* Get the set of partitions that were previously paused by a call to {@link #pause(Collection)}.
*
@@ -1713,14 +1753,4 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
return delegate.updateAssignmentMetadataIfNeeded(timer);
}
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
}
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
}
}
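The Sum/Gauge constraint in the Javadoc above maps directly onto how a metric is constructed: a sensor backed by CumulativeSum produces a monotonic counter (OTLP Sum), while a Measurable that reports a current value produces an OTLP Gauge. A sketch, assuming `consumer` is an already-constructed KafkaConsumer and using illustrative names:

Metrics appMetrics = new Metrics();

// Monotonic counter -> OTLP Sum
MetricName bytesSent = appMetrics.metricName(
    "bytes-sent-total", "my-app-metrics", "Total bytes sent");
appMetrics.sensor("bytes-sent").add(bytesSent, new CumulativeSum());

// Current-value meter -> OTLP Gauge
Deque<byte[]> queue = new ArrayDeque<>();
MetricName queueDepth = appMetrics.metricName(
    "queue-depth", "my-app-metrics", "Current queue depth");
appMetrics.addMetric(queueDepth, (Measurable) (config, nowMs) -> queue.size());

consumer.registerMetricForSubscription(appMetrics.metric(bytesSent));
consumer.registerMetricForSubscription(appMetrics.metric(queueDepth));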

View File: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

@@ -641,12 +641,18 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricChange(metric);
}
}
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricRemoval(metric);
}
}
/**

View File: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java

@@ -430,12 +430,18 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricChange(metric);
}
}
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricRemoval(metric);
}
}
@Override

View File

@@ -57,6 +57,7 @@ import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -1300,6 +1301,53 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
return Collections.unmodifiableMap(this.metrics.metrics());
}
/**
* Add the provided application metric for subscription.
* This metric will be added to this client's metrics
* that are available for subscription and sent as
* telemetry data to the broker.
* The provided metric must map to an OTLP metric data point
* type in the OpenTelemetry v1 metrics protobuf message types.
* Specifically, the metric should be one of the following:
* <ul>
* <li>
* `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
* </li>
* <li>
* `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
* </li>
* </ul>
* Metrics not matching these types are silently ignored.
* Executing this method for a previously registered metric is a benign operation and results in updating that metric's entry.
*
* @param metric The application metric to register
*/
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricChange(metric);
}
}
/**
* Remove the provided application metric from subscription.
* This metric is removed from this client's metrics
* and will not be available for subscription any longer.
* Executing this method with a metric that has not been registered is a
* benign operation and does not result in any action taken (no-op).
*
* @param metric The application metric to remove
*/
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricRemoval(metric);
}
}
/**
* Determines the client's unique client instance ID used for telemetry. This ID is unique to
* this specific client instance and will not change after it is initially generated.

View File: clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java

@@ -29,6 +29,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
@@ -36,6 +37,7 @@ import org.apache.kafka.common.utils.Time;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
@@ -71,6 +73,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
private boolean producerFenced;
private boolean sentOffsets;
private long commitCount = 0L;
private final List<KafkaMetric> addedMetrics = new ArrayList<>();
public RuntimeException initTransactionException = null;
public RuntimeException beginTransactionException = null;
@@ -607,4 +610,17 @@ public class MockProducer<K, V> implements Producer<K, V> {
}
}
public List<KafkaMetric> addedMetrics() {
return Collections.unmodifiableList(addedMetrics);
}
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
addedMetrics.add(metric);
}
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
addedMetrics.remove(metric);
}
}
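MockProducer records registered metrics in addedMetrics(), so application code that registers metrics can be unit-tested without a broker. A hedged test sketch (appMetrics and processedTotal as in the earlier example):

MockProducer<String, String> producer = new MockProducer<>();
KafkaMetric metric = appMetrics.metric(processedTotal); // from an app-owned Metrics registry
producer.registerMetricForSubscription(metric);
assertEquals(Collections.singletonList(metric), producer.addedMetrics());

producer.unregisterMetricFromSubscription(metric);
assertTrue(producer.addedMetrics().isEmpty());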

View File: clients/src/main/java/org/apache/kafka/clients/producer/Producer.java

@@ -24,6 +24,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.metrics.KafkaMetric;
import java.io.Closeable;
import java.time.Duration;
@@ -71,6 +72,16 @@ public interface Producer<K, V> extends Closeable {
*/
void abortTransaction() throws ProducerFencedException;
/**
* @see KafkaProducer#registerMetricForSubscription(KafkaMetric)
*/
void registerMetricForSubscription(KafkaMetric metric);
/**
* @see KafkaProducer#unregisterMetricFromSubscription(KafkaMetric)
*/
void unregisterMetricFromSubscription(KafkaMetric metric);
/**
* See {@link KafkaProducer#send(ProducerRecord)}
*/

View File: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@@ -121,7 +121,6 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
import org.apache.kafka.common.message.InitProducerIdResponseData;
@@ -198,8 +197,6 @@ import org.apache.kafka.common.requests.DescribeUserScramCredentialsResponse;
import org.apache.kafka.common.requests.ElectLeadersResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
@@ -235,6 +232,8 @@ import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -248,6 +247,8 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;
import org.mockito.internal.stubbing.answers.CallsRealMethods;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -300,10 +301,14 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
/**
* A unit test for KafkaAdminClient.
@@ -469,6 +474,35 @@ public class KafkaAdminClientTest {
admin.close();
}
@Test
public void testExplicitlyEnableTelemetryReporter() {
Properties props = new Properties();
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true");
try (KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props)) {
List<ClientTelemetryReporter> telemetryReporterList = admin.metrics.reporters().stream()
.filter(r -> r instanceof ClientTelemetryReporter)
.map(r -> (ClientTelemetryReporter) r)
.collect(Collectors.toList());
assertEquals(1, telemetryReporterList.size());
}
}
@Test
public void testTelemetryReporterIsDisabledByDefault() {
Properties props = new Properties();
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
try (KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props)) {
List<ClientTelemetryReporter> telemetryReporterList = admin.metrics.reporters().stream()
.filter(r -> r instanceof ClientTelemetryReporter)
.map(r -> (ClientTelemetryReporter) r)
.collect(Collectors.toList());
assertTrue(telemetryReporterList.isEmpty());
}
}
private static Cluster mockCluster(int numNodes, int controllerIndex) {
HashMap<Integer, Node> nodes = new HashMap<>();
for (int i = 0; i < numNodes; i++)
@@ -8333,18 +8367,21 @@ public class KafkaAdminClientTest {
@Test
public void testClientInstanceId() {
try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true")) {
Uuid expected = Uuid.randomUuid();
GetTelemetrySubscriptionsResponseData responseData =
new GetTelemetrySubscriptionsResponseData().setClientInstanceId(expected).setErrorCode(Errors.NONE.code());
try (MockedStatic<CommonClientConfigs> mockedCommonClientConfigs = mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
ClientTelemetryReporter clientTelemetryReporter = mock(ClientTelemetryReporter.class);
clientTelemetryReporter.configure(any());
mockedCommonClientConfigs.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any())).thenReturn(Optional.of(clientTelemetryReporter));
env.kafkaClient().prepareResponse(
request -> request instanceof GetTelemetrySubscriptionsRequest,
new GetTelemetrySubscriptionsResponse(responseData));
try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true")) {
ClientTelemetrySender clientTelemetrySender = mock(ClientTelemetrySender.class);
Uuid expectedUuid = Uuid.randomUuid();
when(clientTelemetryReporter.telemetrySender()).thenReturn(clientTelemetrySender);
when(clientTelemetrySender.clientInstanceId(any())).thenReturn(Optional.of(expectedUuid));
Uuid result = env.adminClient().clientInstanceId(Duration.ofSeconds(1));
assertEquals(expected, result);
Uuid result = env.adminClient().clientInstanceId(Duration.ofSeconds(1));
assertEquals(expectedUuid, result);
}
}
}

View File: streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java (new file)

@@ -0,0 +1,506 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.telemetry.ClientTelemetry;
import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
import org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.MetricsData;
import org.apache.kafka.streams.ClientInstanceIds;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ClientMetricsCommand;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(600)
@Tag("integration")
public class KafkaStreamsTelemetryIntegrationTest {
private String appId;
private String inputTopicTwoPartitions;
private String outputTopicTwoPartitions;
private String inputTopicOnePartition;
private String outputTopicOnePartition;
private Properties streamsApplicationProperties = new Properties();
private Properties streamsSecondApplicationProperties = new Properties();
private static EmbeddedKafkaCluster cluster;
private static final List<TestingMetricsInterceptingConsumer<byte[], byte[]>> INTERCEPTING_CONSUMERS = new ArrayList<>();
private static final List<TestingMetricsInterceptingAdminClient> INTERCEPTING_ADMIN_CLIENTS = new ArrayList<>();
private static final int NUM_BROKERS = 3;
private static final int FIRST_INSTANCE_CLIENT = 0;
private static final int SECOND_INSTANCE_CLIENT = 1;
private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsTelemetryIntegrationTest.class);
@BeforeAll
public static void startCluster() throws IOException {
final Properties properties = new Properties();
properties.put("metric.reporters", TelemetryPlugin.class.getName());
cluster = new EmbeddedKafkaCluster(NUM_BROKERS, properties);
cluster.start();
}
@BeforeEach
public void setUp(final TestInfo testInfo) throws InterruptedException {
appId = safeUniqueTestName(testInfo);
inputTopicTwoPartitions = appId + "-input-two";
outputTopicTwoPartitions = appId + "-output-two";
inputTopicOnePartition = appId + "-input-one";
outputTopicOnePartition = appId + "-output-one";
cluster.createTopic(inputTopicTwoPartitions, 2, 1);
cluster.createTopic(outputTopicTwoPartitions, 2, 1);
cluster.createTopic(inputTopicOnePartition, 1, 1);
cluster.createTopic(outputTopicOnePartition, 1, 1);
}
@AfterAll
public static void closeCluster() {
cluster.stop();
}
@AfterEach
public void tearDown() throws Exception {
INTERCEPTING_CONSUMERS.clear();
INTERCEPTING_ADMIN_CLIENTS.clear();
IntegrationTestUtils.purgeLocalStreamsState(streamsApplicationProperties);
if (!streamsSecondApplicationProperties.isEmpty()) {
IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties);
}
}
@ParameterizedTest
@ValueSource(strings = {"INFO", "DEBUG", "TRACE"})
@DisplayName("End-to-end test validating metrics pushed to broker")
public void shouldPushMetricsToBroker(final String recordingLevel) throws Exception {
streamsApplicationProperties = props(true);
streamsApplicationProperties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, recordingLevel);
final Topology topology = simpleTopology();
subscribeForStreamsMetrics();
try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final ClientInstanceIds clientInstanceIds = streams.clientInstanceIds(Duration.ofSeconds(60));
final Uuid adminInstanceId = clientInstanceIds.adminInstanceId();
final Uuid mainConsumerInstanceId = clientInstanceIds.consumerInstanceIds().entrySet().stream()
.filter(entry -> !entry.getKey().endsWith("-restore-consumer")
&& !entry.getKey().endsWith("GlobalStreamThread"))
.map(Map.Entry::getValue)
.findFirst().orElseThrow();
assertNotNull(adminInstanceId);
assertNotNull(mainConsumerInstanceId);
LOG.info("Main consumer instance id {}", mainConsumerInstanceId);
TestUtils.waitForCondition(() -> !TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId).isEmpty(),
30_000,
"Never received subscribed metrics");
final List<String> expectedMetrics = streams.metrics().values().stream().map(Metric::metricName)
.filter(metricName -> metricName.tags().containsKey("thread-id")).map(mn -> {
final String name = mn.name().replace('-', '.');
final String group = mn.group().replace("-metrics", "").replace('-', '.');
return "org.apache.kafka." + group + "." + name;
}).sorted().collect(Collectors.toList());
final List<String> actualMetrics = new ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId));
assertEquals(expectedMetrics, actualMetrics);
TestUtils.waitForCondition(() -> !TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId).isEmpty(),
30_000,
"Never received subscribed metrics");
final List<String> actualInstanceMetrics = TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId);
final List<String> expectedInstanceMetrics = Arrays.asList("org.apache.kafka.stream.alive.stream.threads", "org.apache.kafka.stream.failed.stream.threads");
assertEquals(expectedInstanceMetrics, actualInstanceMetrics);
}
}
@ParameterizedTest
@MethodSource("singleAndMultiTaskParameters")
@DisplayName("Streams metrics should get passed to Admin and Consumer")
public void shouldPassMetrics(final String topologyType, final boolean stateUpdaterEnabled) throws Exception {
streamsApplicationProperties = props(stateUpdaterEnabled);
final Topology topology = topologyType.equals("simple") ? simpleTopology() : complexTopology();
try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName)
.filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
final List<MetricName> streamsClientMetrics = streams.metrics().values().stream().map(Metric::metricName)
.filter(metricName -> metricName.group().equals("stream-metrics")).collect(Collectors.toList());
final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
assertEquals(streamsThreadMetrics.size(), consumerPassedStreamThreadMetricNames.size());
consumerPassedStreamThreadMetricNames.forEach(metricName -> assertTrue(streamsThreadMetrics.contains(metricName), "Streams metrics don't contain " + metricName));
assertEquals(streamsClientMetrics.size(), adminPassedStreamClientMetricNames.size());
adminPassedStreamClientMetricNames.forEach(metricName -> assertTrue(streamsClientMetrics.contains(metricName), "Client metrics don't contain " + metricName));
}
}
@ParameterizedTest
@MethodSource("multiTaskParameters")
@DisplayName("Correct streams metrics should get passed with dynamic membership")
public void shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterEnabled) throws Exception {
streamsApplicationProperties = props(stateUpdaterEnabled);
streamsApplicationProperties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks1");
streamsApplicationProperties.put(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks1");
streamsSecondApplicationProperties = props(stateUpdaterEnabled);
streamsSecondApplicationProperties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks2");
streamsSecondApplicationProperties.put(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks2");
final Topology topology = complexTopology();
try (final KafkaStreams streamsOne = new KafkaStreams(topology, streamsApplicationProperties)) {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsOne);
final List<MetricName> streamsTaskMetricNames = streamsOne.metrics().values().stream().map(Metric::metricName)
.filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
final List<MetricName> consumerPassedStreamTaskMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName)
.filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
/*
With only one instance, Kafka Streams should register task metrics for all tasks 0_0, 0_1, 1_0, 1_1
*/
final List<String> streamTaskIds = getTaskIdsAsStrings(streamsOne);
final long consumerPassedTaskMetricCount = consumerPassedStreamTaskMetricNames.stream().filter(metricName -> streamTaskIds.contains(metricName.tags().get("task-id"))).count();
assertEquals(streamsTaskMetricNames.size(), consumerPassedStreamTaskMetricNames.size());
assertEquals(consumerPassedTaskMetricCount, streamsTaskMetricNames.size());
try (final KafkaStreams streamsTwo = new KafkaStreams(topology, streamsSecondApplicationProperties)) {
streamsTwo.start();
waitForCondition(() -> KafkaStreams.State.RUNNING == streamsTwo.state() && KafkaStreams.State.RUNNING == streamsOne.state(),
IntegrationTestUtils.DEFAULT_TIMEOUT,
() -> "Kafka Streams one or two never transitioned to a RUNNING state.");
/*
Now with 2 instances, the tasks will get split amongst both Kafka Streams applications
*/
final List<String> streamOneTaskIds = getTaskIdsAsStrings(streamsOne);
final List<String> streamTwoTasksIds = getTaskIdsAsStrings(streamsTwo);
final List<MetricName> streamsOneTaskMetrics = streamsOne.metrics().values().stream().map(Metric::metricName)
.filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
final List<MetricName> streamsOneStateMetrics = streamsOne.metrics().values().stream().map(Metric::metricName)
.filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
final List<MetricName> consumerOnePassedTaskMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT)
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
final List<MetricName> consumerOnePassedStateMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT)
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
final List<MetricName> streamsTwoTaskMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName)
.filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
final List<MetricName> streamsTwoStateMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName)
.filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
final List<MetricName> consumerTwoPassedTaskMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT)
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
final List<MetricName> consumerTwoPassedStateMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT)
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
/*
Confirm pre-existing KafkaStreams instance one only passes metrics for its tasks and has no metrics for previous tasks
*/
final long consumerOneStreamOneTaskCount = consumerOnePassedTaskMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count();
final long consumerOneStateMetricCount = consumerOnePassedStateMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count();
final long consumerOneTaskTwoMetricCount = consumerOnePassedTaskMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count();
final long consumerOneStateTwoMetricCount = consumerOnePassedStateMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count();
/*
Confirm new KafkaStreams instance only passes metrics for the newly assigned tasks
*/
final long consumerTwoStreamTwoTaskCount = consumerTwoPassedTaskMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count();
final long consumerTwoStateMetricCount = consumerTwoPassedStateMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count();
final long consumerTwoTaskOneMetricCount = consumerTwoPassedTaskMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count();
final long consumerTwoStateMetricOneCount = consumerTwoPassedStateMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count();
assertEquals(streamsOneTaskMetrics.size(), consumerOneStreamOneTaskCount);
assertEquals(streamsOneStateMetrics.size(), consumerOneStateMetricCount);
assertEquals(0, consumerOneTaskTwoMetricCount);
assertEquals(0, consumerOneStateTwoMetricCount);
assertEquals(streamsTwoTaskMetrics.size(), consumerTwoStreamTwoTaskCount);
assertEquals(streamsTwoStateMetrics.size(), consumerTwoStateMetricCount);
assertEquals(0, consumerTwoTaskOneMetricCount);
assertEquals(0, consumerTwoStateMetricOneCount);
}
}
}
@Test
@DisplayName("Streams metrics should not be visible in client metrics")
public void passedMetricsShouldNotLeakIntoClientMetrics() throws Exception {
streamsApplicationProperties = props(true);
final Topology topology = complexTopology();
try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName)
.filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
final List<MetricName> streamsClientMetrics = streams.metrics().values().stream().map(Metric::metricName)
.filter(metricName -> metricName.group().equals("stream-metrics")).collect(Collectors.toList());
final Map<MetricName, ? extends Metric> embeddedConsumerMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).metrics();
final Map<MetricName, ? extends Metric> embeddedAdminMetrics = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).metrics();
streamsThreadMetrics.forEach(metricName -> assertFalse(embeddedConsumerMetrics.containsKey(metricName), "Stream thread metric found in client metrics: " + metricName));
streamsClientMetrics.forEach(metricName -> assertFalse(embeddedAdminMetrics.containsKey(metricName), "Stream client metric found in client metrics: " + metricName));
}
}
private void subscribeForStreamsMetrics() throws Exception {
final Properties clientProps = new Properties();
clientProps.put("bootstrap.servers", cluster.bootstrapServers());
try (final ClientMetricsCommand.ClientMetricsService clientMetricsService = new ClientMetricsCommand.ClientMetricsService(clientProps)) {
final String[] metricsSubscriptionParameters = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--metrics", "org.apache.kafka.stream", "--alter", "--name", "streams-task-metrics-subscription", "--interval", "1000"};
final ClientMetricsCommand.ClientMetricsCommandOptions commandOptions = new ClientMetricsCommand.ClientMetricsCommandOptions(metricsSubscriptionParameters);
clientMetricsService.alterClientMetrics(commandOptions);
}
}
private List<String> getTaskIdsAsStrings(final KafkaStreams streams) {
return streams.metadataForLocalThreads().stream()
.flatMap(threadMeta -> threadMeta.activeTasks().stream()
.map(taskMeta -> taskMeta.taskId().toString()))
.collect(Collectors.toList());
}
private static Stream<Arguments> singleAndMultiTaskParameters() {
return Stream.of(Arguments.of("simple", true),
Arguments.of("simple", false),
Arguments.of("complex", true),
Arguments.of("complex", false));
}
private static Stream<Arguments> multiTaskParameters() {
return Stream.of(Arguments.of(true),
Arguments.of(false));
}
private Properties props(final boolean stateUpdaterEnabled) {
return props(mkObjectProperties(mkMap(mkEntry(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled))));
}
private Properties props(final Properties extraProperties) {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG, TestClientSupplier.class);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.putAll(extraProperties);
return streamsConfiguration;
}
private Topology complexTopology() {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopicTwoPartitions, Consumed.with(Serdes.String(), Serdes.String()))
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> value)
.count()
.toStream().to(outputTopicTwoPartitions, Produced.with(Serdes.String(), Serdes.Long()));
return builder.build();
}
private Topology simpleTopology() {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopicOnePartition, Consumed.with(Serdes.String(), Serdes.String()))
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.to(outputTopicOnePartition, Produced.with(Serdes.String(), Serdes.String()));
return builder.build();
}
public static class TestClientSupplier implements KafkaClientSupplier {
@Override
public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
}
@Override
public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
final TestingMetricsInterceptingConsumer<byte[], byte[]> consumer = new TestingMetricsInterceptingConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
INTERCEPTING_CONSUMERS.add(consumer);
return consumer;
}
@Override
public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
}
@Override
public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
}
@Override
public Admin getAdmin(final Map<String, Object> config) {
assertTrue((Boolean) config.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG));
final TestingMetricsInterceptingAdminClient adminClient = new TestingMetricsInterceptingAdminClient(config);
INTERCEPTING_ADMIN_CLIENTS.add(adminClient);
return adminClient;
}
}
public static class TestingMetricsInterceptingConsumer<K, V> extends KafkaConsumer<K, V> {
public List<KafkaMetric> passedMetrics = new ArrayList<>();
public TestingMetricsInterceptingConsumer(final Map<String, Object> configs, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
super(configs, keyDeserializer, valueDeserializer);
}
@Override
public void registerMetricForSubscription(final KafkaMetric metric) {
passedMetrics.add(metric);
super.registerMetricForSubscription(metric);
}
@Override
public void unregisterMetricFromSubscription(final KafkaMetric metric) {
passedMetrics.remove(metric);
super.unregisterMetricFromSubscription(metric);
}
}
public static class TelemetryPlugin implements ClientTelemetry, MetricsReporter, ClientTelemetryReceiver {
public static final Map<Uuid, List<String>> SUBSCRIBED_METRICS = new ConcurrentHashMap<>();
public TelemetryPlugin() {
}
@Override
public void init(final List<KafkaMetric> metrics) {
}
@Override
public void metricChange(final KafkaMetric metric) {
}
@Override
public void metricRemoval(final KafkaMetric metric) {
}
@Override
public void close() {
}
@Override
public void configure(final Map<String, ?> configs) {
}
@Override
public ClientTelemetryReceiver clientReceiver() {
return this;
}
@Override
public void exportMetrics(final AuthorizableRequestContext context, final ClientTelemetryPayload payload) {
try {
final MetricsData data = MetricsData.parseFrom(payload.data());
final Uuid clientId = payload.clientInstanceId();
final List<String> metricNames = data.getResourceMetricsList()
.stream()
.map(rm -> rm.getScopeMetricsList().get(0).getMetrics(0).getName())
.sorted()
.collect(Collectors.toList());
LOG.info("Found metrics {} for clientId={}", metricNames, clientId);
SUBSCRIBED_METRICS.put(clientId, metricNames);
} catch (final Exception e) {
LOG.error("Error parsing telemetry payload", e);
}
}
}
}
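The subscribeForStreamsMetrics() helper above drives ClientMetricsCommand programmatically; since client-metrics subscriptions are ordinary configs on a CLIENT_METRICS resource, the same subscription can also be created through the Admin API. A hedged sketch of the equivalent call (adminProps illustrative):

try (Admin admin = Admin.create(adminProps)) {
    ConfigResource resource = new ConfigResource(
        ConfigResource.Type.CLIENT_METRICS, "streams-task-metrics-subscription");
    Collection<AlterConfigOp> ops = Arrays.asList(
        new AlterConfigOp(new ConfigEntry("metrics", "org.apache.kafka.stream"), AlterConfigOp.OpType.SET),
        new AlterConfigOp(new ConfigEntry("interval.ms", "1000"), AlterConfigOp.OpType.SET));
    admin.incrementalAlterConfigs(Collections.singletonMap(resource, ops)).all().get();
}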

View File: TestingMetricsInterceptingAdminClient.java

@@ -427,12 +427,14 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient {
@Override
public void registerMetricForSubscription(final KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
passedMetrics.add(metric);
adminDelegate.registerMetricForSubscription(metric);
}
@Override
public void unregisterMetricFromSubscription(final KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
passedMetrics.remove(metric);
adminDelegate.unregisterMetricFromSubscription(metric);
}
@Override

View File: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

@@ -53,6 +53,7 @@ import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.ClientInstanceIdsImpl;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.internals.metrics.StreamsClientMetricsDelegatingReporter;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
@@ -1016,6 +1017,9 @@ public class KafkaStreams implements AutoCloseable {
log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
metrics = createMetrics(applicationConfigs, time, clientId);
final StreamsClientMetricsDelegatingReporter reporter = new StreamsClientMetricsDelegatingReporter(adminClient, clientId);
metrics.addReporter(reporter);
streamsMetrics = new StreamsMetricsImpl(
metrics,
clientId,

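StreamsClientMetricsDelegatingReporter (and the per-thread variant added in StreamThread below) is the glue between the Streams metrics registry and the new client API: an ordinary MetricsReporter that forwards each metric it sees to a client's register/unregister methods. A minimal sketch of the idea, not the internal class itself (the real reporters also filter by thread id):

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

// Sketch only: forwards every metric it sees to one Kafka client so the
// metric rides along with that client's KIP-714 telemetry push.
public class DelegatingTelemetryReporter implements MetricsReporter {
    private final Admin client; // could equally be a Consumer or Producer

    public DelegatingTelemetryReporter(final Admin client) {
        this.client = client;
    }

    @Override
    public void init(final List<KafkaMetric> metrics) {
        metrics.forEach(this::metricChange);
    }

    @Override
    public void metricChange(final KafkaMetric metric) {
        client.registerMetricForSubscription(metric);
    }

    @Override
    public void metricRemoval(final KafkaMetric metric) {
        client.unregisterMetricFromSubscription(metric);
    }

    @Override
    public void close() { }

    @Override
    public void configure(final Map<String, ?> configs) { }
}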
View File: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

@@ -1230,7 +1230,7 @@ public class StreamsConfig extends AbstractConfig {
private static final Map<String, Object> ADMIN_CLIENT_OVERRIDES;
static {
final Map<String, Object> tempAdminClientDefaultOverrides = new HashMap<>();
tempAdminClientDefaultOverrides.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true");
tempAdminClientDefaultOverrides.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true);
ADMIN_CLIENT_OVERRIDES = Collections.unmodifiableMap(tempAdminClientDefaultOverrides);
}
@@ -1811,7 +1811,6 @@ public class StreamsConfig extends AbstractConfig {
// add client id with stream client id prefix
baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-global-consumer");
baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
return baseConsumerProps;
}
@@ -1857,7 +1856,6 @@ public class StreamsConfig extends AbstractConfig {
// add client id with stream client id prefix
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
return props;
}

View File: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

@@ -48,6 +48,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
@@ -87,6 +88,9 @@ import static org.apache.kafka.streams.processor.internals.ClientUtils.restoreCo
public class StreamThread extends Thread implements ProcessingThread {
private static final String THREAD_ID_SUBSTRING = "-StreamThread-";
private static final String STATE_UPDATER_ID_SUBSTRING = "-StateUpdater-";
/**
* Stream thread states are the possible states that a stream thread can be in.
* A thread must only be in one state at a time
@@ -367,7 +371,8 @@ public class StreamThread extends Thread implements ProcessingThread {
final int threadIdx,
final Runnable shutdownErrorHook,
final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) {
final String threadId = clientId + "-StreamThread-" + threadIdx;
final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx;
final String logPrefix = String.format("stream-thread [%s] ", threadId);
final LogContext logContext = new LogContext(logPrefix);
@@ -473,6 +478,10 @@ public class StreamThread extends Thread implements ProcessingThread {
taskManager.setMainConsumer(mainConsumer);
referenceContainer.mainConsumer = mainConsumer;
final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING);
final StreamsThreadMetricsDelegatingReporter reporter = new StreamsThreadMetricsDelegatingReporter(mainConsumer, threadId, stateUpdaterId);
streamsMetrics.metricsRegistry().addReporter(reporter);
final StreamThread streamThread = new StreamThread(
time,
config,
@@ -533,7 +542,7 @@ public class StreamThread extends Thread implements ProcessingThread {
final String clientId,
final int threadIdx) {
if (stateUpdaterEnabled) {
final String name = clientId + "-StateUpdater-" + threadIdx;
final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx;
final StateUpdater stateUpdater = new DefaultStateUpdater(
name,
streamsMetrics.metricsRegistry(),

View File: streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java

@@ -383,7 +383,7 @@ public class StreamsConfigTest {
@Test
public void shouldOverrideAdminDefaultAdminClientEnableTelemetry() {
final Map<String, Object> returnedProps = streamsConfig.getAdminConfigs(clientId);
assertThat(returnedProps.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG), equalTo("true"));
assertTrue((boolean) returnedProps.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG));
}
@Test