labels) {
+ final Resource.Builder resourceBuilder = resource.toBuilder();
+ labels.forEach((key, value) -> {
+ addAttribute(resourceBuilder, key, value);
+ });
+ resource = resourceBuilder.build();
+ }
+
+ /**
+ * The metrics resource for this provider which will be used to generate the metrics.
+ *
+ * @return A fully formed {@link Resource} with all the tags.
+ */
+ Resource resource() {
+ return resource;
+ }
+
+ /**
+ * Domain of the active provider i.e. specifies prefix to the metrics.
+ *
+ * @return Domain in string format.
+ */
+ String domain() {
+ return DOMAIN;
+ }
+
+ private void addAttribute(Resource.Builder resourceBuilder, String key, String value) {
+ final KeyValue.Builder kv = KeyValue.newBuilder()
+ .setKey(key)
+ .setValue(AnyValue.newBuilder().setStringValue(value));
+ resourceBuilder.addAttributes(kv);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
new file mode 100644
index 00000000000..08ae1d31ed4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -0,0 +1,982 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ *
+ *
+ * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the broker and send the
+ * telemetry data to the broker based on the subscription information i.e. push interval, temporality,
+ * compression type, etc.
+ *
+ *
+ * The full life-cycle of the metric collection process is defined by a state machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set of operations.
+ * For example, the client telemetry reporter will attempt to fetch the telemetry subscription
+ * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
+ * If the push operation fails, the client telemetry reporter will attempt to re-fetch the
+ * subscription information by setting the state back to {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}.
+ *
+ *
+ * In an unlikely scenario, if a bad state transition is detected, an
+ * {@link IllegalStateException} will be thrown.
+ *
+ *
+ * The state transition follows the following steps in order:
+ *
+ * - {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}
+ * - {@link ClientTelemetryState#SUBSCRIPTION_IN_PROGRESS}
+ * - {@link ClientTelemetryState#PUSH_NEEDED}
+ * - {@link ClientTelemetryState#PUSH_IN_PROGRESS}
+ * - {@link ClientTelemetryState#TERMINATING_PUSH_NEEDED}
+ * - {@link ClientTelemetryState#TERMINATING_PUSH_IN_PROGRESS}
+ * - {@link ClientTelemetryState#TERMINATED}
+ *
+ *
+ *
+ * For more detail in state transition, see {@link ClientTelemetryState#validateTransition}.
+ */
+public class ClientTelemetryReporter implements MetricsReporter {
+
+ private static final Logger log = LoggerFactory.getLogger(ClientTelemetryReporter.class);
+ public static final int DEFAULT_PUSH_INTERVAL_MS = 5 * 60 * 1000;
+
+ private final ClientTelemetryProvider telemetryProvider;
+ private final ClientTelemetrySender clientTelemetrySender;
+ private final Time time;
+
+ private Map rawOriginalConfig;
+ private KafkaMetricsCollector kafkaMetricsCollector;
+
+ public ClientTelemetryReporter(Time time) {
+ this.time = time;
+ telemetryProvider = new ClientTelemetryProvider();
+ clientTelemetrySender = new DefaultClientTelemetrySender();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public synchronized void configure(Map configs) {
+ rawOriginalConfig = (Map) Objects.requireNonNull(configs);
+ }
+
+ @Override
+ public synchronized void contextChange(MetricsContext metricsContext) {
+ /*
+ If validation succeeds: initialize the provider, start the metric collection task,
+ set metrics labels for services/libraries that expose metrics.
+ */
+ Objects.requireNonNull(rawOriginalConfig, "configure() was not called before contextChange()");
+ if (kafkaMetricsCollector != null) {
+ kafkaMetricsCollector.stop();
+ }
+
+ if (!telemetryProvider.validate(metricsContext)) {
+ log.warn("Validation failed for {} context {}, skip starting collectors. Metrics collection is disabled",
+ telemetryProvider.getClass(), metricsContext.contextLabels());
+ return;
+ }
+
+ if (kafkaMetricsCollector == null) {
+ /*
+ Initialize the provider only once. contextChange(..) can be called more than once,
+ but once it's been initialized and all necessary labels are present then we don't
+ re-initialize.
+ */
+ telemetryProvider.configure(rawOriginalConfig);
+ }
+
+ telemetryProvider.contextChange(metricsContext);
+
+ if (kafkaMetricsCollector == null) {
+ initCollectors();
+ }
+ }
+
+ @Override
+ public void init(List metrics) {
+ /*
+ metrics collector may not have been initialized (e.g. invalid context labels)
+ in which case metrics collection is disabled
+ */
+ if (kafkaMetricsCollector != null) {
+ kafkaMetricsCollector.init(metrics);
+ }
+ }
+
+ /**
+ * Method is invoked whenever a metric is added/registered
+ */
+ @Override
+ public void metricChange(KafkaMetric metric) {
+ /*
+ metrics collector may not have been initialized (e.g. invalid context labels)
+ in which case metrics collection is disabled
+ */
+ if (kafkaMetricsCollector != null) {
+ kafkaMetricsCollector.metricChange(metric);
+ }
+ }
+
+ /**
+ * Method is invoked whenever a metric is removed
+ */
+ @Override
+ public void metricRemoval(KafkaMetric metric) {
+ /*
+ metrics collector may not have been initialized (e.g. invalid context labels)
+ in which case metrics collection is disabled
+ */
+ if (kafkaMetricsCollector != null) {
+ kafkaMetricsCollector.metricRemoval(metric);
+ }
+ }
+
+ @Override
+ public void close() {
+ log.debug("Stopping ClientTelemetryReporter");
+ try {
+ clientTelemetrySender.close();
+ } catch (Exception exception) {
+ log.error("Failed to close client telemetry reporter", exception);
+ }
+ }
+
+ public synchronized void updateMetricsLabels(Map labels) {
+ telemetryProvider.updateLabels(labels);
+ }
+
+ public void initiateClose(long timeoutMs) {
+ log.debug("Initiate close of ClientTelemetryReporter");
+ try {
+ clientTelemetrySender.initiateClose(timeoutMs);
+ } catch (Exception exception) {
+ log.error("Failed to initiate close of client telemetry reporter", exception);
+ }
+ }
+
+ public ClientTelemetrySender telemetrySender() {
+ return clientTelemetrySender;
+ }
+
+ private void initCollectors() {
+ kafkaMetricsCollector = new KafkaMetricsCollector(
+ TelemetryMetricNamingConvention.getClientTelemetryMetricNamingStrategy(
+ telemetryProvider.domain()));
+ }
+
+ private ResourceMetrics buildMetric(Metric metric) {
+ return ResourceMetrics.newBuilder()
+ .setResource(telemetryProvider.resource())
+ .addScopeMetrics(ScopeMetrics.newBuilder()
+ .addMetrics(metric)
+ .build()).build();
+ }
+
+ // Visible for testing, only for unit tests
+ void metricsCollector(KafkaMetricsCollector metricsCollector) {
+ kafkaMetricsCollector = metricsCollector;
+ }
+
+ // Visible for testing, only for unit tests
+ MetricsCollector metricsCollector() {
+ return kafkaMetricsCollector;
+ }
+
+ // Visible for testing, only for unit tests
+ ClientTelemetryProvider telemetryProvider() {
+ return telemetryProvider;
+ }
+
+ class DefaultClientTelemetrySender implements ClientTelemetrySender {
+
+ /*
+ These are the lower and upper bounds of the jitter that we apply to the initial push
+ telemetry API call. This helps to avoid a flood of requests all coming at the same time.
+ */
+ private final static double INITIAL_PUSH_JITTER_LOWER = 0.5;
+ private final static double INITIAL_PUSH_JITTER_UPPER = 1.5;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Condition subscriptionLoaded = lock.writeLock().newCondition();
+ private final Condition terminalPushInProgress = lock.writeLock().newCondition();
+ /*
+ Initial state should be subscription needed which should allow issuing first telemetry
+ request of get telemetry subscription.
+ */
+ private ClientTelemetryState state = ClientTelemetryState.SUBSCRIPTION_NEEDED;
+
+ private ClientTelemetrySubscription subscription;
+
+ /*
+ Last time a telemetry request was made. Initialized to 0 to indicate that no request has
+ been made yet. Telemetry requests, get or post, should always be made after the push interval
+ time has elapsed.
+ */
+ private long lastRequestMs;
+ /*
+ Interval between telemetry requests in milliseconds. Initialized to 0 to indicate that the
+ interval has not yet been computed. The first get request will be immediately triggered as
+ soon as the client is ready.
+ */
+ private int intervalMs;
+ /*
+ Whether the client telemetry sender is enabled or not. Initialized to true to indicate that
+ the client telemetry sender is enabled. This is used to disable the client telemetry sender
+ when the client receives unrecoverable error from broker.
+ */
+ private boolean enabled;
+
+ private DefaultClientTelemetrySender() {
+ enabled = true;
+ }
+
+ @Override
+ public long timeToNextUpdate(long requestTimeoutMs) {
+ final long nowMs = time.milliseconds();
+ final ClientTelemetryState localState;
+ final long localLastRequestMs;
+ final int localIntervalMs;
+
+ lock.readLock().lock();
+ try {
+ if (!enabled) {
+ return Integer.MAX_VALUE;
+ }
+
+ localState = state;
+ localLastRequestMs = lastRequestMs;
+ localIntervalMs = intervalMs;
+ } finally {
+ lock.readLock().unlock();
+ }
+
+ final long timeMs;
+ final String apiName;
+ final String msg;
+
+ switch (localState) {
+ case SUBSCRIPTION_IN_PROGRESS:
+ case PUSH_IN_PROGRESS:
+ /*
+ We have a network request in progress. We record the time of the request
+ submission, so wait that amount of the time PLUS the requestTimeout that
+ is provided.
+ */
+ apiName = (localState == ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS) ? ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS.name : ApiKeys.PUSH_TELEMETRY.name;
+ timeMs = requestTimeoutMs;
+ msg = String.format("the remaining wait time for the %s network API request, as specified by %s", apiName, CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG);
+ break;
+ case TERMINATING_PUSH_IN_PROGRESS:
+ timeMs = Long.MAX_VALUE;
+ msg = String.format("the terminating push is in progress, disabling telemetry for further requests");
+ break;
+ case TERMINATING_PUSH_NEEDED:
+ timeMs = 0;
+ msg = String.format("the client should try to submit the final %s network API request ASAP before closing", ApiKeys.PUSH_TELEMETRY.name);
+ break;
+ case SUBSCRIPTION_NEEDED:
+ case PUSH_NEEDED:
+ apiName = (localState == ClientTelemetryState.SUBSCRIPTION_NEEDED) ? ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS.name : ApiKeys.PUSH_TELEMETRY.name;
+ long timeRemainingBeforeRequest = localLastRequestMs + localIntervalMs - nowMs;
+ if (timeRemainingBeforeRequest <= 0) {
+ timeMs = 0;
+ msg = String.format("the wait time before submitting the next %s network API request has elapsed", apiName);
+ } else {
+ timeMs = timeRemainingBeforeRequest;
+ msg = String.format("the client will wait before submitting the next %s network API request", apiName);
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unknown telemetry state: " + localState);
+ }
+
+ log.debug("For telemetry state {}, returning the value {} ms; {}", localState, timeMs, msg);
+ return timeMs;
+ }
+
+ @Override
+ public Optional> createRequest() {
+ final ClientTelemetryState localState;
+ final ClientTelemetrySubscription localSubscription;
+
+ lock.readLock().lock();
+ try {
+ localState = state;
+ localSubscription = subscription;
+ } finally {
+ lock.readLock().unlock();
+ }
+
+ if (localState == ClientTelemetryState.SUBSCRIPTION_NEEDED) {
+ return createSubscriptionRequest(localSubscription);
+ } else if (localState == ClientTelemetryState.PUSH_NEEDED || localState == ClientTelemetryState.TERMINATING_PUSH_NEEDED) {
+ return createPushRequest(localSubscription);
+ }
+
+ log.warn("Cannot make telemetry request as telemetry is in state: {}", localState);
+ return Optional.empty();
+ }
+
+ @Override
+ public void handleResponse(GetTelemetrySubscriptionsResponse response) {
+ final long nowMs = time.milliseconds();
+ final GetTelemetrySubscriptionsResponseData data = response.data();
+
+ final ClientTelemetryState oldState;
+ final ClientTelemetrySubscription oldSubscription;
+ lock.readLock().lock();
+ try {
+ oldState = state;
+ oldSubscription = subscription;
+ } finally {
+ lock.readLock().unlock();
+ }
+
+ Optional errorIntervalMsOpt = ClientTelemetryUtils.maybeFetchErrorIntervalMs(data.errorCode(),
+ oldSubscription != null ? oldSubscription.pushIntervalMs() : -1);
+ /*
+ If the error code indicates that the interval ms needs to be updated as per the error
+ code then update the interval ms and state so that the subscription can be retried.
+ */
+ if (errorIntervalMsOpt.isPresent()) {
+ /*
+ Update the state from SUBSCRIPTION_INR_PROGRESS to SUBSCRIPTION_NEEDED as the error
+ response indicates that the subscription is not valid.
+ */
+ if (!maybeSetState(ClientTelemetryState.SUBSCRIPTION_NEEDED)) {
+ log.warn("Unable to transition state after failed get telemetry subscriptions from state {}", oldState);
+ }
+ updateErrorResult(errorIntervalMsOpt.get(), nowMs);
+ return;
+ }
+
+ Uuid clientInstanceId = ClientTelemetryUtils.validateClientInstanceId(data.clientInstanceId());
+ int intervalMs = ClientTelemetryUtils.validateIntervalMs(data.pushIntervalMs());
+ Predicate super MetricKeyable> selector = ClientTelemetryUtils.getSelectorFromRequestedMetrics(
+ data.requestedMetrics());
+ List acceptedCompressionTypes = ClientTelemetryUtils.getCompressionTypesFromAcceptedList(
+ data.acceptedCompressionTypes());
+
+ /*
+ Check if the delta temporality has changed, if so, we need to reset the ledger tracking
+ the last value sent for each metric.
+ */
+ if (oldSubscription != null && oldSubscription.deltaTemporality() != data.deltaTemporality()) {
+ log.info("Delta temporality has changed from {} to {}, resetting metric values",
+ oldSubscription.deltaTemporality(), data.deltaTemporality());
+ if (kafkaMetricsCollector != null) {
+ kafkaMetricsCollector.metricsReset();
+ }
+ }
+
+ ClientTelemetrySubscription clientTelemetrySubscription = new ClientTelemetrySubscription(
+ clientInstanceId,
+ data.subscriptionId(),
+ intervalMs,
+ acceptedCompressionTypes,
+ data.deltaTemporality(),
+ selector);
+
+ lock.writeLock().lock();
+ try {
+ /*
+ This is the case if we began termination sometime after the subscription request
+ was issued. We're just now getting our callback, but we need to ignore it.
+ */
+ if (isTerminatingState()) {
+ return;
+ }
+
+ ClientTelemetryState newState;
+ if (selector == ClientTelemetryUtils.SELECTOR_NO_METRICS) {
+ /*
+ This is the case where no metrics are requested and/or match the filters. We need
+ to wait intervalMs then retry.
+ */
+ newState = ClientTelemetryState.SUBSCRIPTION_NEEDED;
+ } else {
+ newState = ClientTelemetryState.PUSH_NEEDED;
+ }
+
+ // If we're terminating, don't update state or set the subscription.
+ if (!maybeSetState(newState)) {
+ return;
+ }
+
+ updateSubscriptionResult(clientTelemetrySubscription, nowMs);
+ log.info("Client telemetry registered with client instance id: {}", subscription.clientInstanceId());
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void handleResponse(PushTelemetryResponse response) {
+ final long nowMs = time.milliseconds();
+ final PushTelemetryResponseData data = response.data();
+
+ lock.writeLock().lock();
+ try {
+ Optional errorIntervalMsOpt = ClientTelemetryUtils.maybeFetchErrorIntervalMs(data.errorCode(),
+ subscription.pushIntervalMs());
+ /*
+ If the error code indicates that the interval ms needs to be updated as per the error
+ code then update the interval ms and state so that the subscription can be re-fetched,
+ and the push retried.
+ */
+ if (errorIntervalMsOpt.isPresent()) {
+ /*
+ This is the case when client began termination sometime after the last push request
+ was issued. Just getting the callback, hence need to ignore it.
+ */
+ if (isTerminatingState()) {
+ return;
+ }
+
+ if (!maybeSetState(ClientTelemetryState.SUBSCRIPTION_NEEDED)) {
+ log.warn("Unable to transition state after failed push telemetry from state {}", state);
+ }
+ updateErrorResult(errorIntervalMsOpt.get(), nowMs);
+ return;
+ }
+
+ lastRequestMs = nowMs;
+ intervalMs = subscription.pushIntervalMs();
+ if (!maybeSetState(ClientTelemetryState.PUSH_NEEDED)) {
+ log.warn("Unable to transition state after successful push telemetry from state {}", state);
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void handleFailedGetTelemetrySubscriptionsRequest(KafkaException maybeFatalException) {
+ log.warn("The broker generated an error for the get telemetry network API request", maybeFatalException);
+ handleFailedRequest(maybeFatalException != null);
+ }
+
+ @Override
+ public void handleFailedPushTelemetryRequest(KafkaException maybeFatalException) {
+ log.warn("The broker generated an error for the push telemetry network API request", maybeFatalException);
+ handleFailedRequest(maybeFatalException != null);
+ }
+
+ @Override
+ public Optional clientInstanceId(Duration timeout) {
+ final long timeoutMs = timeout.toMillis();
+ if (timeoutMs < 0) {
+ throw new IllegalArgumentException("The timeout cannot be negative for fetching client instance id.");
+ }
+
+ lock.writeLock().lock();
+ try {
+ if (subscription == null) {
+ // If we have a non-negative timeout and no-subscription, let's wait for one to be retrieved.
+ log.debug("Waiting for telemetry subscription containing the client instance ID with timeoutMillis = {} ms.", timeoutMs);
+ try {
+ if (!subscriptionLoaded.await(timeoutMs, TimeUnit.MILLISECONDS)) {
+ log.debug("Wait for telemetry subscription elapsed; may not have actually loaded it");
+ }
+ } catch (InterruptedException e) {
+ throw new InterruptException(e);
+ }
+ }
+
+ if (subscription == null) {
+ log.debug("Client instance ID could not be retrieved with timeout {}", timeout);
+ return Optional.empty();
+ }
+
+ Uuid clientInstanceId = subscription.clientInstanceId();
+ if (clientInstanceId == null) {
+ log.info("Client instance ID was null in telemetry subscription while in state {}", state);
+ return Optional.empty();
+ }
+
+ return Optional.of(clientInstanceId);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void close() {
+ log.debug("close telemetry sender for client telemetry reporter instance");
+
+ boolean shouldClose = false;
+ lock.writeLock().lock();
+ try {
+ if (state != ClientTelemetryState.TERMINATED) {
+ if (maybeSetState(ClientTelemetryState.TERMINATED)) {
+ shouldClose = true;
+ }
+ } else {
+ log.debug("Ignoring subsequent close");
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ if (shouldClose) {
+ if (kafkaMetricsCollector != null) {
+ kafkaMetricsCollector.stop();
+ }
+ }
+ }
+
+ @Override
+ public void initiateClose(long timeoutMs) {
+ log.debug("initiate close for client telemetry, check if terminal push required. Timeout {} ms.", timeoutMs);
+
+ lock.writeLock().lock();
+ try {
+ // If we never fetched a subscription, we can't really push anything.
+ if (lastRequestMs == 0) {
+ log.debug("Telemetry subscription not loaded, not attempting terminating push");
+ return;
+ }
+
+ if (state == ClientTelemetryState.SUBSCRIPTION_NEEDED) {
+ log.debug("Subscription not yet loaded, ignoring terminal push");
+ return;
+ }
+
+ if (isTerminatingState() || !maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED)) {
+ log.debug("Ignoring subsequent initiateClose");
+ return;
+ }
+
+ try {
+ log.info("About to wait {} ms. for terminal telemetry push to be submitted", timeoutMs);
+ if (!terminalPushInProgress.await(timeoutMs, TimeUnit.MILLISECONDS)) {
+ log.info("Wait for terminal telemetry push to be submitted has elapsed; may not have actually sent request");
+ }
+ } catch (InterruptedException e) {
+ log.warn("Error during client telemetry close", e);
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private Optional> createSubscriptionRequest(ClientTelemetrySubscription localSubscription) {
+ /*
+ If we've previously retrieved a subscription, it will contain the client instance ID
+ that the broker assigned. Otherwise, per KIP-714, we send a special "null" UUID to
+ signal to the broker that we need to have a client instance ID assigned.
+ */
+ Uuid clientInstanceId = (localSubscription != null) ? localSubscription.clientInstanceId() : Uuid.ZERO_UUID;
+
+ lock.writeLock().lock();
+ try {
+ if (isTerminatingState()) {
+ return Optional.empty();
+ }
+
+ if (!maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)) {
+ return Optional.empty();
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ AbstractRequest.Builder> requestBuilder = new GetTelemetrySubscriptionsRequest.Builder(
+ new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId), true);
+ return Optional.of(requestBuilder);
+ }
+
+ private Optional> createPushRequest(ClientTelemetrySubscription localSubscription) {
+ if (localSubscription == null) {
+ log.warn("Telemetry state is {} but subscription is null; not sending telemetry", state);
+ if (!maybeSetState(ClientTelemetryState.SUBSCRIPTION_NEEDED)) {
+ log.warn("Unable to transition state after failed create push telemetry from state {}", state);
+ }
+ return Optional.empty();
+ }
+
+ /*
+ Don't send a push request if we don't have the collector initialized. Re-attempt
+ the push on the next interval.
+ */
+ if (kafkaMetricsCollector == null) {
+ log.warn("Cannot make telemetry request as collector is not initialized");
+ // Update last accessed time for push request to be retried on next interval.
+ updateErrorResult(localSubscription.pushIntervalMs, time.milliseconds());
+ return Optional.empty();
+ }
+
+ boolean terminating;
+ lock.writeLock().lock();
+ try {
+ /*
+ We've already been terminated, or we've already issued our last push, so we
+ should just exit now.
+ */
+ if (state == ClientTelemetryState.TERMINATED || state == ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS) {
+ return Optional.empty();
+ }
+
+ /*
+ Check the *actual* state (while locked) to make sure we're still in the state
+ we expect to be in.
+ */
+ terminating = state == ClientTelemetryState.TERMINATING_PUSH_NEEDED;
+ if (!maybeSetState(terminating ? ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS : ClientTelemetryState.PUSH_IN_PROGRESS)) {
+ return Optional.empty();
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ byte[] payload;
+ try (MetricsEmitter emitter = new ClientTelemetryEmitter(localSubscription.selector(), localSubscription.deltaTemporality())) {
+ emitter.init();
+ kafkaMetricsCollector.collect(emitter);
+ payload = createPayload(emitter.emittedMetrics());
+ } catch (Exception e) {
+ log.warn("Error constructing client telemetry payload: ", e);
+ // Update last accessed time for push request to be retried on next interval.
+ updateErrorResult(localSubscription.pushIntervalMs, time.milliseconds());
+ return Optional.empty();
+ }
+
+ CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
+ ByteBuffer buffer = ClientTelemetryUtils.compress(payload, compressionType);
+
+ AbstractRequest.Builder> requestBuilder = new PushTelemetryRequest.Builder(
+ new PushTelemetryRequestData()
+ .setClientInstanceId(localSubscription.clientInstanceId())
+ .setSubscriptionId(localSubscription.subscriptionId())
+ .setTerminating(terminating)
+ .setCompressionType(compressionType.id)
+ .setMetrics(Utils.readBytes(buffer)), true);
+
+ return Optional.of(requestBuilder);
+ }
+
+ /**
+ * Updates the {@link ClientTelemetrySubscription}, {@link #intervalMs}, and
+ * {@link #lastRequestMs}.
+ *
+ * After the update, the {@link #subscriptionLoaded} condition is signaled so any threads
+ * waiting on the subscription can be unblocked.
+ *
+ * @param subscription Updated subscription that will replace any current subscription
+ * @param timeMs Time in milliseconds representing the current time
+ */
+ // Visible for testing
+ void updateSubscriptionResult(ClientTelemetrySubscription subscription, long timeMs) {
+ lock.writeLock().lock();
+ try {
+ this.subscription = Objects.requireNonNull(subscription);
+ /*
+ If the subscription is updated for the client, we want to attempt to spread out the push
+ requests between 50% and 150% of the push interval value from the broker. This helps us
+ to avoid the case where multiple clients are started at the same time and end up sending
+ all their data at the same time.
+ */
+ if (state == ClientTelemetryState.PUSH_NEEDED) {
+ intervalMs = computeStaggeredIntervalMs(subscription.pushIntervalMs(), INITIAL_PUSH_JITTER_LOWER, INITIAL_PUSH_JITTER_UPPER);
+ } else {
+ intervalMs = subscription.pushIntervalMs();
+ }
+ lastRequestMs = timeMs;
+
+ log.debug("Updating subscription - subscription: {}; intervalMs: {}, lastRequestMs: {}",
+ subscription, intervalMs, lastRequestMs);
+ subscriptionLoaded.signalAll();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Updates the {@link #intervalMs}, {@link #lastRequestMs} and {@link #enabled}.
+ *
+ * The contents of the method are guarded by the {@link #lock}.
+ */
+ private void updateErrorResult(int intervalMs, long timeMs) {
+ lock.writeLock().lock();
+ try {
+ this.intervalMs = intervalMs;
+ lastRequestMs = timeMs;
+ /*
+ If the interval time is set to Integer.MAX_VALUE, then it means that the telemetry sender
+ should not send any more telemetry requests. This is used when the client received
+ unrecoverable error from broker.
+ */
+ if (intervalMs == Integer.MAX_VALUE) {
+ enabled = false;
+ }
+
+ log.debug("Updating intervalMs: {}, lastRequestMs: {}", intervalMs, lastRequestMs);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ // Visible for testing
+ int computeStaggeredIntervalMs(int intervalMs, double lowerBound, double upperBound) {
+ double rand = ThreadLocalRandom.current().nextDouble(lowerBound, upperBound);
+ int firstPushIntervalMs = (int) Math.round(rand * intervalMs);
+
+ log.debug("Telemetry subscription push interval value from broker was {}; to stagger requests the first push"
+ + " interval is being adjusted to {}", intervalMs, firstPushIntervalMs);
+ return firstPushIntervalMs;
+ }
+
+ private boolean isTerminatingState() {
+ return state == ClientTelemetryState.TERMINATED || state == ClientTelemetryState.TERMINATING_PUSH_NEEDED
+ || state == ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS;
+ }
+
+ // Visible for testing
+ boolean maybeSetState(ClientTelemetryState newState) {
+ lock.writeLock().lock();
+ try {
+ ClientTelemetryState oldState = state;
+ state = oldState.validateTransition(newState);
+ log.debug("Setting telemetry state from {} to {}", oldState, newState);
+
+ if (newState == ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS) {
+ terminalPushInProgress.signalAll();
+ }
+ return true;
+ } catch (IllegalStateException e) {
+ log.warn("Error updating client telemetry state, disabled telemetry", e);
+ enabled = false;
+ return false;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private void handleFailedRequest(boolean shouldWait) {
+ final long nowMs = time.milliseconds();
+ lock.writeLock().lock();
+ try {
+ if (isTerminatingState()) {
+ return;
+ }
+ if (state != ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS && state != ClientTelemetryState.PUSH_IN_PROGRESS) {
+ log.warn("Could not transition state after failed telemetry from state {}", state);
+ return;
+ }
+
+ /*
+ The broker might not support telemetry. Let's sleep for a while before trying request
+ again. We may disconnect from the broker and connect to a broker that supports client
+ telemetry.
+ */
+ if (shouldWait) {
+ updateErrorResult(DEFAULT_PUSH_INTERVAL_MS, nowMs);
+ } else {
+ updateErrorResult(Integer.MAX_VALUE, nowMs);
+ }
+
+ if (!maybeSetState(ClientTelemetryState.SUBSCRIPTION_NEEDED)) {
+ log.warn("Could not transition state after failed telemetry from state {}", state);
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private byte[] createPayload(List emittedMetrics) {
+ MetricsData.Builder builder = MetricsData.newBuilder();
+ emittedMetrics.forEach(metric -> {
+ Metric m = metric.builder().build();
+ ResourceMetrics rm = buildMetric(m);
+ builder.addResourceMetrics(rm);
+ });
+ return builder.build().toByteArray();
+ }
+
+ // Visible for testing
+ ClientTelemetrySubscription subscription() {
+ lock.readLock().lock();
+ try {
+ return subscription;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ // Visible for testing
+ ClientTelemetryState state() {
+ lock.readLock().lock();
+ try {
+ return state;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ // Visible for testing
+ long intervalMs() {
+ lock.readLock().lock();
+ try {
+ return intervalMs;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ // Visible for testing
+ long lastRequestMs() {
+ lock.readLock().lock();
+ try {
+ return lastRequestMs;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ // Visible for testing
+ void enabled(boolean enabled) {
+ lock.writeLock().lock();
+ try {
+ this.enabled = enabled;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ // Visible for testing
+ boolean enabled() {
+ lock.readLock().lock();
+ try {
+ return enabled;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+ }
+
+ /**
+ * Representation of the telemetry subscription that is retrieved from the cluster at startup and
+ * then periodically afterward, following the telemetry push.
+ */
+ static class ClientTelemetrySubscription {
+
+ private final Uuid clientInstanceId;
+ private final int subscriptionId;
+ private final int pushIntervalMs;
+ private final List acceptedCompressionTypes;
+ private final boolean deltaTemporality;
+ private final Predicate super MetricKeyable> selector;
+
+ ClientTelemetrySubscription(Uuid clientInstanceId, int subscriptionId, int pushIntervalMs,
+ List acceptedCompressionTypes, boolean deltaTemporality,
+ Predicate super MetricKeyable> selector) {
+ this.clientInstanceId = clientInstanceId;
+ this.subscriptionId = subscriptionId;
+ this.pushIntervalMs = pushIntervalMs;
+ this.acceptedCompressionTypes = Collections.unmodifiableList(acceptedCompressionTypes);
+ this.deltaTemporality = deltaTemporality;
+ this.selector = selector;
+ }
+
+ public Uuid clientInstanceId() {
+ return clientInstanceId;
+ }
+
+ public int subscriptionId() {
+ return subscriptionId;
+ }
+
+ public int pushIntervalMs() {
+ return pushIntervalMs;
+ }
+
+ public List acceptedCompressionTypes() {
+ return acceptedCompressionTypes;
+ }
+
+ public boolean deltaTemporality() {
+ return deltaTemporality;
+ }
+
+ public Predicate super MetricKeyable> selector() {
+ return selector;
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", "ClientTelemetrySubscription{", "}")
+ .add("clientInstanceId=" + clientInstanceId)
+ .add("subscriptionId=" + subscriptionId)
+ .add("pushIntervalMs=" + pushIntervalMs)
+ .add("acceptedCompressionTypes=" + acceptedCompressionTypes)
+ .add("deltaTemporality=" + deltaTemporality)
+ .add("selector=" + selector)
+ .toString();
+ }
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java
index b8499a3af33..e6bb9d40e73 100644
--- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java
@@ -17,7 +17,10 @@
package org.apache.kafka.common.telemetry.internals;
+import java.time.Duration;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.requests.AbstractRequest.Builder;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.PushTelemetryResponse;
@@ -76,4 +79,36 @@ public interface ClientTelemetrySender extends AutoCloseable {
* @param kafkaException the fatal exception.
*/
void handleFailedPushTelemetryRequest(KafkaException kafkaException);
+
+ /**
+ * Determines the client's unique client instance ID used for telemetry. This ID is unique to
+ * the specific enclosing client instance and will not change after it is initially generated.
+ * The ID is useful for correlating client operations with telemetry sent to the broker and
+ * to its eventual monitoring destination(s).
+ *
+ * This method waits up to timeout
for the subscription to become available in
+ * order to complete the request.
+ *
+ * @param timeout The maximum time to wait for enclosing client instance to determine its
+ * client instance ID. The value must be non-negative. Specifying a timeout
+ * of zero means do not wait for the initial request to complete if it hasn't
+ * already.
+ * @throws InterruptException If the thread is interrupted while blocked.
+ * @throws KafkaException If an unexpected error occurs while trying to determine the client
+ * instance ID, though this error does not necessarily imply the
+ * enclosing client instance is otherwise unusable.
+ * @throws IllegalArgumentException If the timeout
is negative.
+ *
+ * @return If present, optional of the client's assigned instance id used for metrics collection.
+ */
+
+ Optional clientInstanceId(Duration timeout);
+
+ /**
+ * Initiates shutdown of this client. This method is called when the enclosing client instance
+ * is being closed. This method should not throw an exception if the client is already closed.
+ *
+ * @param timeoutMs The maximum time to wait for the client to close.
+ */
+ void initiateClose(long timeoutMs);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
new file mode 100644
index 00000000000..91bc6f23b46
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+public class ClientTelemetryUtils {
+
+ private final static Logger log = LoggerFactory.getLogger(ClientTelemetryUtils.class);
+
+ public final static Predicate super MetricKeyable> SELECTOR_NO_METRICS = k -> false;
+
+ public final static Predicate super MetricKeyable> SELECTOR_ALL_METRICS = k -> true;
+
+ /**
+ * Examine the response data and handle different error code accordingly:
+ *
+ *
+ * - Invalid Request: Disable Telemetry
+ * - Invalid Record: Disable Telemetry
+ * - Unsupported Version: Disable Telemetry
+ * - UnknownSubscription or Unsupported Compression: Retry immediately
+ * - TelemetryTooLarge or ThrottlingQuotaExceeded: Retry as per next interval
+ *
+ *
+ * @param errorCode response body error code
+ * @param intervalMs current push interval in milliseconds
+ *
+ * @return Optional of push interval in milliseconds
+ */
+ public static Optional maybeFetchErrorIntervalMs(short errorCode, int intervalMs) {
+ if (errorCode == Errors.NONE.code())
+ return Optional.empty();
+
+ int pushIntervalMs;
+ String reason;
+
+ Errors error = Errors.forCode(errorCode);
+ switch (error) {
+ case INVALID_REQUEST:
+ case INVALID_RECORD:
+ case UNSUPPORTED_VERSION:
+ pushIntervalMs = Integer.MAX_VALUE;
+ reason = "The broker response indicates the client sent an request that cannot be resolved"
+ + " by re-trying, hence disable telemetry";
+ break;
+ case UNKNOWN_SUBSCRIPTION_ID:
+ case UNSUPPORTED_COMPRESSION_TYPE:
+ pushIntervalMs = 0;
+ reason = error.message();
+ break;
+ case TELEMETRY_TOO_LARGE:
+ case THROTTLING_QUOTA_EXCEEDED:
+ reason = error.message();
+ pushIntervalMs = (intervalMs != -1) ? intervalMs : ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS;
+ break;
+ default:
+ reason = "Unwrapped error code";
+ log.error("Error code: {}. Unmapped error for telemetry, disable telemetry.", errorCode);
+ pushIntervalMs = Integer.MAX_VALUE;
+ }
+
+ log.debug("Error code: {}, reason: {}. Push interval update to {} ms.", errorCode, reason, pushIntervalMs);
+ return Optional.of(pushIntervalMs);
+ }
+
+ public static Predicate super MetricKeyable> getSelectorFromRequestedMetrics(List requestedMetrics) {
+ if (requestedMetrics == null || requestedMetrics.isEmpty()) {
+ log.debug("Telemetry subscription has specified no metric names; telemetry will record no metrics");
+ return SELECTOR_NO_METRICS;
+ } else if (requestedMetrics.size() == 1 && requestedMetrics.get(0) != null && requestedMetrics.get(0).equals("*")) {
+ log.debug("Telemetry subscription has specified a single '*' metric name; using all metrics");
+ return SELECTOR_ALL_METRICS;
+ } else {
+ log.debug("Telemetry subscription has specified to include only metrics that are prefixed with the following strings: {}", requestedMetrics);
+ return k -> requestedMetrics.stream().anyMatch(f -> k.key().name().startsWith(f));
+ }
+ }
+
+ public static List getCompressionTypesFromAcceptedList(List acceptedCompressionTypes) {
+ if (acceptedCompressionTypes == null || acceptedCompressionTypes.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List result = new ArrayList<>();
+ for (Byte compressionByte : acceptedCompressionTypes) {
+ int compressionId = compressionByte.intValue();
+ try {
+ CompressionType compressionType = CompressionType.forId(compressionId);
+ result.add(compressionType);
+ } catch (IllegalArgumentException e) {
+ log.warn("Accepted compressionByte type with ID {} is not a known compressionByte type; ignoring", compressionId, e);
+ }
+ }
+ return result;
+ }
+
+ public static Uuid validateClientInstanceId(Uuid clientInstanceId) {
+ if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+ throw new IllegalArgumentException("clientInstanceId is not valid");
+ }
+
+ return clientInstanceId;
+ }
+
+ public static int validateIntervalMs(int intervalMs) {
+ if (intervalMs <= 0) {
+ log.warn("Telemetry subscription push interval value from broker was invalid ({}),"
+ + " substituting with default value of {}", intervalMs, ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS);
+ return ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS;
+ }
+
+ log.debug("Telemetry subscription push interval value from broker: {}", intervalMs);
+ return intervalMs;
+ }
+
+ public static boolean validateResourceLabel(Map m, String key) {
+ if (!m.containsKey(key)) {
+ log.trace("{} does not exist in map {}", key, m);
+ return false;
+ }
+
+ if (m.get(key) == null) {
+ log.trace("{} is null. map {}", key, m);
+ return false;
+ }
+
+ if (!(m.get(key) instanceof String)) {
+ log.trace("{} is not a string. map {}", key, m);
+ return false;
+ }
+
+ String val = (String) m.get(key);
+ if (val.isEmpty()) {
+ log.trace("{} is empty string. value = {} map {}", key, val, m);
+ return false;
+ }
+ return true;
+ }
+
+ public static boolean validateRequiredResourceLabels(Map metadata) {
+ return validateResourceLabel(metadata, MetricsContext.NAMESPACE);
+ }
+
+ public static CompressionType preferredCompressionType(List acceptedCompressionTypes) {
+ // TODO: Support compression in client telemetry.
+ return CompressionType.NONE;
+ }
+
+ public static ByteBuffer compress(byte[] raw, CompressionType compressionType) {
+ // TODO: Support compression in client telemetry.
+ if (compressionType == CompressionType.NONE) {
+ return ByteBuffer.wrap(raw);
+ } else {
+ throw new UnsupportedOperationException("Compression is not supported");
+ }
+ }
+
+ public static MetricsData deserializeMetricsData(ByteBuffer serializedMetricsData) {
+ try {
+ return MetricsData.parseFrom(serializedMetricsData);
+ } catch (IOException e) {
+ throw new KafkaException("Unable to parse MetricsData payload", e);
+ }
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java
index e9d978d6aa5..6c8655d21b0 100644
--- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java
@@ -29,10 +29,6 @@ import java.io.Closeable;
*
*
*
- * An {@code MetricsEmitter} is stateless and the telemetry reporter should assume that the object is
- * not thread safe and thus concurrent access to either the
- * {@link #shouldEmitMetric(MetricKeyable)} or {@link #emitMetric(SinglePointMetric)} should be avoided.
- *
* Regarding threading, the {@link #init()} and {@link #close()} methods may be called from
* different threads and so proper care should be taken by implementations of the
* {@code MetricsCollector} interface to be thread-safe. However, the telemetry reporter must
diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryEmitterTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryEmitterTest.java
new file mode 100644
index 00000000000..32fd2a5ad93
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryEmitterTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.function.Predicate;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientTelemetryEmitterTest {
+
+ private MetricKey metricKey;
+ private Instant now;
+
+ @BeforeEach
+ public void setUp() {
+ metricKey = new MetricKey("name", Collections.emptyMap());
+ now = Instant.now();
+ }
+
+ @Test
+ public void testShouldEmitMetric() {
+ Predicate super MetricKeyable> selector = ClientTelemetryUtils.getSelectorFromRequestedMetrics(
+ Collections.singletonList("io.test.metric"));
+ ClientTelemetryEmitter emitter = new ClientTelemetryEmitter(selector, true);
+
+ assertTrue(emitter.shouldEmitMetric(new MetricKey("io.test.metric")));
+ assertTrue(emitter.shouldEmitMetric(new MetricKey("io.test.metric1")));
+ assertTrue(emitter.shouldEmitMetric(new MetricKey("io.test.metric.producer.bytes")));
+ assertFalse(emitter.shouldEmitMetric(new MetricKey("io.test")));
+ assertFalse(emitter.shouldEmitMetric(new MetricKey("org.io.test.metric")));
+ assertTrue(emitter.shouldEmitDeltaMetrics());
+ }
+
+ @Test
+ public void testShouldEmitMetricSelectorAll() {
+ ClientTelemetryEmitter emitter = new ClientTelemetryEmitter(ClientTelemetryUtils.SELECTOR_ALL_METRICS, true);
+
+ assertTrue(emitter.shouldEmitMetric(new MetricKey("io.test.metric")));
+ assertTrue(emitter.shouldEmitMetric(new MetricKey("io.test.metric1")));
+ assertTrue(emitter.shouldEmitMetric(new MetricKey("io.test.metric.producer.bytes")));
+ assertTrue(emitter.shouldEmitMetric(new MetricKey("io.test")));
+ assertTrue(emitter.shouldEmitMetric(new MetricKey("org.io.test.metric")));
+ assertTrue(emitter.shouldEmitDeltaMetrics());
+ }
+
+ @Test
+ public void testShouldEmitMetricSelectorNone() {
+ ClientTelemetryEmitter emitter = new ClientTelemetryEmitter(ClientTelemetryUtils.SELECTOR_NO_METRICS, true);
+
+ assertFalse(emitter.shouldEmitMetric(new MetricKey("io.test.metric")));
+ assertFalse(emitter.shouldEmitMetric(new MetricKey("io.test.metric1")));
+ assertFalse(emitter.shouldEmitMetric(new MetricKey("io.test.metric.producer.bytes")));
+ assertFalse(emitter.shouldEmitMetric(new MetricKey("io.test")));
+ assertFalse(emitter.shouldEmitMetric(new MetricKey("org.io.test.metric")));
+ assertTrue(emitter.shouldEmitDeltaMetrics());
+ }
+
+ @Test
+ public void testShouldEmitDeltaMetricsFalse() {
+ ClientTelemetryEmitter emitter = new ClientTelemetryEmitter(ClientTelemetryUtils.SELECTOR_ALL_METRICS, false);
+ assertFalse(emitter.shouldEmitDeltaMetrics());
+ }
+
+ @Test
+ public void testEmitMetric() {
+ Predicate super MetricKeyable> selector = ClientTelemetryUtils.getSelectorFromRequestedMetrics(
+ Collections.singletonList("name"));
+ ClientTelemetryEmitter emitter = new ClientTelemetryEmitter(selector, true);
+
+ SinglePointMetric gauge = SinglePointMetric.gauge(metricKey, Long.valueOf(1), now);
+ SinglePointMetric sum = SinglePointMetric.sum(metricKey, 1.0, true, now);
+ assertTrue(emitter.emitMetric(gauge));
+ assertTrue(emitter.emitMetric(sum));
+
+ MetricKey anotherKey = new MetricKey("io.name", Collections.emptyMap());
+ assertFalse(emitter.emitMetric(SinglePointMetric.gauge(anotherKey, Long.valueOf(1), now)));
+
+ assertEquals(2, emitter.emittedMetrics().size());
+ assertEquals(Arrays.asList(gauge, sum), emitter.emittedMetrics());
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
new file mode 100644
index 00000000000..64d8a3af33f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
@@ -0,0 +1,619 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.KeyValue;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientTelemetryReporterTest {
+
+ private MockTime time;
+ private ClientTelemetryReporter clientTelemetryReporter;
+ private Map configs;
+ private MetricsContext metricsContext;
+ private Uuid uuid;
+ private ClientTelemetryReporter.ClientTelemetrySubscription subscription;
+
+ @BeforeEach
+ public void setUp() {
+ time = new MockTime();
+ clientTelemetryReporter = new ClientTelemetryReporter(time);
+ configs = new HashMap<>();
+ metricsContext = new KafkaMetricsContext("test");
+ uuid = Uuid.randomUuid();
+ subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(uuid, 1234, 20000,
+ Collections.emptyList(), true, null);
+ }
+
+ @Test
+ public void testInitTelemetryReporter() {
+ configs.put(CommonClientConfigs.CLIENT_ID_CONFIG, "test-client");
+ configs.put(CommonClientConfigs.CLIENT_RACK_CONFIG, "rack");
+
+ clientTelemetryReporter.configure(configs);
+ clientTelemetryReporter.contextChange(metricsContext);
+ assertNotNull(clientTelemetryReporter.metricsCollector());
+ assertNotNull(clientTelemetryReporter.telemetryProvider().resource());
+ assertEquals(1, clientTelemetryReporter.telemetryProvider().resource().getAttributesCount());
+ assertEquals(
+ ClientTelemetryProvider.CLIENT_RACK, clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getKey());
+ assertEquals("rack", clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getValue().getStringValue());
+ }
+
+ @Test
+ public void testInitTelemetryReporterNoCollector() {
+ // Remove namespace config which skips the collector initialization.
+ MetricsContext metricsContext = Collections::emptyMap;
+
+ clientTelemetryReporter.configure(configs);
+ clientTelemetryReporter.contextChange(metricsContext);
+ assertNull(clientTelemetryReporter.metricsCollector());
+ }
+
+ @Test
+ public void testProducerLabels() {
+ configs.put(CommonClientConfigs.CLIENT_ID_CONFIG, "test-client");
+ configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
+ configs.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "group-instance-id");
+ configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id");
+ configs.put(CommonClientConfigs.CLIENT_RACK_CONFIG, "rack");
+
+ clientTelemetryReporter.configure(configs);
+ clientTelemetryReporter.contextChange(new KafkaMetricsContext("kafka.producer"));
+ assertNotNull(clientTelemetryReporter.metricsCollector());
+ assertNotNull(clientTelemetryReporter.telemetryProvider().resource());
+
+ List attributes = clientTelemetryReporter.telemetryProvider().resource().getAttributesList();
+ assertEquals(2, attributes.size());
+ attributes.forEach(attribute -> {
+ if (attribute.getKey().equals(ClientTelemetryProvider.CLIENT_RACK)) {
+ assertEquals("rack", attribute.getValue().getStringValue());
+ } else if (attribute.getKey().equals(ClientTelemetryProvider.TRANSACTIONAL_ID)) {
+ assertEquals("transaction-id", attribute.getValue().getStringValue());
+ }
+ });
+ }
+
+ @Test
+ public void testConsumerLabels() {
+ configs.put(CommonClientConfigs.CLIENT_ID_CONFIG, "test-client");
+ configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
+ configs.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "group-instance-id");
+ configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id");
+ configs.put(CommonClientConfigs.CLIENT_RACK_CONFIG, "rack");
+
+ clientTelemetryReporter.configure(configs);
+ clientTelemetryReporter.contextChange(new KafkaMetricsContext("kafka.consumer"));
+ assertNotNull(clientTelemetryReporter.metricsCollector());
+ assertNotNull(clientTelemetryReporter.telemetryProvider().resource());
+
+ List attributes = clientTelemetryReporter.telemetryProvider().resource().getAttributesList();
+ assertEquals(3, attributes.size());
+ attributes.forEach(attribute -> {
+ if (attribute.getKey().equals(ClientTelemetryProvider.CLIENT_RACK)) {
+ assertEquals("rack", attribute.getValue().getStringValue());
+ } else if (attribute.getKey().equals(ClientTelemetryProvider.GROUP_ID)) {
+ assertEquals("group-id", attribute.getValue().getStringValue());
+ } else if (attribute.getKey().equals(ClientTelemetryProvider.GROUP_INSTANCE_ID)) {
+ assertEquals("group-instance-id", attribute.getValue().getStringValue());
+ }
+ });
+ }
+
+ @Test
+ public void testTelemetryReporterClose() {
+ clientTelemetryReporter.close();
+ assertEquals(ClientTelemetryState.TERMINATED, ((ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter
+ .telemetrySender()).state());
+ }
+
+ @Test
+ public void testTelemetryReporterCloseMultipleTimesNoException() {
+ clientTelemetryReporter.close();
+ clientTelemetryReporter.close();
+ assertEquals(ClientTelemetryState.TERMINATED, ((ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter
+ .telemetrySender()).state());
+ }
+
+ @Test
+ public void testTelemetrySenderTimeToNextUpdate() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(0, telemetrySender.timeToNextUpdate(100));
+
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+ assertEquals(20000, telemetrySender.timeToNextUpdate(100), 200);
+
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ assertEquals(100, telemetrySender.timeToNextUpdate(100));
+
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+ long time = telemetrySender.timeToNextUpdate(100);
+ assertTrue(time > 0 && time >= 0.5 * time && time <= 1.5 * time);
+
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
+ assertEquals(100, telemetrySender.timeToNextUpdate(100));
+
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));
+ assertEquals(0, telemetrySender.timeToNextUpdate(100));
+
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS));
+ assertEquals(Long.MAX_VALUE, telemetrySender.timeToNextUpdate(100));
+
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATED));
+ assertThrows(IllegalStateException.class, () -> telemetrySender.timeToNextUpdate(100));
+ }
+
+ @Test
+ public void testCreateRequestSubscriptionNeeded() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+
+ Optional> requestOptional = telemetrySender.createRequest();
+ assertNotNull(requestOptional);
+ assertTrue(requestOptional.isPresent());
+ assertTrue(requestOptional.get().build() instanceof GetTelemetrySubscriptionsRequest);
+ GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest) requestOptional.get().build();
+
+ GetTelemetrySubscriptionsRequest expectedResult = new GetTelemetrySubscriptionsRequest.Builder(
+ new GetTelemetrySubscriptionsRequestData().setClientInstanceId(Uuid.ZERO_UUID), true).build();
+
+ assertEquals(expectedResult.data(), request.data());
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS, telemetrySender.state());
+ }
+
+ @Test
+ public void testCreateRequestSubscriptionNeededAfterExistingSubscription() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+
+ Optional> requestOptional = telemetrySender.createRequest();
+ assertNotNull(requestOptional);
+ assertTrue(requestOptional.isPresent());
+ assertTrue(requestOptional.get().build() instanceof GetTelemetrySubscriptionsRequest);
+ GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest) requestOptional.get().build();
+
+ GetTelemetrySubscriptionsRequest expectedResult = new GetTelemetrySubscriptionsRequest.Builder(
+ new GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscription.clientInstanceId()), true).build();
+
+ assertEquals(expectedResult.data(), request.data());
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS, telemetrySender.state());
+ }
+
+ @Test
+ public void testCreateRequestPushNeeded() {
+ clientTelemetryReporter.configure(configs);
+ clientTelemetryReporter.contextChange(metricsContext);
+
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ // create request to move state to SUBSCRIPTION_IN_PROGRESS
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+ telemetrySender.createRequest();
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+ Optional> requestOptional = telemetrySender.createRequest();
+ assertNotNull(requestOptional);
+ assertTrue(requestOptional.isPresent());
+ assertTrue(requestOptional.get().build() instanceof PushTelemetryRequest);
+ PushTelemetryRequest request = (PushTelemetryRequest) requestOptional.get().build();
+
+ PushTelemetryRequest expectedResult = new PushTelemetryRequest.Builder(
+ new PushTelemetryRequestData().setClientInstanceId(subscription.clientInstanceId())
+ .setSubscriptionId(subscription.subscriptionId()), true).build();
+
+ assertEquals(expectedResult.data(), request.data());
+ assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());
+ }
+
+ @Test
+ public void testCreateRequestPushNeededWithoutSubscription() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ // create request to move state to SUBSCRIPTION_IN_PROGRESS
+ telemetrySender.createRequest();
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+ Optional> requestOptional = telemetrySender.createRequest();
+ assertNotNull(requestOptional);
+ assertFalse(requestOptional.isPresent());
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ }
+
+ @Test
+ public void testCreateRequestInvalidState() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ assertFalse(telemetrySender.createRequest().isPresent());
+
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
+ assertFalse(telemetrySender.createRequest().isPresent());
+
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));
+ assertFalse(telemetrySender.createRequest().isPresent());
+
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS));
+ assertFalse(telemetrySender.createRequest().isPresent());
+
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATED));
+ assertFalse(telemetrySender.createRequest().isPresent());
+ }
+
+ @Test
+ public void testCreateRequestPushNoCollector() {
+ final long now = time.milliseconds();
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ // create request to move state to SUBSCRIPTION_IN_PROGRESS
+ telemetrySender.createRequest();
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+ telemetrySender.updateSubscriptionResult(subscription, now);
+ long interval = telemetrySender.timeToNextUpdate(100);
+ assertTrue(interval > 0 && interval != 2000 && interval >= 0.5 * interval && interval <= 1.5 * interval);
+
+ time.sleep(1000);
+ Optional> requestOptional = telemetrySender.createRequest();
+ assertFalse(requestOptional.isPresent());
+
+ assertEquals(20000, telemetrySender.timeToNextUpdate(100));
+ assertEquals(now + 1000, telemetrySender.lastRequestMs());
+ }
+
+ @Test
+ public void testHandleResponseGetSubscriptions() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ Uuid clientInstanceId = Uuid.randomUuid();
+ GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(
+ new GetTelemetrySubscriptionsResponseData()
+ .setClientInstanceId(clientInstanceId)
+ .setSubscriptionId(5678)
+ .setAcceptedCompressionTypes(Collections.singletonList(CompressionType.GZIP.id))
+ .setPushIntervalMs(20000)
+ .setRequestedMetrics(Collections.singletonList("*")));
+
+ telemetrySender.handleResponse(response);
+ assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state());
+
+ ClientTelemetryReporter.ClientTelemetrySubscription subscription = telemetrySender.subscription();
+ assertNotNull(subscription);
+ assertEquals(clientInstanceId, subscription.clientInstanceId());
+ assertEquals(5678, subscription.subscriptionId());
+ assertEquals(Collections.singletonList(CompressionType.GZIP), subscription.acceptedCompressionTypes());
+ assertEquals(20000, subscription.pushIntervalMs());
+ assertEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, subscription.selector());
+ }
+
+ @Test
+ public void testHandleResponseGetSubscriptionsWithoutMetrics() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ Uuid clientInstanceId = Uuid.randomUuid();
+ GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(
+ new GetTelemetrySubscriptionsResponseData()
+ .setClientInstanceId(clientInstanceId)
+ .setSubscriptionId(5678)
+ .setAcceptedCompressionTypes(Collections.singletonList(CompressionType.GZIP.id))
+ .setPushIntervalMs(20000));
+
+ telemetrySender.handleResponse(response);
+ // Again subscription should be required.
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+
+ ClientTelemetryReporter.ClientTelemetrySubscription subscription = telemetrySender.subscription();
+ assertNotNull(subscription);
+ assertEquals(clientInstanceId, subscription.clientInstanceId());
+ assertEquals(5678, subscription.subscriptionId());
+ assertEquals(Collections.singletonList(CompressionType.GZIP), subscription.acceptedCompressionTypes());
+ assertEquals(20000, subscription.pushIntervalMs());
+ assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, subscription.selector());
+ }
+
+ @Test
+ public void testHandleResponseGetTelemetryErrorResponse() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ // throttling quota exceeded
+ GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(
+ new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.THROTTLING_QUOTA_EXCEEDED.code()));
+
+ telemetrySender.handleResponse(response);
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(300000, telemetrySender.intervalMs());
+ assertTrue(telemetrySender.enabled());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ // invalid request error
+ response = new GetTelemetrySubscriptionsResponse(
+ new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.INVALID_REQUEST.code()));
+
+ telemetrySender.handleResponse(response);
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+ assertFalse(telemetrySender.enabled());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ // unsupported version error
+ telemetrySender.enabled(true);
+ response = new GetTelemetrySubscriptionsResponse(
+ new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code()));
+
+ telemetrySender.handleResponse(response);
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+ assertFalse(telemetrySender.enabled());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ // unknown error
+ telemetrySender.enabled(true);
+ response = new GetTelemetrySubscriptionsResponse(
+ new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()));
+
+ telemetrySender.handleResponse(response);
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+ assertFalse(telemetrySender.enabled());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ }
+
+ @Test
+ public void testHandleResponseSubscriptionChange() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+ KafkaMetricsCollector kafkaMetricsCollector = Mockito.mock(KafkaMetricsCollector.class);
+ clientTelemetryReporter.metricsCollector(kafkaMetricsCollector);
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ Uuid clientInstanceId = Uuid.randomUuid();
+ GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(
+ new GetTelemetrySubscriptionsResponseData()
+ .setClientInstanceId(clientInstanceId)
+ .setSubscriptionId(15678)
+ .setAcceptedCompressionTypes(Collections.singletonList(CompressionType.ZSTD.id))
+ .setPushIntervalMs(10000)
+ .setDeltaTemporality(false) // Change delta temporality as well
+ .setRequestedMetrics(Collections.singletonList("org.apache.kafka.producer")));
+
+ telemetrySender.handleResponse(response);
+ assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state());
+
+ ClientTelemetryReporter.ClientTelemetrySubscription responseSubscription = telemetrySender.subscription();
+ assertNotNull(responseSubscription);
+ assertEquals(clientInstanceId, responseSubscription.clientInstanceId());
+ assertEquals(15678, responseSubscription.subscriptionId());
+ assertEquals(Collections.singletonList(CompressionType.ZSTD), responseSubscription.acceptedCompressionTypes());
+ assertEquals(10000, responseSubscription.pushIntervalMs());
+ assertFalse(responseSubscription.deltaTemporality());
+ assertTrue(responseSubscription.selector().test(new MetricKey("org.apache.kafka.producer")));
+ assertTrue(responseSubscription.selector().test(new MetricKey("org.apache.kafka.producerabc")));
+ assertTrue(responseSubscription.selector().test(new MetricKey("org.apache.kafka.producer.abc")));
+ assertFalse(responseSubscription.selector().test(new MetricKey("org.apache.kafka.produce")));
+
+ Mockito.verify(kafkaMetricsCollector, Mockito.times(1)).metricsReset();
+ }
+
+ @Test
+ public void testHandleResponsePushTelemetry() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
+
+ PushTelemetryResponse response = new PushTelemetryResponse(new PushTelemetryResponseData());
+
+ telemetrySender.handleResponse(response);
+ assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state());
+ assertEquals(subscription.pushIntervalMs(), telemetrySender.intervalMs());
+ assertTrue(telemetrySender.enabled());
+ }
+
+ @Test
+ public void testHandleResponsePushTelemetryErrorResponse() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
+
+ // unknown subscription id
+ PushTelemetryResponse response = new PushTelemetryResponse(
+ new PushTelemetryResponseData().setErrorCode(Errors.UNKNOWN_SUBSCRIPTION_ID.code()));
+
+ telemetrySender.handleResponse(response);
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(0, telemetrySender.intervalMs());
+ assertTrue(telemetrySender.enabled());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
+
+ // unsupported compression type
+ response = new PushTelemetryResponse(
+ new PushTelemetryResponseData().setErrorCode(Errors.UNSUPPORTED_COMPRESSION_TYPE.code()));
+
+ telemetrySender.handleResponse(response);
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(0, telemetrySender.intervalMs());
+ assertTrue(telemetrySender.enabled());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
+
+ // telemetry too large
+ response = new PushTelemetryResponse(
+ new PushTelemetryResponseData().setErrorCode(Errors.TELEMETRY_TOO_LARGE.code()));
+
+ telemetrySender.handleResponse(response);
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(20000, telemetrySender.intervalMs());
+ assertTrue(telemetrySender.enabled());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
+
+ // throttling quota exceeded
+ response = new PushTelemetryResponse(
+ new PushTelemetryResponseData().setErrorCode(Errors.THROTTLING_QUOTA_EXCEEDED.code()));
+
+ telemetrySender.handleResponse(response);
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(20000, telemetrySender.intervalMs());
+ assertTrue(telemetrySender.enabled());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
+
+ // invalid request error
+ response = new PushTelemetryResponse(
+ new PushTelemetryResponseData().setErrorCode(Errors.INVALID_REQUEST.code()));
+
+ telemetrySender.handleResponse(response);
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+ assertFalse(telemetrySender.enabled());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
+
+ // unsupported version error
+ telemetrySender.enabled(true);
+ response = new PushTelemetryResponse(
+ new PushTelemetryResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code()));
+
+ telemetrySender.handleResponse(response);
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+ assertFalse(telemetrySender.enabled());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
+
+ // invalid record
+ telemetrySender.enabled(true);
+ response = new PushTelemetryResponse(
+ new PushTelemetryResponseData().setErrorCode(Errors.INVALID_RECORD.code()));
+
+ telemetrySender.handleResponse(response);
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+ assertFalse(telemetrySender.enabled());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
+
+ // unknown error
+ telemetrySender.enabled(true);
+ response = new PushTelemetryResponse(
+ new PushTelemetryResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()));
+
+ telemetrySender.handleResponse(response);
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+ assertFalse(telemetrySender.enabled());
+ }
+
+ @Test
+ public void testClientInstanceId() throws InterruptedException {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ CountDownLatch lock = new CountDownLatch(2);
+
+ AtomicReference> clientInstanceId = new AtomicReference<>();
+ new Thread(() -> {
+ try {
+ clientInstanceId.set(telemetrySender.clientInstanceId(Duration.ofMillis(10000)));
+ } finally {
+ lock.countDown();
+ }
+ }).start();
+
+ new Thread(() -> {
+ try {
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+ } finally {
+ lock.countDown();
+ }
+ }).start();
+
+ assertTrue(lock.await(2000, TimeUnit.MILLISECONDS));
+ assertNotNull(clientInstanceId.get());
+ assertTrue(clientInstanceId.get().isPresent());
+ assertEquals(uuid, clientInstanceId.get().get());
+ }
+
+ @Test
+ public void testComputeStaggeredIntervalMs() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ assertEquals(0, telemetrySender.computeStaggeredIntervalMs(0, 0.5, 1.5));
+ assertEquals(1, telemetrySender.computeStaggeredIntervalMs(1, 0.99, 1));
+ long timeMs = telemetrySender.computeStaggeredIntervalMs(1000, 0.5, 1.5);
+ assertTrue(timeMs >= 500 && timeMs <= 1500);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ clientTelemetryReporter.close();
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
new file mode 100644
index 00000000000..9897e102295
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientTelemetryUtilsTest {
+
+ @Test
+ public void testMaybeFetchErrorIntervalMs() {
+ assertEquals(Optional.empty(), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.NONE.code(), -1));
+ assertEquals(Optional.of(Integer.MAX_VALUE), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.INVALID_REQUEST.code(), -1));
+ assertEquals(Optional.of(Integer.MAX_VALUE), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.INVALID_RECORD.code(), -1));
+ assertEquals(Optional.of(0), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.UNKNOWN_SUBSCRIPTION_ID.code(), -1));
+ assertEquals(Optional.of(0), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.UNSUPPORTED_COMPRESSION_TYPE.code(), -1));
+ assertEquals(Optional.of(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.TELEMETRY_TOO_LARGE.code(), -1));
+ assertEquals(Optional.of(20000), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.TELEMETRY_TOO_LARGE.code(), 20000));
+ assertEquals(Optional.of(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.THROTTLING_QUOTA_EXCEEDED.code(), -1));
+ assertEquals(Optional.of(20000), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.THROTTLING_QUOTA_EXCEEDED.code(), 20000));
+ assertEquals(Optional.of(Integer.MAX_VALUE), ClientTelemetryUtils.maybeFetchErrorIntervalMs(Errors.UNKNOWN_SERVER_ERROR.code(), -1));
+ }
+
+ @Test
+ public void testGetSelectorFromRequestedMetrics() {
+ // no metrics selector
+ assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(Collections.emptyList()));
+ assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(null));
+ // all metrics selector
+ assertEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(Collections.singletonList("*")));
+ // specific metrics selector
+ Predicate super MetricKeyable> selector = ClientTelemetryUtils.getSelectorFromRequestedMetrics(Arrays.asList("metric1", "metric2"));
+ assertNotEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, selector);
+ assertNotEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, selector);
+ assertTrue(selector.test(new MetricKey("metric1.test")));
+ assertTrue(selector.test(new MetricKey("metric2.test")));
+ assertFalse(selector.test(new MetricKey("test.metric1")));
+ assertFalse(selector.test(new MetricKey("test.metric2")));
+ }
+
+ @Test
+ public void testGetCompressionTypesFromAcceptedList() {
+ assertEquals(0, ClientTelemetryUtils.getCompressionTypesFromAcceptedList(null).size());
+ assertEquals(0, ClientTelemetryUtils.getCompressionTypesFromAcceptedList(Collections.emptyList()).size());
+
+ List compressionTypes = new ArrayList<>();
+ compressionTypes.add(CompressionType.GZIP.id);
+ compressionTypes.add(CompressionType.LZ4.id);
+ compressionTypes.add(CompressionType.SNAPPY.id);
+ compressionTypes.add(CompressionType.ZSTD.id);
+ compressionTypes.add(CompressionType.NONE.id);
+ compressionTypes.add((byte) -1);
+
+ // should take the first compression type
+ assertEquals(5, ClientTelemetryUtils.getCompressionTypesFromAcceptedList(compressionTypes).size());
+ }
+
+ @Test
+ public void testValidateClientInstanceId() {
+ assertThrows(IllegalArgumentException.class, () -> ClientTelemetryUtils.validateClientInstanceId(null));
+ assertThrows(IllegalArgumentException.class, () -> ClientTelemetryUtils.validateClientInstanceId(Uuid.ZERO_UUID));
+
+ Uuid uuid = Uuid.randomUuid();
+ assertEquals(uuid, ClientTelemetryUtils.validateClientInstanceId(uuid));
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {300_000, Integer.MAX_VALUE - 1, Integer.MAX_VALUE})
+ public void testValidateIntervalMsValid(int pushIntervalMs) {
+ assertEquals(pushIntervalMs, ClientTelemetryUtils.validateIntervalMs(pushIntervalMs));
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {-1, 0})
+ public void testValidateIntervalMsInvalid(int pushIntervalMs) {
+ assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, ClientTelemetryUtils.validateIntervalMs(pushIntervalMs));
+ }
+
+ @Test
+ public void testPreferredCompressionType() {
+ assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Collections.emptyList()));
+ assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(null));
+ }
+}
\ No newline at end of file