KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) (#14909)

Part of KIP-714.

Implements ClientTelemetryReporter which manages the lifecycle for client metrics collection. The reporter also defines TelemetrySender which will be used by Network clients to send API calls to broker.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Philip Nee <pnee@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Apoorv Mittal 2023-12-05 01:14:56 +05:30 committed by GitHub
parent ddf99880d7
commit 7a6d2664cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 2261 additions and 4 deletions

View File

@ -199,6 +199,7 @@
<subpackage name="telemetry">
<allow pkg="io.opentelemetry.proto"/>
<allow pkg="org.apache.kafka.clients"/>
<allow pkg="org.apache.kafka.common" />
</subpackage>

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
public class ClientTelemetryEmitter implements MetricsEmitter {
private final Predicate<? super MetricKeyable> selector;
private final List<SinglePointMetric> emitted;
private final boolean deltaMetrics;
ClientTelemetryEmitter(Predicate<? super MetricKeyable> selector, boolean deltaMetrics) {
this.selector = selector;
this.emitted = new ArrayList<>();
this.deltaMetrics = deltaMetrics;
}
@Override
public boolean shouldEmitMetric(MetricKeyable metricKeyable) {
return selector.test(metricKeyable);
}
@Override
public boolean shouldEmitDeltaMetrics() {
return deltaMetrics;
}
@Override
public boolean emitMetric(SinglePointMetric metric) {
if (!shouldEmitMetric(metric)) {
return false;
}
emitted.add(metric);
return true;
}
@Override
public List<SinglePointMetric> emittedMetrics() {
return Collections.unmodifiableList(emitted);
}
}

View File

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.resource.v1.Resource;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.metrics.MetricsContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ClientTelemetryProvider implements Configurable {
public static final String DOMAIN = "org.apache.kafka";
// Client metrics tags
public static final String CLIENT_RACK = "client_rack";
public static final String GROUP_ID = "group_id";
public static final String GROUP_INSTANCE_ID = "group_instance_id";
public static final String GROUP_MEMBER_ID = "group_member_id";
public static final String TRANSACTIONAL_ID = "transactional_id";
private static final String PRODUCER_NAMESPACE = "kafka.producer";
private static final String CONSUMER_NAMESPACE = "kafka.consumer";
private static final Map<String, String> PRODUCER_CONFIG_MAPPING = new HashMap<>();
private static final Map<String, String> CONSUMER_CONFIG_MAPPING = new HashMap<>();
private volatile Resource resource = null;
private Map<String, ?> config = null;
// Mapping of config keys to telemetry keys. Contains only keys which can be fetched from config.
// Config like group_member_id is not present here as it is not fetched from config.
static {
PRODUCER_CONFIG_MAPPING.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, ClientTelemetryProvider.TRANSACTIONAL_ID);
CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_ID_CONFIG, ClientTelemetryProvider.GROUP_ID);
CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, ClientTelemetryProvider.GROUP_INSTANCE_ID);
}
@Override
public synchronized void configure(Map<String, ?> configs) {
this.config = configs;
}
/**
* Validate that all the data required for generating correct metrics is present.
*
* @param metricsContext {@link MetricsContext}
* @return false if all the data required for generating correct metrics is missing, true
* otherwise.
*/
boolean validate(MetricsContext metricsContext) {
return ClientTelemetryUtils.validateRequiredResourceLabels(metricsContext.contextLabels());
}
/**
* Sets the metrics tags for the service or library exposing metrics. This will be called before
* {@link org.apache.kafka.common.metrics.MetricsReporter#init(List)} and may be called anytime
* after that.
*
* @param metricsContext {@link MetricsContext}
*/
synchronized void contextChange(MetricsContext metricsContext) {
final Resource.Builder resourceBuilder = Resource.newBuilder();
final String namespace = metricsContext.contextLabels().get(MetricsContext.NAMESPACE);
if (PRODUCER_NAMESPACE.equals(namespace)) {
// Add producer resource labels.
PRODUCER_CONFIG_MAPPING.forEach((configKey, telemetryKey) -> {
if (config.containsKey(configKey)) {
addAttribute(resourceBuilder, telemetryKey, String.valueOf(config.get(configKey)));
}
});
} else if (CONSUMER_NAMESPACE.equals(namespace)) {
// Add consumer resource labels.
CONSUMER_CONFIG_MAPPING.forEach((configKey, telemetryKey) -> {
if (config.containsKey(configKey)) {
addAttribute(resourceBuilder, telemetryKey, String.valueOf(config.get(configKey)));
}
});
}
// Add client rack label.
if (config.containsKey(CommonClientConfigs.CLIENT_RACK_CONFIG)) {
addAttribute(resourceBuilder, CLIENT_RACK, String.valueOf(config.get(CommonClientConfigs.CLIENT_RACK_CONFIG)));
}
resource = resourceBuilder.build();
}
/**
* Updates the resource labels/tags for the service or library exposing metrics.
*
* @param labels Map of labels to be updated.
*/
synchronized void updateLabels(Map<String, String> labels) {
final Resource.Builder resourceBuilder = resource.toBuilder();
labels.forEach((key, value) -> {
addAttribute(resourceBuilder, key, value);
});
resource = resourceBuilder.build();
}
/**
* The metrics resource for this provider which will be used to generate the metrics.
*
* @return A fully formed {@link Resource} with all the tags.
*/
Resource resource() {
return resource;
}
/**
* Domain of the active provider i.e. specifies prefix to the metrics.
*
* @return Domain in string format.
*/
String domain() {
return DOMAIN;
}
private void addAttribute(Resource.Builder resourceBuilder, String key, String value) {
final KeyValue.Builder kv = KeyValue.newBuilder()
.setKey(key)
.setValue(AnyValue.newBuilder().setStringValue(value));
resourceBuilder.addAttributes(kv);
}
}

View File

@ -0,0 +1,982 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.MetricsData;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.PushTelemetryRequestData;
import org.apache.kafka.common.message.PushTelemetryResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractRequest.Builder;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.PushTelemetryRequest;
import org.apache.kafka.common.requests.PushTelemetryResponse;
import org.apache.kafka.common.telemetry.ClientTelemetryState;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
/**
* The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle
* of the client telemetry collection process. The client telemetry reporter is responsible for
* collecting the client telemetry data and sending it to the broker.
* <p>
*
* The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is
* responsible for sending the client telemetry data to the broker. The client telemetry reporter
* will attempt to fetch the telemetry subscription information from the broker and send the
* telemetry data to the broker based on the subscription information i.e. push interval, temporality,
* compression type, etc.
* <p>
*
* The full life-cycle of the metric collection process is defined by a state machine in
* {@link ClientTelemetryState}. Each state is associated with a different set of operations.
* For example, the client telemetry reporter will attempt to fetch the telemetry subscription
* from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
* If the push operation fails, the client telemetry reporter will attempt to re-fetch the
* subscription information by setting the state back to {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}.
* <p>
*
* In an unlikely scenario, if a bad state transition is detected, an
* {@link IllegalStateException} will be thrown.
* <p>
*
* The state transition follows the following steps in order:
* <ol>
* <li>{@link ClientTelemetryState#SUBSCRIPTION_NEEDED}</li>
* <li>{@link ClientTelemetryState#SUBSCRIPTION_IN_PROGRESS}</li>
* <li>{@link ClientTelemetryState#PUSH_NEEDED}</li>
* <li>{@link ClientTelemetryState#PUSH_IN_PROGRESS}</li>
* <li>{@link ClientTelemetryState#TERMINATING_PUSH_NEEDED}</li>
* <li>{@link ClientTelemetryState#TERMINATING_PUSH_IN_PROGRESS}</li>
* <li>{@link ClientTelemetryState#TERMINATED}</li>
* </ol>
* <p>
*
* For more detail in state transition, see {@link ClientTelemetryState#validateTransition}.
*/
public class ClientTelemetryReporter implements MetricsReporter {
private static final Logger log = LoggerFactory.getLogger(ClientTelemetryReporter.class);
public static final int DEFAULT_PUSH_INTERVAL_MS = 5 * 60 * 1000;
private final ClientTelemetryProvider telemetryProvider;
private final ClientTelemetrySender clientTelemetrySender;
private final Time time;
private Map<String, Object> rawOriginalConfig;
private KafkaMetricsCollector kafkaMetricsCollector;
public ClientTelemetryReporter(Time time) {
this.time = time;
telemetryProvider = new ClientTelemetryProvider();
clientTelemetrySender = new DefaultClientTelemetrySender();
}
@SuppressWarnings("unchecked")
@Override
public synchronized void configure(Map<String, ?> configs) {
rawOriginalConfig = (Map<String, Object>) Objects.requireNonNull(configs);
}
@Override
public synchronized void contextChange(MetricsContext metricsContext) {
/*
If validation succeeds: initialize the provider, start the metric collection task,
set metrics labels for services/libraries that expose metrics.
*/
Objects.requireNonNull(rawOriginalConfig, "configure() was not called before contextChange()");
if (kafkaMetricsCollector != null) {
kafkaMetricsCollector.stop();
}
if (!telemetryProvider.validate(metricsContext)) {
log.warn("Validation failed for {} context {}, skip starting collectors. Metrics collection is disabled",
telemetryProvider.getClass(), metricsContext.contextLabels());
return;
}
if (kafkaMetricsCollector == null) {
/*
Initialize the provider only once. contextChange(..) can be called more than once,
but once it's been initialized and all necessary labels are present then we don't
re-initialize.
*/
telemetryProvider.configure(rawOriginalConfig);
}
telemetryProvider.contextChange(metricsContext);
if (kafkaMetricsCollector == null) {
initCollectors();
}
}
@Override
public void init(List<KafkaMetric> metrics) {
/*
metrics collector may not have been initialized (e.g. invalid context labels)
in which case metrics collection is disabled
*/
if (kafkaMetricsCollector != null) {
kafkaMetricsCollector.init(metrics);
}
}
/**
* Method is invoked whenever a metric is added/registered
*/
@Override
public void metricChange(KafkaMetric metric) {
/*
metrics collector may not have been initialized (e.g. invalid context labels)
in which case metrics collection is disabled
*/
if (kafkaMetricsCollector != null) {
kafkaMetricsCollector.metricChange(metric);
}
}
/**
* Method is invoked whenever a metric is removed
*/
@Override
public void metricRemoval(KafkaMetric metric) {
/*
metrics collector may not have been initialized (e.g. invalid context labels)
in which case metrics collection is disabled
*/
if (kafkaMetricsCollector != null) {
kafkaMetricsCollector.metricRemoval(metric);
}
}
@Override
public void close() {
log.debug("Stopping ClientTelemetryReporter");
try {
clientTelemetrySender.close();
} catch (Exception exception) {
log.error("Failed to close client telemetry reporter", exception);
}
}
public synchronized void updateMetricsLabels(Map<String, String> labels) {
telemetryProvider.updateLabels(labels);
}
public void initiateClose(long timeoutMs) {
log.debug("Initiate close of ClientTelemetryReporter");
try {
clientTelemetrySender.initiateClose(timeoutMs);
} catch (Exception exception) {
log.error("Failed to initiate close of client telemetry reporter", exception);
}
}
public ClientTelemetrySender telemetrySender() {
return clientTelemetrySender;
}
private void initCollectors() {
kafkaMetricsCollector = new KafkaMetricsCollector(
TelemetryMetricNamingConvention.getClientTelemetryMetricNamingStrategy(
telemetryProvider.domain()));
}
private ResourceMetrics buildMetric(Metric metric) {
return ResourceMetrics.newBuilder()
.setResource(telemetryProvider.resource())
.addScopeMetrics(ScopeMetrics.newBuilder()
.addMetrics(metric)
.build()).build();
}
// Visible for testing, only for unit tests
void metricsCollector(KafkaMetricsCollector metricsCollector) {
kafkaMetricsCollector = metricsCollector;
}
// Visible for testing, only for unit tests
MetricsCollector metricsCollector() {
return kafkaMetricsCollector;
}
// Visible for testing, only for unit tests
ClientTelemetryProvider telemetryProvider() {
return telemetryProvider;
}
class DefaultClientTelemetrySender implements ClientTelemetrySender {
/*
These are the lower and upper bounds of the jitter that we apply to the initial push
telemetry API call. This helps to avoid a flood of requests all coming at the same time.
*/
private final static double INITIAL_PUSH_JITTER_LOWER = 0.5;
private final static double INITIAL_PUSH_JITTER_UPPER = 1.5;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Condition subscriptionLoaded = lock.writeLock().newCondition();
private final Condition terminalPushInProgress = lock.writeLock().newCondition();
/*
Initial state should be subscription needed which should allow issuing first telemetry
request of get telemetry subscription.
*/
private ClientTelemetryState state = ClientTelemetryState.SUBSCRIPTION_NEEDED;
private ClientTelemetrySubscription subscription;
/*
Last time a telemetry request was made. Initialized to 0 to indicate that no request has
been made yet. Telemetry requests, get or post, should always be made after the push interval
time has elapsed.
*/
private long lastRequestMs;
/*
Interval between telemetry requests in milliseconds. Initialized to 0 to indicate that the
interval has not yet been computed. The first get request will be immediately triggered as
soon as the client is ready.
*/
private int intervalMs;
/*
Whether the client telemetry sender is enabled or not. Initialized to true to indicate that
the client telemetry sender is enabled. This is used to disable the client telemetry sender
when the client receives unrecoverable error from broker.
*/
private boolean enabled;
private DefaultClientTelemetrySender() {
enabled = true;
}
@Override
public long timeToNextUpdate(long requestTimeoutMs) {
final long nowMs = time.milliseconds();
final ClientTelemetryState localState;
final long localLastRequestMs;
final int localIntervalMs;
lock.readLock().lock();
try {
if (!enabled) {
return Integer.MAX_VALUE;
}
localState = state;
localLastRequestMs = lastRequestMs;
localIntervalMs = intervalMs;
} finally {
lock.readLock().unlock();
}
final long timeMs;
final String apiName;
final String msg;
switch (localState) {
case SUBSCRIPTION_IN_PROGRESS:
case PUSH_IN_PROGRESS:
/*
We have a network request in progress. We record the time of the request
submission, so wait that amount of the time PLUS the requestTimeout that
is provided.
*/
apiName = (localState == ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS) ? ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS.name : ApiKeys.PUSH_TELEMETRY.name;
timeMs = requestTimeoutMs;
msg = String.format("the remaining wait time for the %s network API request, as specified by %s", apiName, CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG);
break;
case TERMINATING_PUSH_IN_PROGRESS:
timeMs = Long.MAX_VALUE;
msg = String.format("the terminating push is in progress, disabling telemetry for further requests");
break;
case TERMINATING_PUSH_NEEDED:
timeMs = 0;
msg = String.format("the client should try to submit the final %s network API request ASAP before closing", ApiKeys.PUSH_TELEMETRY.name);
break;
case SUBSCRIPTION_NEEDED:
case PUSH_NEEDED:
apiName = (localState == ClientTelemetryState.SUBSCRIPTION_NEEDED) ? ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS.name : ApiKeys.PUSH_TELEMETRY.name;
long timeRemainingBeforeRequest = localLastRequestMs + localIntervalMs - nowMs;
if (timeRemainingBeforeRequest <= 0) {
timeMs = 0;
msg = String.format("the wait time before submitting the next %s network API request has elapsed", apiName);
} else {
timeMs = timeRemainingBeforeRequest;
msg = String.format("the client will wait before submitting the next %s network API request", apiName);
}
break;
default:
throw new IllegalStateException("Unknown telemetry state: " + localState);
}
log.debug("For telemetry state {}, returning the value {} ms; {}", localState, timeMs, msg);
return timeMs;
}
@Override
public Optional<AbstractRequest.Builder<?>> createRequest() {
final ClientTelemetryState localState;
final ClientTelemetrySubscription localSubscription;
lock.readLock().lock();
try {
localState = state;
localSubscription = subscription;
} finally {
lock.readLock().unlock();
}
if (localState == ClientTelemetryState.SUBSCRIPTION_NEEDED) {
return createSubscriptionRequest(localSubscription);
} else if (localState == ClientTelemetryState.PUSH_NEEDED || localState == ClientTelemetryState.TERMINATING_PUSH_NEEDED) {
return createPushRequest(localSubscription);
}
log.warn("Cannot make telemetry request as telemetry is in state: {}", localState);
return Optional.empty();
}
@Override
public void handleResponse(GetTelemetrySubscriptionsResponse response) {
final long nowMs = time.milliseconds();
final GetTelemetrySubscriptionsResponseData data = response.data();
final ClientTelemetryState oldState;
final ClientTelemetrySubscription oldSubscription;
lock.readLock().lock();
try {
oldState = state;
oldSubscription = subscription;
} finally {
lock.readLock().unlock();
}
Optional<Integer> errorIntervalMsOpt = ClientTelemetryUtils.maybeFetchErrorIntervalMs(data.errorCode(),
oldSubscription != null ? oldSubscription.pushIntervalMs() : -1);
/*
If the error code indicates that the interval ms needs to be updated as per the error
code then update the interval ms and state so that the subscription can be retried.
*/
if (errorIntervalMsOpt.isPresent()) {
/*
Update the state from SUBSCRIPTION_INR_PROGRESS to SUBSCRIPTION_NEEDED as the error
response indicates that the subscription is not valid.
*/
if (!maybeSetState(ClientTelemetryState.SUBSCRIPTION_NEEDED)) {
log.warn("Unable to transition state after failed get telemetry subscriptions from state {}", oldState);
}
updateErrorResult(errorIntervalMsOpt.get(), nowMs);
return;
}
Uuid clientInstanceId = ClientTelemetryUtils.validateClientInstanceId(data.clientInstanceId());
int intervalMs = ClientTelemetryUtils.validateIntervalMs(data.pushIntervalMs());
Predicate<? super MetricKeyable> selector = ClientTelemetryUtils.getSelectorFromRequestedMetrics(
data.requestedMetrics());
List<CompressionType> acceptedCompressionTypes = ClientTelemetryUtils.getCompressionTypesFromAcceptedList(
data.acceptedCompressionTypes());
/*
Check if the delta temporality has changed, if so, we need to reset the ledger tracking
the last value sent for each metric.
*/
if (oldSubscription != null && oldSubscription.deltaTemporality() != data.deltaTemporality()) {
log.info("Delta temporality has changed from {} to {}, resetting metric values",
oldSubscription.deltaTemporality(), data.deltaTemporality());
if (kafkaMetricsCollector != null) {
kafkaMetricsCollector.metricsReset();
}
}
ClientTelemetrySubscription clientTelemetrySubscription = new ClientTelemetrySubscription(
clientInstanceId,
data.subscriptionId(),
intervalMs,
acceptedCompressionTypes,
data.deltaTemporality(),
selector);
lock.writeLock().lock();
try {
/*
This is the case if we began termination sometime after the subscription request
was issued. We're just now getting our callback, but we need to ignore it.
*/
if (isTerminatingState()) {
return;
}
ClientTelemetryState newState;
if (selector == ClientTelemetryUtils.SELECTOR_NO_METRICS) {
/*
This is the case where no metrics are requested and/or match the filters. We need
to wait intervalMs then retry.
*/
newState = ClientTelemetryState.SUBSCRIPTION_NEEDED;
} else {
newState = ClientTelemetryState.PUSH_NEEDED;
}
// If we're terminating, don't update state or set the subscription.
if (!maybeSetState(newState)) {
return;
}
updateSubscriptionResult(clientTelemetrySubscription, nowMs);
log.info("Client telemetry registered with client instance id: {}", subscription.clientInstanceId());
} finally {
lock.writeLock().unlock();
}
}
@Override
public void handleResponse(PushTelemetryResponse response) {
final long nowMs = time.milliseconds();
final PushTelemetryResponseData data = response.data();
lock.writeLock().lock();
try {
Optional<Integer> errorIntervalMsOpt = ClientTelemetryUtils.maybeFetchErrorIntervalMs(data.errorCode(),
subscription.pushIntervalMs());
/*
If the error code indicates that the interval ms needs to be updated as per the error
code then update the interval ms and state so that the subscription can be re-fetched,
and the push retried.
*/
if (errorIntervalMsOpt.isPresent()) {
/*
This is the case when client began termination sometime after the last push request
was issued. Just getting the callback, hence need to ignore it.
*/
if (isTerminatingState()) {
return;
}
if (!maybeSetState(ClientTelemetryState.SUBSCRIPTION_NEEDED)) {
log.warn("Unable to transition state after failed push telemetry from state {}", state);
}
updateErrorResult(errorIntervalMsOpt.get(), nowMs);
return;
}
lastRequestMs = nowMs;
intervalMs = subscription.pushIntervalMs();
if (!maybeSetState(ClientTelemetryState.PUSH_NEEDED)) {
log.warn("Unable to transition state after successful push telemetry from state {}", state);
}
} finally {
lock.writeLock().unlock();
}
}
@Override
public void handleFailedGetTelemetrySubscriptionsRequest(KafkaException maybeFatalException) {
log.warn("The broker generated an error for the get telemetry network API request", maybeFatalException);
handleFailedRequest(maybeFatalException != null);
}
@Override
public void handleFailedPushTelemetryRequest(KafkaException maybeFatalException) {
log.warn("The broker generated an error for the push telemetry network API request", maybeFatalException);
handleFailedRequest(maybeFatalException != null);
}
@Override
public Optional<Uuid> clientInstanceId(Duration timeout) {
final long timeoutMs = timeout.toMillis();
if (timeoutMs < 0) {
throw new IllegalArgumentException("The timeout cannot be negative for fetching client instance id.");
}
lock.writeLock().lock();
try {
if (subscription == null) {
// If we have a non-negative timeout and no-subscription, let's wait for one to be retrieved.
log.debug("Waiting for telemetry subscription containing the client instance ID with timeoutMillis = {} ms.", timeoutMs);
try {
if (!subscriptionLoaded.await(timeoutMs, TimeUnit.MILLISECONDS)) {
log.debug("Wait for telemetry subscription elapsed; may not have actually loaded it");
}
} catch (InterruptedException e) {
throw new InterruptException(e);
}
}
if (subscription == null) {
log.debug("Client instance ID could not be retrieved with timeout {}", timeout);
return Optional.empty();
}
Uuid clientInstanceId = subscription.clientInstanceId();
if (clientInstanceId == null) {
log.info("Client instance ID was null in telemetry subscription while in state {}", state);
return Optional.empty();
}
return Optional.of(clientInstanceId);
} finally {
lock.writeLock().unlock();
}
}
@Override
public void close() {
log.debug("close telemetry sender for client telemetry reporter instance");
boolean shouldClose = false;
lock.writeLock().lock();
try {
if (state != ClientTelemetryState.TERMINATED) {
if (maybeSetState(ClientTelemetryState.TERMINATED)) {
shouldClose = true;
}
} else {
log.debug("Ignoring subsequent close");
}
} finally {
lock.writeLock().unlock();
}
if (shouldClose) {
if (kafkaMetricsCollector != null) {
kafkaMetricsCollector.stop();
}
}
}
@Override
public void initiateClose(long timeoutMs) {
log.debug("initiate close for client telemetry, check if terminal push required. Timeout {} ms.", timeoutMs);
lock.writeLock().lock();
try {
// If we never fetched a subscription, we can't really push anything.
if (lastRequestMs == 0) {
log.debug("Telemetry subscription not loaded, not attempting terminating push");
return;
}
if (state == ClientTelemetryState.SUBSCRIPTION_NEEDED) {
log.debug("Subscription not yet loaded, ignoring terminal push");
return;
}
if (isTerminatingState() || !maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED)) {
log.debug("Ignoring subsequent initiateClose");
return;
}
try {
log.info("About to wait {} ms. for terminal telemetry push to be submitted", timeoutMs);
if (!terminalPushInProgress.await(timeoutMs, TimeUnit.MILLISECONDS)) {
log.info("Wait for terminal telemetry push to be submitted has elapsed; may not have actually sent request");
}
} catch (InterruptedException e) {
log.warn("Error during client telemetry close", e);
}
} finally {
lock.writeLock().unlock();
}
}
private Optional<Builder<?>> createSubscriptionRequest(ClientTelemetrySubscription localSubscription) {
/*
If we've previously retrieved a subscription, it will contain the client instance ID
that the broker assigned. Otherwise, per KIP-714, we send a special "null" UUID to
signal to the broker that we need to have a client instance ID assigned.
*/
Uuid clientInstanceId = (localSubscription != null) ? localSubscription.clientInstanceId() : Uuid.ZERO_UUID;
lock.writeLock().lock();
try {
if (isTerminatingState()) {
return Optional.empty();
}
if (!maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)) {
return Optional.empty();
}
} finally {
lock.writeLock().unlock();
}
AbstractRequest.Builder<?> requestBuilder = new GetTelemetrySubscriptionsRequest.Builder(
new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId), true);
return Optional.of(requestBuilder);
}
private Optional<Builder<?>> createPushRequest(ClientTelemetrySubscription localSubscription) {
if (localSubscription == null) {
log.warn("Telemetry state is {} but subscription is null; not sending telemetry", state);
if (!maybeSetState(ClientTelemetryState.SUBSCRIPTION_NEEDED)) {
log.warn("Unable to transition state after failed create push telemetry from state {}", state);
}
return Optional.empty();
}
/*
Don't send a push request if we don't have the collector initialized. Re-attempt
the push on the next interval.
*/
if (kafkaMetricsCollector == null) {
log.warn("Cannot make telemetry request as collector is not initialized");
// Update last accessed time for push request to be retried on next interval.
updateErrorResult(localSubscription.pushIntervalMs, time.milliseconds());
return Optional.empty();
}
boolean terminating;
lock.writeLock().lock();
try {
/*
We've already been terminated, or we've already issued our last push, so we
should just exit now.
*/
if (state == ClientTelemetryState.TERMINATED || state == ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS) {
return Optional.empty();
}
/*
Check the *actual* state (while locked) to make sure we're still in the state
we expect to be in.
*/
terminating = state == ClientTelemetryState.TERMINATING_PUSH_NEEDED;
if (!maybeSetState(terminating ? ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS : ClientTelemetryState.PUSH_IN_PROGRESS)) {
return Optional.empty();
}
} finally {
lock.writeLock().unlock();
}
byte[] payload;
try (MetricsEmitter emitter = new ClientTelemetryEmitter(localSubscription.selector(), localSubscription.deltaTemporality())) {
emitter.init();
kafkaMetricsCollector.collect(emitter);
payload = createPayload(emitter.emittedMetrics());
} catch (Exception e) {
log.warn("Error constructing client telemetry payload: ", e);
// Update last accessed time for push request to be retried on next interval.
updateErrorResult(localSubscription.pushIntervalMs, time.milliseconds());
return Optional.empty();
}
CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
ByteBuffer buffer = ClientTelemetryUtils.compress(payload, compressionType);
AbstractRequest.Builder<?> requestBuilder = new PushTelemetryRequest.Builder(
new PushTelemetryRequestData()
.setClientInstanceId(localSubscription.clientInstanceId())
.setSubscriptionId(localSubscription.subscriptionId())
.setTerminating(terminating)
.setCompressionType(compressionType.id)
.setMetrics(Utils.readBytes(buffer)), true);
return Optional.of(requestBuilder);
}
/**
* Updates the {@link ClientTelemetrySubscription}, {@link #intervalMs}, and
* {@link #lastRequestMs}.
* <p>
* After the update, the {@link #subscriptionLoaded} condition is signaled so any threads
* waiting on the subscription can be unblocked.
*
* @param subscription Updated subscription that will replace any current subscription
* @param timeMs Time in milliseconds representing the current time
*/
// Visible for testing
void updateSubscriptionResult(ClientTelemetrySubscription subscription, long timeMs) {
lock.writeLock().lock();
try {
this.subscription = Objects.requireNonNull(subscription);
/*
If the subscription is updated for the client, we want to attempt to spread out the push
requests between 50% and 150% of the push interval value from the broker. This helps us
to avoid the case where multiple clients are started at the same time and end up sending
all their data at the same time.
*/
if (state == ClientTelemetryState.PUSH_NEEDED) {
intervalMs = computeStaggeredIntervalMs(subscription.pushIntervalMs(), INITIAL_PUSH_JITTER_LOWER, INITIAL_PUSH_JITTER_UPPER);
} else {
intervalMs = subscription.pushIntervalMs();
}
lastRequestMs = timeMs;
log.debug("Updating subscription - subscription: {}; intervalMs: {}, lastRequestMs: {}",
subscription, intervalMs, lastRequestMs);
subscriptionLoaded.signalAll();
} finally {
lock.writeLock().unlock();
}
}
/**
* Updates the {@link #intervalMs}, {@link #lastRequestMs} and {@link #enabled}.
* <p>
* The contents of the method are guarded by the {@link #lock}.
*/
private void updateErrorResult(int intervalMs, long timeMs) {
lock.writeLock().lock();
try {
this.intervalMs = intervalMs;
lastRequestMs = timeMs;
/*
If the interval time is set to Integer.MAX_VALUE, then it means that the telemetry sender
should not send any more telemetry requests. This is used when the client received
unrecoverable error from broker.
*/
if (intervalMs == Integer.MAX_VALUE) {
enabled = false;
}
log.debug("Updating intervalMs: {}, lastRequestMs: {}", intervalMs, lastRequestMs);
} finally {
lock.writeLock().unlock();
}
}
// Visible for testing
int computeStaggeredIntervalMs(int intervalMs, double lowerBound, double upperBound) {
double rand = ThreadLocalRandom.current().nextDouble(lowerBound, upperBound);
int firstPushIntervalMs = (int) Math.round(rand * intervalMs);
log.debug("Telemetry subscription push interval value from broker was {}; to stagger requests the first push"
+ " interval is being adjusted to {}", intervalMs, firstPushIntervalMs);
return firstPushIntervalMs;
}
private boolean isTerminatingState() {
return state == ClientTelemetryState.TERMINATED || state == ClientTelemetryState.TERMINATING_PUSH_NEEDED
|| state == ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS;
}
// Visible for testing
boolean maybeSetState(ClientTelemetryState newState) {
lock.writeLock().lock();
try {
ClientTelemetryState oldState = state;
state = oldState.validateTransition(newState);
log.debug("Setting telemetry state from {} to {}", oldState, newState);
if (newState == ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS) {
terminalPushInProgress.signalAll();
}
return true;
} catch (IllegalStateException e) {
log.warn("Error updating client telemetry state, disabled telemetry", e);
enabled = false;
return false;
} finally {
lock.writeLock().unlock();
}
}
private void handleFailedRequest(boolean shouldWait) {
final long nowMs = time.milliseconds();
lock.writeLock().lock();
try {
if (isTerminatingState()) {
return;
}
if (state != ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS && state != ClientTelemetryState.PUSH_IN_PROGRESS) {
log.warn("Could not transition state after failed telemetry from state {}", state);
return;
}
/*
The broker might not support telemetry. Let's sleep for a while before trying request
again. We may disconnect from the broker and connect to a broker that supports client
telemetry.
*/
if (shouldWait) {
updateErrorResult(DEFAULT_PUSH_INTERVAL_MS, nowMs);
} else {
updateErrorResult(Integer.MAX_VALUE, nowMs);
}
if (!maybeSetState(ClientTelemetryState.SUBSCRIPTION_NEEDED)) {
log.warn("Could not transition state after failed telemetry from state {}", state);
}
} finally {
lock.writeLock().unlock();
}
}
private byte[] createPayload(List<SinglePointMetric> emittedMetrics) {
MetricsData.Builder builder = MetricsData.newBuilder();
emittedMetrics.forEach(metric -> {
Metric m = metric.builder().build();
ResourceMetrics rm = buildMetric(m);
builder.addResourceMetrics(rm);
});
return builder.build().toByteArray();
}
// Visible for testing
ClientTelemetrySubscription subscription() {
lock.readLock().lock();
try {
return subscription;
} finally {
lock.readLock().unlock();
}
}
// Visible for testing
ClientTelemetryState state() {
lock.readLock().lock();
try {
return state;
} finally {
lock.readLock().unlock();
}
}
// Visible for testing
long intervalMs() {
lock.readLock().lock();
try {
return intervalMs;
} finally {
lock.readLock().unlock();
}
}
// Visible for testing
long lastRequestMs() {
lock.readLock().lock();
try {
return lastRequestMs;
} finally {
lock.readLock().unlock();
}
}
// Visible for testing
void enabled(boolean enabled) {
lock.writeLock().lock();
try {
this.enabled = enabled;
} finally {
lock.writeLock().unlock();
}
}
// Visible for testing
boolean enabled() {
lock.readLock().lock();
try {
return enabled;
} finally {
lock.readLock().unlock();
}
}
}
/**
* Representation of the telemetry subscription that is retrieved from the cluster at startup and
* then periodically afterward, following the telemetry push.
*/
static class ClientTelemetrySubscription {
private final Uuid clientInstanceId;
private final int subscriptionId;
private final int pushIntervalMs;
private final List<CompressionType> acceptedCompressionTypes;
private final boolean deltaTemporality;
private final Predicate<? super MetricKeyable> selector;
ClientTelemetrySubscription(Uuid clientInstanceId, int subscriptionId, int pushIntervalMs,
List<CompressionType> acceptedCompressionTypes, boolean deltaTemporality,
Predicate<? super MetricKeyable> selector) {
this.clientInstanceId = clientInstanceId;
this.subscriptionId = subscriptionId;
this.pushIntervalMs = pushIntervalMs;
this.acceptedCompressionTypes = Collections.unmodifiableList(acceptedCompressionTypes);
this.deltaTemporality = deltaTemporality;
this.selector = selector;
}
public Uuid clientInstanceId() {
return clientInstanceId;
}
public int subscriptionId() {
return subscriptionId;
}
public int pushIntervalMs() {
return pushIntervalMs;
}
public List<CompressionType> acceptedCompressionTypes() {
return acceptedCompressionTypes;
}
public boolean deltaTemporality() {
return deltaTemporality;
}
public Predicate<? super MetricKeyable> selector() {
return selector;
}
@Override
public String toString() {
return new StringJoiner(", ", "ClientTelemetrySubscription{", "}")
.add("clientInstanceId=" + clientInstanceId)
.add("subscriptionId=" + subscriptionId)
.add("pushIntervalMs=" + pushIntervalMs)
.add("acceptedCompressionTypes=" + acceptedCompressionTypes)
.add("deltaTemporality=" + deltaTemporality)
.add("selector=" + selector)
.toString();
}
}
}

View File

@ -17,7 +17,10 @@
package org.apache.kafka.common.telemetry.internals;
import java.time.Duration;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.requests.AbstractRequest.Builder;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.PushTelemetryResponse;
@ -76,4 +79,36 @@ public interface ClientTelemetrySender extends AutoCloseable {
* @param kafkaException the fatal exception.
*/
void handleFailedPushTelemetryRequest(KafkaException kafkaException);
/**
* Determines the client's unique client instance ID used for telemetry. This ID is unique to
* the specific enclosing client instance and will not change after it is initially generated.
* The ID is useful for correlating client operations with telemetry sent to the broker and
* to its eventual monitoring destination(s).
* <p>
* This method waits up to <code>timeout</code> for the subscription to become available in
* order to complete the request.
*
* @param timeout The maximum time to wait for enclosing client instance to determine its
* client instance ID. The value must be non-negative. Specifying a timeout
* of zero means do not wait for the initial request to complete if it hasn't
* already.
* @throws InterruptException If the thread is interrupted while blocked.
* @throws KafkaException If an unexpected error occurs while trying to determine the client
* instance ID, though this error does not necessarily imply the
* enclosing client instance is otherwise unusable.
* @throws IllegalArgumentException If the <code>timeout</code> is negative.
*
* @return If present, optional of the client's assigned instance id used for metrics collection.
*/
Optional<Uuid> clientInstanceId(Duration timeout);
/**
* Initiates shutdown of this client. This method is called when the enclosing client instance
* is being closed. This method should not throw an exception if the client is already closed.
*
* @param timeoutMs The maximum time to wait for the client to close.
*/
void initiateClose(long timeoutMs);
}

View File

@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
import io.opentelemetry.proto.metrics.v1.MetricsData;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
public class ClientTelemetryUtils {
private final static Logger log = LoggerFactory.getLogger(ClientTelemetryUtils.class);
public final static Predicate<? super MetricKeyable> SELECTOR_NO_METRICS = k -> false;
public final static Predicate<? super MetricKeyable> SELECTOR_ALL_METRICS = k -> true;
/**
* Examine the response data and handle different error code accordingly:
*
* <ul>
* <li>Invalid Request: Disable Telemetry</li>
* <li>Invalid Record: Disable Telemetry</li>
* <li>Unsupported Version: Disable Telemetry</li>
* <li>UnknownSubscription or Unsupported Compression: Retry immediately</li>
* <li>TelemetryTooLarge or ThrottlingQuotaExceeded: Retry as per next interval</li>
* </ul>
*
* @param errorCode response body error code
* @param intervalMs current push interval in milliseconds
*
* @return Optional of push interval in milliseconds
*/
public static Optional<Integer> maybeFetchErrorIntervalMs(short errorCode, int intervalMs) {
if (errorCode == Errors.NONE.code())
return Optional.empty();
int pushIntervalMs;
String reason;
Errors error = Errors.forCode(errorCode);
switch (error) {
case INVALID_REQUEST:
case INVALID_RECORD:
case UNSUPPORTED_VERSION:
pushIntervalMs = Integer.MAX_VALUE;
reason = "The broker response indicates the client sent an request that cannot be resolved"
+ " by re-trying, hence disable telemetry";
break;
case UNKNOWN_SUBSCRIPTION_ID:
case UNSUPPORTED_COMPRESSION_TYPE:
pushIntervalMs = 0;
reason = error.message();
break;
case TELEMETRY_TOO_LARGE:
case THROTTLING_QUOTA_EXCEEDED:
reason = error.message();
pushIntervalMs = (intervalMs != -1) ? intervalMs : ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS;
break;
default:
reason = "Unwrapped error code";
log.error("Error code: {}. Unmapped error for telemetry, disable telemetry.", errorCode);
pushIntervalMs = Integer.MAX_VALUE;
}
log.debug("Error code: {}, reason: {}. Push interval update to {} ms.", errorCode, reason, pushIntervalMs);
return Optional.of(pushIntervalMs);
}
public static Predicate<? super MetricKeyable> getSelectorFromRequestedMetrics(List<String> requestedMetrics) {
if (requestedMetrics == null || requestedMetrics.isEmpty()) {
log.debug("Telemetry subscription has specified no metric names; telemetry will record no metrics");
return SELECTOR_NO_METRICS;
} else if (requestedMetrics.size() == 1 && requestedMetrics.get(0) != null && requestedMetrics.get(0).equals("*")) {
log.debug("Telemetry subscription has specified a single '*' metric name; using all metrics");
return SELECTOR_ALL_METRICS;
} else {
log.debug("Telemetry subscription has specified to include only metrics that are prefixed with the following strings: {}", requestedMetrics);
return k -> requestedMetrics.stream().anyMatch(f -> k.key().name().startsWith(f));
}
}
public static List<CompressionType> getCompressionTypesFromAcceptedList(List<Byte> acceptedCompressionTypes) {
if (acceptedCompressionTypes == null || acceptedCompressionTypes.isEmpty()) {
return Collections.emptyList();
}
List<CompressionType> result = new ArrayList<>();
for (Byte compressionByte : acceptedCompressionTypes) {
int compressionId = compressionByte.intValue();
try {
CompressionType compressionType = CompressionType.forId(compressionId);
result.add(compressionType);
} catch (IllegalArgumentException e) {
log.warn("Accepted compressionByte type with ID {} is not a known compressionByte type; ignoring", compressionId, e);
}
}
return result;
}
public static Uuid validateClientInstanceId(Uuid clientInstanceId) {
if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
throw new IllegalArgumentException("clientInstanceId is not valid");
}
return clientInstanceId;
}
public static int validateIntervalMs(int intervalMs) {
if (intervalMs <= 0) {
log.warn("Telemetry subscription push interval value from broker was invalid ({}),"
+ " substituting with default value of {}", intervalMs, ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS);
return ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS;
}
log.debug("Telemetry subscription push interval value from broker: {}", intervalMs);
return intervalMs;
}
public static boolean validateResourceLabel(Map<String, ?> m, String key) {
if (!m.containsKey(key)) {
log.trace("{} does not exist in map {}", key, m);
return false;
}
if (m.get(key) == null) {
log.trace("{} is null. map {}", key, m);
return false;
}
if (!(m.get(key) instanceof String)) {
log.trace("{} is not a string. map {}", key, m);
return false;
}
String val = (String) m.get(key);
if (val.isEmpty()) {
log.trace("{} is empty string. value = {} map {}", key, val, m);
return false;
}
return true;
}
public static boolean validateRequiredResourceLabels(Map<String, String> metadata) {
return validateResourceLabel(metadata, MetricsContext.NAMESPACE);
}
public static CompressionType preferredCompressionType(List<CompressionType> acceptedCompressionTypes) {
// TODO: Support compression in client telemetry.
return CompressionType.NONE;
}
public static ByteBuffer compress(byte[] raw, CompressionType compressionType) {
// TODO: Support compression in client telemetry.
if (compressionType == CompressionType.NONE) {
return ByteBuffer.wrap(raw);
} else {
throw new UnsupportedOperationException("Compression is not supported");
}
}
public static MetricsData deserializeMetricsData(ByteBuffer serializedMetricsData) {
try {
return MetricsData.parseFrom(serializedMetricsData);
} catch (IOException e) {
throw new KafkaException("Unable to parse MetricsData payload", e);
}
}
}

View File

@ -29,10 +29,6 @@ import java.io.Closeable;
*
* <p>
*
* An {@code MetricsEmitter} is stateless and the telemetry reporter should assume that the object is
* not thread safe and thus concurrent access to either the
* {@link #shouldEmitMetric(MetricKeyable)} or {@link #emitMetric(SinglePointMetric)} should be avoided.
*
* Regarding threading, the {@link #init()} and {@link #close()} methods may be called from
* different threads and so proper care should be taken by implementations of the
* {@code MetricsCollector} interface to be thread-safe. However, the telemetry reporter must

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.function.Predicate;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ClientTelemetryEmitterTest {
private MetricKey metricKey;
private Instant now;
@BeforeEach
public void setUp() {
metricKey = new MetricKey("name", Collections.emptyMap());
now = Instant.now();
}
@Test
public void testShouldEmitMetric() {
Predicate<? super MetricKeyable> selector = ClientTelemetryUtils.getSelectorFromRequestedMetrics(
Collections.singletonList("io.test.metric"));
ClientTelemetryEmitter emitter = new ClientTelemetryEmitter(selector, true);
assertTrue(emitter.shouldEmitMetric(new MetricKey("io.test.metric")));
assertTrue(emitter.shouldEmitMetric(new MetricKey("io.test.metric1")));
assertTrue(emitter.shouldEmitMetric(new MetricKey("io.test.metric.producer.bytes")));
assertFalse(emitter.shouldEmitMetric(new MetricKey("io.test")));
assertFalse(emitter.shouldEmitMetric(new MetricKey("org.io.test.metric")));
assertTrue(emitter.shouldEmitDeltaMetrics());
}
@Test
public void testShouldEmitMetricSelectorAll() {
ClientTelemetryEmitter emitter = new ClientTelemetryEmitter(ClientTelemetryUtils.SELECTOR_ALL_METRICS, true);
assertTrue(emitter.shouldEmitMetric(new MetricKey("io.test.metric")));
assertTrue(emitter.shouldEmitMetric(new MetricKey("io.test.metric1")));
assertTrue(emitter.shouldEmitMetric(new MetricKey("io.test.metric.producer.bytes")));
assertTrue(emitter.shouldEmitMetric(new MetricKey("io.test")));
assertTrue(emitter.shouldEmitMetric(new MetricKey("org.io.test.metric")));
assertTrue(emitter.shouldEmitDeltaMetrics());
}
@Test
public void testShouldEmitMetricSelectorNone() {
ClientTelemetryEmitter emitter = new ClientTelemetryEmitter(ClientTelemetryUtils.SELECTOR_NO_METRICS, true);
assertFalse(emitter.shouldEmitMetric(new MetricKey("io.test.metric")));
assertFalse(emitter.shouldEmitMetric(new MetricKey("io.test.metric1")));
assertFalse(emitter.shouldEmitMetric(new MetricKey("io.test.metric.producer.bytes")));
assertFalse(emitter.shouldEmitMetric(new MetricKey("io.test")));
assertFalse(emitter.shouldEmitMetric(new MetricKey("org.io.test.metric")));
assertTrue(emitter.shouldEmitDeltaMetrics());
}
@Test
public void testShouldEmitDeltaMetricsFalse() {
ClientTelemetryEmitter emitter = new ClientTelemetryEmitter(ClientTelemetryUtils.SELECTOR_ALL_METRICS, false);
assertFalse(emitter.shouldEmitDeltaMetrics());
}
@Test
public void testEmitMetric() {
Predicate<? super MetricKeyable> selector = ClientTelemetryUtils.getSelectorFromRequestedMetrics(
Collections.singletonList("name"));
ClientTelemetryEmitter emitter = new ClientTelemetryEmitter(selector, true);
SinglePointMetric gauge = SinglePointMetric.gauge(metricKey, Long.valueOf(1), now);
SinglePointMetric sum = SinglePointMetric.sum(metricKey, 1.0, true, now);
assertTrue(emitter.emitMetric(gauge));
assertTrue(emitter.emitMetric(sum));
MetricKey anotherKey = new MetricKey("io.name", Collections.emptyMap());
assertFalse(emitter.emitMetric(SinglePointMetric.gauge(anotherKey, Long.valueOf(1), now)));
assertEquals(2, emitter.emittedMetrics().size());
assertEquals(Arrays.asList(gauge, sum), emitter.emittedMetrics());
}
}

View File

@ -0,0 +1,619 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
import io.opentelemetry.proto.common.v1.KeyValue;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.PushTelemetryRequestData;
import org.apache.kafka.common.message.PushTelemetryResponseData;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.PushTelemetryRequest;
import org.apache.kafka.common.requests.PushTelemetryResponse;
import org.apache.kafka.common.telemetry.ClientTelemetryState;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ClientTelemetryReporterTest {
private MockTime time;
private ClientTelemetryReporter clientTelemetryReporter;
private Map<String, Object> configs;
private MetricsContext metricsContext;
private Uuid uuid;
private ClientTelemetryReporter.ClientTelemetrySubscription subscription;
@BeforeEach
public void setUp() {
time = new MockTime();
clientTelemetryReporter = new ClientTelemetryReporter(time);
configs = new HashMap<>();
metricsContext = new KafkaMetricsContext("test");
uuid = Uuid.randomUuid();
subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(uuid, 1234, 20000,
Collections.emptyList(), true, null);
}
@Test
public void testInitTelemetryReporter() {
configs.put(CommonClientConfigs.CLIENT_ID_CONFIG, "test-client");
configs.put(CommonClientConfigs.CLIENT_RACK_CONFIG, "rack");
clientTelemetryReporter.configure(configs);
clientTelemetryReporter.contextChange(metricsContext);
assertNotNull(clientTelemetryReporter.metricsCollector());
assertNotNull(clientTelemetryReporter.telemetryProvider().resource());
assertEquals(1, clientTelemetryReporter.telemetryProvider().resource().getAttributesCount());
assertEquals(
ClientTelemetryProvider.CLIENT_RACK, clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getKey());
assertEquals("rack", clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getValue().getStringValue());
}
@Test
public void testInitTelemetryReporterNoCollector() {
// Remove namespace config which skips the collector initialization.
MetricsContext metricsContext = Collections::emptyMap;
clientTelemetryReporter.configure(configs);
clientTelemetryReporter.contextChange(metricsContext);
assertNull(clientTelemetryReporter.metricsCollector());
}
@Test
public void testProducerLabels() {
configs.put(CommonClientConfigs.CLIENT_ID_CONFIG, "test-client");
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
configs.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "group-instance-id");
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id");
configs.put(CommonClientConfigs.CLIENT_RACK_CONFIG, "rack");
clientTelemetryReporter.configure(configs);
clientTelemetryReporter.contextChange(new KafkaMetricsContext("kafka.producer"));
assertNotNull(clientTelemetryReporter.metricsCollector());
assertNotNull(clientTelemetryReporter.telemetryProvider().resource());
List<KeyValue> attributes = clientTelemetryReporter.telemetryProvider().resource().getAttributesList();
assertEquals(2, attributes.size());
attributes.forEach(attribute -> {
if (attribute.getKey().equals(ClientTelemetryProvider.CLIENT_RACK)) {
assertEquals("rack", attribute.getValue().getStringValue());
} else if (attribute.getKey().equals(ClientTelemetryProvider.TRANSACTIONAL_ID)) {
assertEquals("transaction-id", attribute.getValue().getStringValue());
}
});
}
@Test
public void testConsumerLabels() {
configs.put(CommonClientConfigs.CLIENT_ID_CONFIG, "test-client");
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
configs.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "group-instance-id");
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id");
configs.put(CommonClientConfigs.CLIENT_RACK_CONFIG, "rack");
clientTelemetryReporter.configure(configs);
clientTelemetryReporter.contextChange(new KafkaMetricsContext("kafka.consumer"));
assertNotNull(clientTelemetryReporter.metricsCollector());
assertNotNull(clientTelemetryReporter.telemetryProvider().resource());
List<KeyValue> attributes = clientTelemetryReporter.telemetryProvider().resource().getAttributesList();
assertEquals(3, attributes.size());
attributes.forEach(attribute -> {
if (attribute.getKey().equals(ClientTelemetryProvider.CLIENT_RACK)) {
assertEquals("rack", attribute.getValue().getStringValue());
} else if (attribute.getKey().equals(ClientTelemetryProvider.GROUP_ID)) {
assertEquals("group-id", attribute.getValue().getStringValue());
} else if (attribute.getKey().equals(ClientTelemetryProvider.GROUP_INSTANCE_ID)) {
assertEquals("group-instance-id", attribute.getValue().getStringValue());
}
});
}
@Test
public void testTelemetryReporterClose() {
clientTelemetryReporter.close();
assertEquals(ClientTelemetryState.TERMINATED, ((ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter
.telemetrySender()).state());
}
@Test
public void testTelemetryReporterCloseMultipleTimesNoException() {
clientTelemetryReporter.close();
clientTelemetryReporter.close();
assertEquals(ClientTelemetryState.TERMINATED, ((ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter
.telemetrySender()).state());
}
@Test
public void testTelemetrySenderTimeToNextUpdate() {
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
assertEquals(0, telemetrySender.timeToNextUpdate(100));
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
assertEquals(20000, telemetrySender.timeToNextUpdate(100), 200);
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
assertEquals(100, telemetrySender.timeToNextUpdate(100));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
long time = telemetrySender.timeToNextUpdate(100);
assertTrue(time > 0 && time >= 0.5 * time && time <= 1.5 * time);
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
assertEquals(100, telemetrySender.timeToNextUpdate(100));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));
assertEquals(0, telemetrySender.timeToNextUpdate(100));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS));
assertEquals(Long.MAX_VALUE, telemetrySender.timeToNextUpdate(100));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATED));
assertThrows(IllegalStateException.class, () -> telemetrySender.timeToNextUpdate(100));
}
@Test
public void testCreateRequestSubscriptionNeeded() {
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
Optional<AbstractRequest.Builder<?>> requestOptional = telemetrySender.createRequest();
assertNotNull(requestOptional);
assertTrue(requestOptional.isPresent());
assertTrue(requestOptional.get().build() instanceof GetTelemetrySubscriptionsRequest);
GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest) requestOptional.get().build();
GetTelemetrySubscriptionsRequest expectedResult = new GetTelemetrySubscriptionsRequest.Builder(
new GetTelemetrySubscriptionsRequestData().setClientInstanceId(Uuid.ZERO_UUID), true).build();
assertEquals(expectedResult.data(), request.data());
assertEquals(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS, telemetrySender.state());
}
@Test
public void testCreateRequestSubscriptionNeededAfterExistingSubscription() {
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
Optional<AbstractRequest.Builder<?>> requestOptional = telemetrySender.createRequest();
assertNotNull(requestOptional);
assertTrue(requestOptional.isPresent());
assertTrue(requestOptional.get().build() instanceof GetTelemetrySubscriptionsRequest);
GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest) requestOptional.get().build();
GetTelemetrySubscriptionsRequest expectedResult = new GetTelemetrySubscriptionsRequest.Builder(
new GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscription.clientInstanceId()), true).build();
assertEquals(expectedResult.data(), request.data());
assertEquals(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS, telemetrySender.state());
}
@Test
public void testCreateRequestPushNeeded() {
clientTelemetryReporter.configure(configs);
clientTelemetryReporter.contextChange(metricsContext);
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
// create request to move state to SUBSCRIPTION_IN_PROGRESS
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
telemetrySender.createRequest();
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
Optional<AbstractRequest.Builder<?>> requestOptional = telemetrySender.createRequest();
assertNotNull(requestOptional);
assertTrue(requestOptional.isPresent());
assertTrue(requestOptional.get().build() instanceof PushTelemetryRequest);
PushTelemetryRequest request = (PushTelemetryRequest) requestOptional.get().build();
PushTelemetryRequest expectedResult = new PushTelemetryRequest.Builder(
new PushTelemetryRequestData().setClientInstanceId(subscription.clientInstanceId())
.setSubscriptionId(subscription.subscriptionId()), true).build();
assertEquals(expectedResult.data(), request.data());
assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());
}
@Test
public void testCreateRequestPushNeededWithoutSubscription() {
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
// create request to move state to SUBSCRIPTION_IN_PROGRESS
telemetrySender.createRequest();
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
Optional<AbstractRequest.Builder<?>> requestOptional = telemetrySender.createRequest();
assertNotNull(requestOptional);
assertFalse(requestOptional.isPresent());
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
}
@Test
public void testCreateRequestInvalidState() {
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
assertFalse(telemetrySender.createRequest().isPresent());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
assertFalse(telemetrySender.createRequest().isPresent());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));
assertFalse(telemetrySender.createRequest().isPresent());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS));
assertFalse(telemetrySender.createRequest().isPresent());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATED));
assertFalse(telemetrySender.createRequest().isPresent());
}
@Test
public void testCreateRequestPushNoCollector() {
final long now = time.milliseconds();
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
// create request to move state to SUBSCRIPTION_IN_PROGRESS
telemetrySender.createRequest();
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
telemetrySender.updateSubscriptionResult(subscription, now);
long interval = telemetrySender.timeToNextUpdate(100);
assertTrue(interval > 0 && interval != 2000 && interval >= 0.5 * interval && interval <= 1.5 * interval);
time.sleep(1000);
Optional<AbstractRequest.Builder<?>> requestOptional = telemetrySender.createRequest();
assertFalse(requestOptional.isPresent());
assertEquals(20000, telemetrySender.timeToNextUpdate(100));
assertEquals(now + 1000, telemetrySender.lastRequestMs());
}
@Test
public void testHandleResponseGetSubscriptions() {
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
Uuid clientInstanceId = Uuid.randomUuid();
GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(
new GetTelemetrySubscriptionsResponseData()
.setClientInstanceId(clientInstanceId)
.setSubscriptionId(5678)
.setAcceptedCompressionTypes(Collections.singletonList(CompressionType.GZIP.id))
.setPushIntervalMs(20000)
.setRequestedMetrics(Collections.singletonList("*")));
telemetrySender.handleResponse(response);
assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state());
ClientTelemetryReporter.ClientTelemetrySubscription subscription = telemetrySender.subscription();
assertNotNull(subscription);
assertEquals(clientInstanceId, subscription.clientInstanceId());
assertEquals(5678, subscription.subscriptionId());
assertEquals(Collections.singletonList(CompressionType.GZIP), subscription.acceptedCompressionTypes());
assertEquals(20000, subscription.pushIntervalMs());
assertEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, subscription.selector());
}
@Test
public void testHandleResponseGetSubscriptionsWithoutMetrics() {
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
Uuid clientInstanceId = Uuid.randomUuid();
GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(
new GetTelemetrySubscriptionsResponseData()
.setClientInstanceId(clientInstanceId)
.setSubscriptionId(5678)
.setAcceptedCompressionTypes(Collections.singletonList(CompressionType.GZIP.id))
.setPushIntervalMs(20000));
telemetrySender.handleResponse(response);
// Again subscription should be required.
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
ClientTelemetryReporter.ClientTelemetrySubscription subscription = telemetrySender.subscription();
assertNotNull(subscription);
assertEquals(clientInstanceId, subscription.clientInstanceId());
assertEquals(5678, subscription.subscriptionId());
assertEquals(Collections.singletonList(CompressionType.GZIP), subscription.acceptedCompressionTypes());
assertEquals(20000, subscription.pushIntervalMs());
assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, subscription.selector());
}
@Test
public void testHandleResponseGetTelemetryErrorResponse() {
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
// throttling quota exceeded
GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(
new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.THROTTLING_QUOTA_EXCEEDED.code()));
telemetrySender.handleResponse(response);
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
assertEquals(300000, telemetrySender.intervalMs());
assertTrue(telemetrySender.enabled());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
// invalid request error
response = new GetTelemetrySubscriptionsResponse(
new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.INVALID_REQUEST.code()));
telemetrySender.handleResponse(response);
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
assertFalse(telemetrySender.enabled());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
// unsupported version error
telemetrySender.enabled(true);
response = new GetTelemetrySubscriptionsResponse(
new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code()));
telemetrySender.handleResponse(response);
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
assertFalse(telemetrySender.enabled());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
// unknown error
telemetrySender.enabled(true);
response = new GetTelemetrySubscriptionsResponse(
new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()));
telemetrySender.handleResponse(response);
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
assertFalse(telemetrySender.enabled());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
}
@Test
public void testHandleResponseSubscriptionChange() {
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
KafkaMetricsCollector kafkaMetricsCollector = Mockito.mock(KafkaMetricsCollector.class);
clientTelemetryReporter.metricsCollector(kafkaMetricsCollector);
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
Uuid clientInstanceId = Uuid.randomUuid();
GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(
new GetTelemetrySubscriptionsResponseData()
.setClientInstanceId(clientInstanceId)
.setSubscriptionId(15678)
.setAcceptedCompressionTypes(Collections.singletonList(CompressionType.ZSTD.id))
.setPushIntervalMs(10000)
.setDeltaTemporality(false) // Change delta temporality as well
.setRequestedMetrics(Collections.singletonList("org.apache.kafka.producer")));
telemetrySender.handleResponse(response);
assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state());
ClientTelemetryReporter.ClientTelemetrySubscription responseSubscription = telemetrySender.subscription();
assertNotNull(responseSubscription);
assertEquals(clientInstanceId, responseSubscription.clientInstanceId());
assertEquals(15678, responseSubscription.subscriptionId());
assertEquals(Collections.singletonList(CompressionType.ZSTD), responseSubscription.acceptedCompressionTypes());
assertEquals(10000, responseSubscription.pushIntervalMs());
assertFalse(responseSubscription.deltaTemporality());
assertTrue(responseSubscription.selector().test(new MetricKey("org.apache.kafka.producer")));
assertTrue(responseSubscription.selector().test(new MetricKey("org.apache.kafka.producerabc")));
assertTrue(responseSubscription.selector().test(new MetricKey("org.apache.kafka.producer.abc")));
assertFalse(responseSubscription.selector().test(new MetricKey("org.apache.kafka.produce")));
Mockito.verify(kafkaMetricsCollector, Mockito.times(1)).metricsReset();
}
@Test
public void testHandleResponsePushTelemetry() {
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
PushTelemetryResponse response = new PushTelemetryResponse(new PushTelemetryResponseData());
telemetrySender.handleResponse(response);
assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state());
assertEquals(subscription.pushIntervalMs(), telemetrySender.intervalMs());
assertTrue(telemetrySender.enabled());
}
@Test
public void testHandleResponsePushTelemetryErrorResponse() {
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
// unknown subscription id
PushTelemetryResponse response = new PushTelemetryResponse(
new PushTelemetryResponseData().setErrorCode(Errors.UNKNOWN_SUBSCRIPTION_ID.code()));
telemetrySender.handleResponse(response);
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
assertEquals(0, telemetrySender.intervalMs());
assertTrue(telemetrySender.enabled());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
// unsupported compression type
response = new PushTelemetryResponse(
new PushTelemetryResponseData().setErrorCode(Errors.UNSUPPORTED_COMPRESSION_TYPE.code()));
telemetrySender.handleResponse(response);
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
assertEquals(0, telemetrySender.intervalMs());
assertTrue(telemetrySender.enabled());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
// telemetry too large
response = new PushTelemetryResponse(
new PushTelemetryResponseData().setErrorCode(Errors.TELEMETRY_TOO_LARGE.code()));
telemetrySender.handleResponse(response);
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
assertEquals(20000, telemetrySender.intervalMs());
assertTrue(telemetrySender.enabled());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
// throttling quota exceeded
response = new PushTelemetryResponse(
new PushTelemetryResponseData().setErrorCode(Errors.THROTTLING_QUOTA_EXCEEDED.code()));
telemetrySender.handleResponse(response);
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
assertEquals(20000, telemetrySender.intervalMs());
assertTrue(telemetrySender.enabled());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
// invalid request error
response = new PushTelemetryResponse(
new PushTelemetryResponseData().setErrorCode(Errors.INVALID_REQUEST.code()));
telemetrySender.handleResponse(response);
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
assertFalse(telemetrySender.enabled());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
// unsupported version error
telemetrySender.enabled(true);
response = new PushTelemetryResponse(
new PushTelemetryResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code()));
telemetrySender.handleResponse(response);
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
assertFalse(telemetrySender.enabled());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
// invalid record
telemetrySender.enabled(true);
response = new PushTelemetryResponse(
new PushTelemetryResponseData().setErrorCode(Errors.INVALID_RECORD.code()));
telemetrySender.handleResponse(response);
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
assertFalse(telemetrySender.enabled());
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
// unknown error
telemetrySender.enabled(true);
response = new PushTelemetryResponse(
new PushTelemetryResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()));
telemetrySender.handleResponse(response);
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
assertFalse(telemetrySender.enabled());
}
@Test
public void testClientInstanceId() throws InterruptedException {
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
CountDownLatch lock = new CountDownLatch(2);
AtomicReference<Optional<Uuid>> clientInstanceId = new AtomicReference<>();
new Thread(() -> {
try {
clientInstanceId.set(telemetrySender.clientInstanceId(Duration.ofMillis(10000)));
} finally {
lock.countDown();
}
}).start();
new Thread(() -> {
try {
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
} finally {
lock.countDown();
}
}).start();
assertTrue(lock.await(2000, TimeUnit.MILLISECONDS));
assertNotNull(clientInstanceId.get());
assertTrue(clientInstanceId.get().isPresent());
assertEquals(uuid, clientInstanceId.get().get());
}
@Test
public void testComputeStaggeredIntervalMs() {
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
assertEquals(0, telemetrySender.computeStaggeredIntervalMs(0, 0.5, 1.5));
assertEquals(1, telemetrySender.computeStaggeredIntervalMs(1, 0.99, 1));
long timeMs = telemetrySender.computeStaggeredIntervalMs(1000, 0.5, 1.5);
assertTrue(timeMs >= 500 && timeMs <= 1500);
}
@AfterEach
public void tearDown() {
clientTelemetryReporter.close();
}
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ClientTelemetryUtilsTest {
@Test
public void testMaybeFetchErrorIntervalMs() {
assertEquals(Optional.empty(), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.NONE.code(), -1));
assertEquals(Optional.of(Integer.MAX_VALUE), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.INVALID_REQUEST.code(), -1));
assertEquals(Optional.of(Integer.MAX_VALUE), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.INVALID_RECORD.code(), -1));
assertEquals(Optional.of(0), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.UNKNOWN_SUBSCRIPTION_ID.code(), -1));
assertEquals(Optional.of(0), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.UNSUPPORTED_COMPRESSION_TYPE.code(), -1));
assertEquals(Optional.of(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.TELEMETRY_TOO_LARGE.code(), -1));
assertEquals(Optional.of(20000), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.TELEMETRY_TOO_LARGE.code(), 20000));
assertEquals(Optional.of(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.THROTTLING_QUOTA_EXCEEDED.code(), -1));
assertEquals(Optional.of(20000), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.THROTTLING_QUOTA_EXCEEDED.code(), 20000));
assertEquals(Optional.of(Integer.MAX_VALUE), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.UNKNOWN_SERVER_ERROR.code(), -1));
}
@Test
public void testGetSelectorFromRequestedMetrics() {
// no metrics selector
assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(Collections.emptyList()));
assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(null));
// all metrics selector
assertEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(Collections.singletonList("*")));
// specific metrics selector
Predicate<? super MetricKeyable> selector = ClientTelemetryUtils.getSelectorFromRequestedMetrics(Arrays.asList("metric1", "metric2"));
assertNotEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, selector);
assertNotEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, selector);
assertTrue(selector.test(new MetricKey("metric1.test")));
assertTrue(selector.test(new MetricKey("metric2.test")));
assertFalse(selector.test(new MetricKey("test.metric1")));
assertFalse(selector.test(new MetricKey("test.metric2")));
}
@Test
public void testGetCompressionTypesFromAcceptedList() {
assertEquals(0, ClientTelemetryUtils.getCompressionTypesFromAcceptedList(null).size());
assertEquals(0, ClientTelemetryUtils.getCompressionTypesFromAcceptedList(Collections.emptyList()).size());
List<Byte> compressionTypes = new ArrayList<>();
compressionTypes.add(CompressionType.GZIP.id);
compressionTypes.add(CompressionType.LZ4.id);
compressionTypes.add(CompressionType.SNAPPY.id);
compressionTypes.add(CompressionType.ZSTD.id);
compressionTypes.add(CompressionType.NONE.id);
compressionTypes.add((byte) -1);
// should take the first compression type
assertEquals(5, ClientTelemetryUtils.getCompressionTypesFromAcceptedList(compressionTypes).size());
}
@Test
public void testValidateClientInstanceId() {
assertThrows(IllegalArgumentException.class, () -> ClientTelemetryUtils.validateClientInstanceId(null));
assertThrows(IllegalArgumentException.class, () -> ClientTelemetryUtils.validateClientInstanceId(Uuid.ZERO_UUID));
Uuid uuid = Uuid.randomUuid();
assertEquals(uuid, ClientTelemetryUtils.validateClientInstanceId(uuid));
}
@ParameterizedTest
@ValueSource(ints = {300_000, Integer.MAX_VALUE - 1, Integer.MAX_VALUE})
public void testValidateIntervalMsValid(int pushIntervalMs) {
assertEquals(pushIntervalMs, ClientTelemetryUtils.validateIntervalMs(pushIntervalMs));
}
@ParameterizedTest
@ValueSource(ints = {-1, 0})
public void testValidateIntervalMsInvalid(int pushIntervalMs) {
assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, ClientTelemetryUtils.validateIntervalMs(pushIntervalMs));
}
@Test
public void testPreferredCompressionType() {
assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Collections.emptyList()));
assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(null));
}
}