From 2b99d0e45027c88ae2347fa8f7d1ff4b2b919089 Mon Sep 17 00:00:00 2001
From: Apoorv Mittal
Date: Tue, 5 Dec 2023 23:20:33 +0530
Subject: [PATCH] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) (#14843)

The PR adds the client-side changes to register a ClientTelemetryReporter, if enabled, and to periodically report client metrics. The changes include front-facing API changes, with NetworkClient issuing the telemetry APIs.

The PR build is dependent on: #14620, #14724

Reviewers: Philip Nee, Andrew Schofield, Kirk True, Matthias J. Sax, Walker Carlson
---
 checkstyle/suppressions.xml | 6 +-
 .../org/apache/kafka/clients/ClientUtils.java | 14 +-
 .../kafka/clients/CommonClientConfigs.java | 13 ++
 .../apache/kafka/clients/NetworkClient.java | 153 ++++++++++++++++--
 .../kafka/clients/admin/KafkaAdminClient.java | 58 ++++++-
 .../internals/AbstractCoordinator.java | 17 ++
 .../internals/AsyncKafkaConsumer.java | 27 +++-
 .../internals/ConsumerCoordinator.java | 7 +-
 .../consumer/internals/ConsumerUtils.java | 13 +-
 .../internals/LegacyKafkaConsumer.java | 29 +++-
 .../internals/NetworkClientDelegate.java | 7 +-
 .../kafka/clients/producer/KafkaProducer.java | 31 +++-
 .../internals/ClientTelemetryUtils.java | 10 ++
 .../kafka/clients/NetworkClientTest.java | 119 +++++++++++++-
 .../clients/admin/KafkaAdminClientTest.java | 50 +++++-
 .../clients/consumer/KafkaConsumerTest.java | 86 +++++++++-
 .../internals/ConsumerCoordinatorTest.java | 8 +-
 .../clients/producer/KafkaProducerTest.java | 89 +++++++++-
 18 files changed, 672 insertions(+), 65 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index c654ba7a2b6..3777d8714a2 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -66,7 +66,7 @@ files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor|AbstractRequest|AbstractResponse).java"/> + files="(NetworkClient|FieldSpec|KafkaRaftClient|KafkaProducer).java"/> + + files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest|NetworkClientTest).java"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index b77fbeda742..6b6b56059c8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.network.ChannelBuilders; import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; @@ -156,7 +157,8 @@ public final class ClientUtils { Time time, int maxInFlightRequestsPerConnection, Metadata metadata, - Sensor throttleTimeSensor) { + Sensor throttleTimeSensor, + ClientTelemetrySender clientTelemetrySender) { return createNetworkClient(config, config.getString(CommonClientConfigs.CLIENT_ID_CONFIG), metrics, @@ -169,7 +171,8 @@ public final class ClientUtils { metadata, null, new DefaultHostResolver(), - throttleTimeSensor); + throttleTimeSensor, + clientTelemetrySender); } public static NetworkClient createNetworkClient(AbstractConfig config, @@ -195,6 +198,7 @@ public final class ClientUtils { null, metadataUpdater, hostResolver, + null, null); } @@ -210,7 +214,8 @@ public final
class ClientUtils { Metadata metadata, MetadataUpdater metadataUpdater, HostResolver hostResolver, - Sensor throttleTimeSensor) { + Sensor throttleTimeSensor, + ClientTelemetrySender clientTelemetrySender) { ChannelBuilder channelBuilder = null; Selector selector = null; @@ -239,7 +244,8 @@ public final class ClientUtils { apiVersions, throttleTimeSensor, logContext, - hostResolver); + hostResolver, + clientTelemetrySender); } catch (Throwable t) { closeQuietly(selector, "Selector"); closeQuietly(channelBuilder, "ChannelBuilder"); diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index 872c7210096..a65e2467f1b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +32,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; /** * Configurations shared by Kafka client applications: producer, consumer, connect, etc. @@ -294,4 +297,14 @@ public class CommonClientConfigs { } return reporters; } + + public static Optional telemetryReporter(String clientId, AbstractConfig config) { + if (!config.getBoolean(CommonClientConfigs.ENABLE_METRICS_PUSH_CONFIG)) { + return Optional.empty(); + } + + ClientTelemetryReporter telemetryReporter = new ClientTelemetryReporter(Time.SYSTEM); + telemetryReporter.configure(config.originals(Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId))); + return Optional.of(telemetryReporter); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index ad81d1faed4..a596f660d02 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -38,10 +38,13 @@ import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.CorrelationIdMismatchException; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.PushTelemetryResponse; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator; +import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -128,6 +131,8 @@ public class NetworkClient implements KafkaClient { private final AtomicReference state; + private final TelemetrySender telemetrySender; + public NetworkClient(Selectable selector, Metadata metadata, String 
clientId, @@ -194,7 +199,8 @@ public class NetworkClient implements KafkaClient { apiVersions, throttleTimeSensor, logContext, - new DefaultHostResolver()); + new DefaultHostResolver(), + null); } public NetworkClient(Selectable selector, @@ -229,7 +235,8 @@ public class NetworkClient implements KafkaClient { apiVersions, null, logContext, - new DefaultHostResolver()); + new DefaultHostResolver(), + null); } public NetworkClient(MetadataUpdater metadataUpdater, @@ -249,7 +256,8 @@ public class NetworkClient implements KafkaClient { ApiVersions apiVersions, Sensor throttleTimeSensor, LogContext logContext, - HostResolver hostResolver) { + HostResolver hostResolver, + ClientTelemetrySender clientTelemetrySender) { /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the * super constructor is invoked. @@ -279,6 +287,7 @@ public class NetworkClient implements KafkaClient { this.throttleTimeSensor = throttleTimeSensor; this.log = logContext.logger(NetworkClient.class); this.state = new AtomicReference<>(State.ACTIVE); + this.telemetrySender = (clientTelemetrySender != null) ? new TelemetrySender(clientTelemetrySender) : null; } /** @@ -361,6 +370,8 @@ public class NetworkClient implements KafkaClient { } } else if (request.header.apiKey() == ApiKeys.METADATA) { metadataUpdater.handleFailedRequest(now, Optional.empty()); + } else if (isTelemetryApi(request.header.apiKey()) && telemetrySender != null) { + telemetrySender.handleFailedRequest(request.header.apiKey(), null); } } } @@ -522,6 +533,8 @@ public class NetworkClient implements KafkaClient { abortedSends.add(clientResponse); else if (clientRequest.apiKey() == ApiKeys.METADATA) metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException)); + else if (isTelemetryApi(clientRequest.apiKey()) && telemetrySender != null) + telemetrySender.handleFailedRequest(clientRequest.apiKey(), unsupportedVersionException); } } @@ -567,8 +580,9 @@ public class NetworkClient implements KafkaClient { } long metadataTimeout = metadataUpdater.maybeUpdate(now); + long telemetryTimeout = telemetrySender != null ? 
telemetrySender.maybeUpdate(now) : Integer.MAX_VALUE; try { - this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); + this.selector.poll(Utils.min(timeout, metadataTimeout, telemetryTimeout, defaultRequestTimeoutMs)); } catch (IOException e) { log.error("Unexpected error during I/O", e); } @@ -663,6 +677,8 @@ public class NetworkClient implements KafkaClient { if (state.compareAndSet(State.CLOSING, State.CLOSED)) { this.selector.close(); this.metadataUpdater.close(); + if (telemetrySender != null) + telemetrySender.close(); } else { log.warn("Attempting to close NetworkClient that has already been closed."); } @@ -925,6 +941,10 @@ public class NetworkClient implements KafkaClient { metadataUpdater.handleSuccessfulResponse(req.header, now, (MetadataResponse) response); else if (req.isInternalRequest && response instanceof ApiVersionsResponse) handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) response); + else if (req.isInternalRequest && response instanceof GetTelemetrySubscriptionsResponse) + telemetrySender.handleResponse((GetTelemetrySubscriptionsResponse) response); + else if (req.isInternalRequest && response instanceof PushTelemetryResponse) + telemetrySender.handleResponse((PushTelemetryResponse) response); else responses.add(req.completed(response, now)); } @@ -1042,6 +1062,25 @@ public class NetworkClient implements KafkaClient { } } + /** + * Return true if there's at least one connection establishment is currently underway + */ + private boolean isAnyNodeConnecting() { + for (Node node : metadataUpdater.fetchNodes()) { + if (connectionStates.isConnecting(node.idString())) { + return true; + } + } + return false; + } + + /** + * Return true if the ApiKey belongs to the Telemetry API. + */ + private boolean isTelemetryApi(ApiKeys apiKey) { + return apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY; + } + class DefaultMetadataUpdater implements MetadataUpdater { /* the current cluster metadata */ @@ -1161,18 +1200,6 @@ public class NetworkClient implements KafkaClient { this.metadata.close(); } - /** - * Return true if there's at least one connection establishment is currently underway - */ - private boolean isAnyNodeConnecting() { - for (Node node : fetchNodes()) { - if (connectionStates.isConnecting(node.idString())) { - return true; - } - } - return false; - } - /** * Add a metadata request to the list of sends if we can make one */ @@ -1222,6 +1249,95 @@ public class NetworkClient implements KafkaClient { } + class TelemetrySender { + + private final ClientTelemetrySender clientTelemetrySender; + private Node stickyNode; + + public TelemetrySender(ClientTelemetrySender clientTelemetrySender) { + this.clientTelemetrySender = clientTelemetrySender; + } + + public long maybeUpdate(long now) { + long timeToNextUpdate = clientTelemetrySender.timeToNextUpdate(defaultRequestTimeoutMs); + if (timeToNextUpdate > 0) + return timeToNextUpdate; + + // Per KIP-714, let's continue to re-use the same broker for as long as possible. 
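For context on the scheduling above (the sticky-node selection continues just below): poll() now waits no longer than the earliest of the caller's timeout, the metadata refresh, the telemetry sender's next update, and the default request timeout, so a due telemetry push wakes the client just like a due metadata refresh. A minimal sketch of that timeout folding, assuming only the timeToNextUpdate contract shown in this patch (PeriodicSender is an illustrative stand-in for ClientTelemetrySender):

    import org.apache.kafka.common.utils.Utils;

    // Illustrative stand-in for ClientTelemetrySender's scheduling contract.
    interface PeriodicSender {
        long timeToNextUpdate(long requestTimeoutMs); // 0 when a request is due now
    }

    final class PollTimeouts {
        // Mirror of the Utils.min(...) call in NetworkClient.poll(): the selector
        // blocks only until the earliest of the pending deadlines.
        static long selectorWait(long callerTimeout, long metadataTimeout,
                                 PeriodicSender telemetry, long defaultRequestTimeoutMs) {
            long telemetryTimeout = telemetry != null
                    ? telemetry.timeToNextUpdate(defaultRequestTimeoutMs)
                    : Integer.MAX_VALUE;
            return Utils.min(callerTimeout, metadataTimeout, telemetryTimeout, defaultRequestTimeoutMs);
        }
    }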
+ if (stickyNode == null) { + stickyNode = leastLoadedNode(now); + if (stickyNode == null) { + log.debug("Give up sending telemetry request since no node is available"); + return reconnectBackoffMs; + } + } + + return maybeUpdate(now, stickyNode); + } + + private long maybeUpdate(long now, Node node) { + String nodeConnectionId = node.idString(); + + if (canSendRequest(nodeConnectionId, now)) { + Optional> requestOpt = clientTelemetrySender.createRequest(); + + if (!requestOpt.isPresent()) + return Long.MAX_VALUE; + + AbstractRequest.Builder request = requestOpt.get(); + ClientRequest clientRequest = newClientRequest(nodeConnectionId, request, now, true); + doSend(clientRequest, true, now); + return defaultRequestTimeoutMs; + } else { + // Per KIP-714, if we can't issue a request to this broker node, let's clear it out + // and try another broker on the next loop. + stickyNode = null; + } + + // If there's any connection establishment underway, wait until it completes. This prevents + // the client from unnecessarily connecting to additional nodes while a previous connection + // attempt has not been completed. + if (isAnyNodeConnecting()) + return reconnectBackoffMs; + + if (connectionStates.canConnect(nodeConnectionId, now)) { + // We don't have a connection to this node right now, make one + log.debug("Initialize connection to node {} for sending telemetry request", node); + initiateConnect(node, now); + return reconnectBackoffMs; + } + + // In either case, we just need to wait for a network event to let us know the selected + // connection might be usable again. + return Long.MAX_VALUE; + } + + public void handleResponse(GetTelemetrySubscriptionsResponse response) { + clientTelemetrySender.handleResponse(response); + } + + public void handleResponse(PushTelemetryResponse response) { + clientTelemetrySender.handleResponse(response); + } + + public void handleFailedRequest(ApiKeys apiKey, KafkaException maybeFatalException) { + if (apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS) + clientTelemetrySender.handleFailedGetTelemetrySubscriptionsRequest(maybeFatalException); + else if (apiKey == ApiKeys.PUSH_TELEMETRY) + clientTelemetrySender.handleFailedPushTelemetryRequest(maybeFatalException); + else + throw new IllegalStateException("Invalid api key for failed telemetry request"); + } + + public void close() { + try { + clientTelemetrySender.close(); + } catch (Exception exception) { + log.error("Failed to close client telemetry sender", exception); + } + } + } + @Override public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder requestBuilder, @@ -1239,6 +1355,11 @@ public class NetworkClient implements KafkaClient { return correlation++; } + // visible for testing + Node telemetryConnectedNode() { + return telemetrySender.stickyNode; + } + @Override public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder requestBuilder, diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 18bd108a789..f663d6efc60 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -140,6 +140,7 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData; import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName; import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData; import 
org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData; import org.apache.kafka.common.message.ListGroupsRequestData; @@ -208,6 +209,8 @@ import org.apache.kafka.common.requests.ElectLeadersRequest; import org.apache.kafka.common.requests.ElectLeadersResponse; import org.apache.kafka.common.requests.ExpireDelegationTokenRequest; import org.apache.kafka.common.requests.ExpireDelegationTokenResponse; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest; import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; import org.apache.kafka.common.requests.JoinGroupRequest; @@ -379,6 +382,12 @@ public class KafkaAdminClient extends AdminClient { private final long retryBackoffMs; private final long retryBackoffMaxMs; private final ExponentialBackoff retryBackoff; + private final boolean clientTelemetryEnabled; + + /** + * The telemetry requests client instance id. + */ + private Uuid clientInstanceId; /** * Get or create a list value from a map. @@ -533,6 +542,7 @@ public class KafkaAdminClient extends AdminClient { } } + // Visible for tests static KafkaAdminClient createInternal(AdminClientConfig config, AdminMetadataManager metadataManager, KafkaClient client, @@ -585,6 +595,7 @@ public class KafkaAdminClient extends AdminClient { CommonClientConfigs.RETRY_BACKOFF_EXP_BASE, retryBackoffMaxMs, CommonClientConfigs.RETRY_BACKOFF_JITTER); + this.clientTelemetryEnabled = config.getBoolean(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG); config.logUnused(); AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds()); log.debug("Kafka admin client initialized"); @@ -4420,7 +4431,52 @@ public class KafkaAdminClient extends AdminClient { @Override public Uuid clientInstanceId(Duration timeout) { - throw new UnsupportedOperationException(); + if (timeout.isNegative()) { + throw new IllegalArgumentException("The timeout cannot be negative."); + } + + if (!clientTelemetryEnabled) { + throw new IllegalStateException("Telemetry is not enabled. 
Set config `" + AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`."); + } + + if (clientInstanceId != null) { + return clientInstanceId; + } + + final long now = time.milliseconds(); + final KafkaFutureImpl future = new KafkaFutureImpl<>(); + runnable.call(new Call("getTelemetrySubscriptions", calcDeadlineMs(now, (int) timeout.toMillis()), + new LeastLoadedNodeProvider()) { + + @Override + GetTelemetrySubscriptionsRequest.Builder createRequest(int timeoutMs) { + return new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + GetTelemetrySubscriptionsResponse response = (GetTelemetrySubscriptionsResponse) abstractResponse; + if (response.error() != Errors.NONE) { + future.completeExceptionally(response.error().exception()); + } else { + future.complete(response.data().clientInstanceId()); + } + } + + @Override + void handleFailure(Throwable throwable) { + future.completeExceptionally(throwable); + } + }, now); + + try { + clientInstanceId = future.get(); + } catch (Exception e) { + log.error("Error occurred while fetching client instance id", e); + throw new KafkaException("Error occurred while fetching client instance id", e); + } + + return clientInstanceId; } private void invokeDriver( diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 24d05e7d63b..3ab4f2e7e6d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -64,6 +64,8 @@ import org.apache.kafka.common.requests.LeaveGroupResponse; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; import org.apache.kafka.common.utils.ExponentialBackoff; import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogContext; @@ -79,6 +81,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -130,6 +133,7 @@ public abstract class AbstractCoordinator implements Closeable { private final Heartbeat heartbeat; private final GroupCoordinatorMetrics sensors; private final GroupRebalanceConfig rebalanceConfig; + private final Optional clientTelemetryReporter; protected final Time time; protected final ConsumerNetworkClient client; @@ -160,6 +164,16 @@ public abstract class AbstractCoordinator implements Closeable { Metrics metrics, String metricGrpPrefix, Time time) { + this(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, time, Optional.empty()); + } + + public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig, + LogContext logContext, + ConsumerNetworkClient client, + Metrics metrics, + String metricGrpPrefix, + Time time, + Optional clientTelemetryReporter) { Objects.requireNonNull(rebalanceConfig.groupId, "Expected a non-null group id for coordinator construction"); this.rebalanceConfig = rebalanceConfig; @@ -173,6 +187,7 @@ public 
abstract class AbstractCoordinator implements Closeable { CommonClientConfigs.RETRY_BACKOFF_JITTER); this.heartbeat = new Heartbeat(rebalanceConfig, time); this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix); + this.clientTelemetryReporter = clientTelemetryReporter; } /** @@ -648,6 +663,8 @@ public abstract class AbstractCoordinator implements Closeable { joinResponse.data().memberId(), joinResponse.data().protocolName()); log.info("Successfully joined group with generation {}", AbstractCoordinator.this.generation); + clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels( + Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, joinResponse.data().memberId()))); if (joinResponse.isLeader()) { onLeaderElected(joinResponse).chain(future); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 196bed50286..7733c45bc4a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -65,9 +65,12 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; @@ -158,6 +161,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { private final int defaultApiTimeoutMs; private volatile boolean closed = false; private final List assignors; + private final Optional clientTelemetryReporter; // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates private boolean cachedSubscriptionHasAllFetchPositions; @@ -185,7 +189,10 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { log.debug("Initializing the Kafka consumer"); this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); this.time = Time.SYSTEM; - this.metrics = createMetrics(config, time); + List reporters = CommonClientConfigs.metricsReporters(clientId, config); + this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config); + this.clientTelemetryReporter.ifPresent(reporters::add); + this.metrics = createMetrics(config, time, reporters); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); List> interceptorList = configuredConsumerInterceptors(config); @@ -215,7 +222,8 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { config, apiVersions, metrics, - fetchMetricsManager); + fetchMetricsManager, + clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null)); final Supplier requestManagersSupplier = RequestManagers.supplier(time, logContext, backgroundEventQueue, @@ -305,6 +313,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { 
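The AbstractCoordinator change above re-labels telemetry on every successful JoinGroup response, so pushed metrics can be correlated with the member id the broker assigned. A hedged sketch of that labeling step, assuming a reporter wired through the new constructor parameter:

    import java.util.Collections;
    import java.util.Optional;
    import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
    import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;

    final class MemberIdLabeling {
        // Called once the JoinGroup response carries the assigned member id; a
        // no-op when telemetry is disabled and the Optional is empty.
        static void onJoinComplete(Optional<ClientTelemetryReporter> reporter, String memberId) {
            reporter.ifPresent(r -> r.updateMetricsLabels(
                    Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, memberId)));
        }
    }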
this.applicationEventHandler = applicationEventHandler; this.assignors = assignors; this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer"); + this.clientTelemetryReporter = Optional.empty(); } // Visible for testing @@ -330,6 +339,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer); this.assignors = assignors; + this.clientTelemetryReporter = Optional.empty(); ConsumerMetrics metricsRegistry = new ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX); FetchMetricsManager fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics); @@ -918,8 +928,12 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { log.trace("Closing the Kafka consumer"); AtomicReference firstException = new AtomicReference<>(); + final Timer closeTimer = time.timer(timeout); + clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); + closeTimer.update(); + if (applicationEventHandler != null) - closeQuietly(() -> applicationEventHandler.close(timeout), "Failed to close application event handler with a timeout(ms)=" + timeout, firstException); + closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed to close application event handler with a timeout(ms)=" + closeTimer.remainingMs(), firstException); // Invoke all callbacks after the background thread exists in case if there are unsent async // commits @@ -930,6 +944,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException); closeQuietly(metrics, "consumer metrics", firstException); closeQuietly(deserializers, "consumer deserializers", firstException); + clientTelemetryReporter.ifPresent(reporter -> closeQuietly(reporter, "async consumer telemetry reporter", firstException)); AppInfoParser.unregisterAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics); log.debug("Kafka consumer has been closed"); @@ -979,7 +994,11 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { @Override public Uuid clientInstanceId(Duration timeout) { - throw new KafkaException("method not implemented"); + if (!clientTelemetryReporter.isPresent()) { + throw new IllegalStateException("Telemetry is not enabled. 
Set config `" + ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`."); + } + + return ClientTelemetryUtils.fetchClientInstanceId(clientTelemetryReporter.get(), timeout); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 590c8a0976a..e3ce79e373f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -56,6 +56,7 @@ import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.OffsetCommitResponse; import org.apache.kafka.common.requests.OffsetFetchRequest; import org.apache.kafka.common.requests.OffsetFetchResponse; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; @@ -168,13 +169,15 @@ public final class ConsumerCoordinator extends AbstractCoordinator { int autoCommitIntervalMs, ConsumerInterceptors interceptors, boolean throwOnFetchStableOffsetsUnsupported, - String rackId) { + String rackId, + Optional clientTelemetryReporter) { super(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, - time); + time, + clientTelemetryReporter); this.rebalanceConfig = rebalanceConfig; this.log = logContext.logger(ConsumerCoordinator.class); this.metadata = metadata; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java index e267293f98b..d599d41a245 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java @@ -38,6 +38,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsContext; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; @@ -81,7 +82,8 @@ public final class ConsumerUtils { Time time, Metadata metadata, Sensor throttleTimeSensor, - long retryBackoffMs) { + long retryBackoffMs, + ClientTelemetrySender clientTelemetrySender) { NetworkClient netClient = ClientUtils.createNetworkClient(config, metrics, CONSUMER_METRIC_GROUP_PREFIX, @@ -90,7 +92,8 @@ public final class ConsumerUtils { time, CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION, metadata, - throttleTimeSensor); + throttleTimeSensor, + clientTelemetrySender); // Will avoid blocking an extended period of time to prevent heartbeat thread starvation int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); @@ -130,6 +133,11 @@ public final class ConsumerUtils { } public static Metrics createMetrics(ConsumerConfig config, Time time) { + return createMetrics(config, time, CommonClientConfigs.metricsReporters( + config.getString(ConsumerConfig.CLIENT_ID_CONFIG), config)); + } + + public static Metrics createMetrics(ConsumerConfig config, Time time, List reporters) { String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); Map metricsTags = 
Collections.singletonMap(CONSUMER_CLIENT_ID_METRIC_TAG, clientId); MetricConfig metricConfig = new MetricConfig() @@ -137,7 +145,6 @@ public final class ConsumerUtils { .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG))) .tags(metricsTags); - List reporters = CommonClientConfigs.metricsReporters(clientId, config); MetricsContext metricsContext = new KafkaMetricsContext(CONSUMER_JMX_PREFIX, config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); return new Metrics(metricConfig, reporters, time, metricsContext); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java index e427322d31b..563b437cba6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java @@ -48,7 +48,10 @@ import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; @@ -130,6 +133,7 @@ public class LegacyKafkaConsumer implements ConsumerDelegate { private final int defaultApiTimeoutMs; private volatile boolean closed = false; private final List assignors; + private final Optional clientTelemetryReporter; // currentThread holds the threadId of the current thread accessing LegacyKafkaConsumer // and is used to prevent multi-threaded access @@ -160,7 +164,10 @@ public class LegacyKafkaConsumer implements ConsumerDelegate { this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); this.time = Time.SYSTEM; - this.metrics = createMetrics(config, time); + List reporters = CommonClientConfigs.metricsReporters(clientId, config); + this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config); + this.clientTelemetryReporter.ifPresent(reporters::add); + this.metrics = createMetrics(config, time, reporters); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); @@ -188,7 +195,8 @@ public class LegacyKafkaConsumer implements ConsumerDelegate { time, metadata, fetchMetricsManager.throttleTimeSensor(), - retryBackoffMs); + retryBackoffMs, + clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null)); this.assignors = ConsumerPartitionAssignor.getAssignorInstances( config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), @@ -214,7 +222,8 @@ public class LegacyKafkaConsumer implements ConsumerDelegate { config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), this.interceptors, config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED), - 
config.getString(ConsumerConfig.CLIENT_RACK_CONFIG)); + config.getString(ConsumerConfig.CLIENT_RACK_CONFIG), + clientTelemetryReporter); } this.fetcher = new Fetcher<>( logContext, @@ -282,6 +291,7 @@ public class LegacyKafkaConsumer implements ConsumerDelegate { this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); + this.clientTelemetryReporter = Optional.empty(); int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG); int rebalanceTimeoutMs = config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG); @@ -327,7 +337,8 @@ public class LegacyKafkaConsumer implements ConsumerDelegate { autoCommitIntervalMs, interceptors, throwOnStableOffsetNotSupported, - rackId + rackId, + clientTelemetryReporter ); } else { this.coordinator = null; @@ -880,7 +891,12 @@ public class LegacyKafkaConsumer implements ConsumerDelegate { @Override public Uuid clientInstanceId(Duration timeout) { - throw new UnsupportedOperationException(); + if (!clientTelemetryReporter.isPresent()) { + throw new IllegalStateException("Telemetry is not enabled. Set config `" + ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`."); + + } + + return ClientTelemetryUtils.fetchClientInstanceId(clientTelemetryReporter.get(), timeout); } @Override @@ -1108,6 +1124,8 @@ public class LegacyKafkaConsumer implements ConsumerDelegate { AtomicReference firstException = new AtomicReference<>(); final Timer closeTimer = createTimerForRequest(timeout); + clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); + closeTimer.update(); // Close objects with a timeout. The timeout is required because the coordinator & the fetcher send requests to // the server in the process of closing which may not respect the overall timeout defined for closing the // consumer. 
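Both consumer close paths follow the same time-budget pattern: start a Timer for the caller's timeout, signal the reporter via initiateClose so it can attempt a final push, and hand only the remaining budget to the other closeables; the reporter itself is closed last, after the network resources. A condensed sketch of that ordering, with the intermediate close steps elided:

    import java.time.Duration;
    import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.common.utils.Timer;

    final class CloseBudget {
        // Illustrative only: telemetry is told to wrap up first, and each later
        // step sees the budget that remains after the earlier ones ran.
        static void close(Time time, Duration timeout, ClientTelemetryReporter reporter,
                          AutoCloseable coordinatorAndFetcher) throws Exception {
            Timer closeTimer = time.timer(timeout);
            reporter.initiateClose(timeout.toMillis()); // non-blocking hint to push a final snapshot
            closeTimer.update();                        // charge the time spent so far
            coordinatorAndFetcher.close();              // must finish within closeTimer.remainingMs()
            reporter.close();                           // release reporter resources last
        }
    }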
@@ -1134,6 +1152,7 @@ public class LegacyKafkaConsumer implements ConsumerDelegate { closeQuietly(metrics, "consumer metrics", firstException); closeQuietly(client, "consumer network client", firstException); closeQuietly(deserializers, "consumer deserializers", firstException); + clientTelemetryReporter.ifPresent(reporter -> closeQuietly(reporter, "consumer telemetry reporter", firstException)); AppInfoParser.unregisterAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics); log.debug("Kafka consumer has been closed"); Throwable exception = firstException.get(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 732de9055c2..141f5f955c8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; @@ -369,7 +370,8 @@ public class NetworkClientDelegate implements AutoCloseable { final ConsumerConfig config, final ApiVersions apiVersions, final Metrics metrics, - final FetchMetricsManager fetchMetricsManager) { + final FetchMetricsManager fetchMetricsManager, + final ClientTelemetrySender clientTelemetrySender) { return new CachedSupplier() { @Override protected NetworkClientDelegate create() { @@ -381,7 +383,8 @@ public class NetworkClientDelegate implements AutoCloseable { time, CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION, metadata, - fetchMetricsManager.throttleTimeSensor()); + fetchMetricsManager.throttleTimeSensor(), + clientTelemetrySender); return new NetworkClientDelegate(time, config, logContext, client); } }; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 2b9b4c2b05a..94273358e7d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -66,10 +66,13 @@ import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; @@ -80,6 +83,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -257,6 +261,7 @@ public class 
KafkaProducer implements Producer { private final ProducerInterceptors interceptors; private final ApiVersions apiVersions; private final TransactionManager transactionManager; + private final Optional clientTelemetryReporter; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -364,6 +369,8 @@ public class KafkaProducer implements Producer { .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG))) .tags(metricTags); List reporters = CommonClientConfigs.metricsReporters(clientId, config); + this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config); + this.clientTelemetryReporter.ifPresent(reporters::add); MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time, metricsContext); @@ -480,7 +487,8 @@ public class KafkaProducer implements Producer { ProducerInterceptors interceptors, Partitioner partitioner, Time time, - KafkaThread ioThread) { + KafkaThread ioThread, + Optional clientTelemetryReporter) { this.producerConfig = config; this.time = time; this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); @@ -503,6 +511,7 @@ public class KafkaProducer implements Producer { this.metadata = metadata; this.sender = sender; this.ioThread = ioThread; + this.clientTelemetryReporter = clientTelemetryReporter; } // visible for testing @@ -519,7 +528,8 @@ public class KafkaProducer implements Producer { time, maxInflightRequests, metadata, - throttleTimeSensor); + throttleTimeSensor, + clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null)); short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG)); return new Sender(logContext, @@ -1280,7 +1290,11 @@ public class KafkaProducer implements Producer { */ @Override public Uuid clientInstanceId(Duration timeout) { - throw new UnsupportedOperationException(); + if (!clientTelemetryReporter.isPresent()) { + throw new IllegalStateException("Telemetry is not enabled. Set config `" + ProducerConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`."); + } + + return ClientTelemetryUtils.fetchClientInstanceId(clientTelemetryReporter.get(), timeout); } /** @@ -1341,16 +1355,22 @@ public class KafkaProducer implements Producer { timeoutMs); } else { // Try to close gracefully. 
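A usage sketch of the clientInstanceId API added above (the graceful-close hunk continues right after this sketch): the producer throws IllegalStateException when enable.metrics.push is disabled and otherwise delegates to ClientTelemetryUtils. The sketch assumes a reachable broker with KIP-714 client metrics configured:

    import java.time.Duration;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.serialization.StringSerializer;

    final class ProducerInstanceIdExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "true"); // explicit, though enabled by default
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Blocks up to the timeout while the GetTelemetrySubscriptions
                // round trip completes; the id is assigned by the broker.
                Uuid instanceId = producer.clientInstanceId(Duration.ofSeconds(5));
                System.out.println("producer client instance id: " + instanceId);
            }
        }
    }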
- if (this.sender != null) + final Timer closeTimer = time.timer(timeout); + if (this.sender != null) { this.sender.initiateClose(); + closeTimer.update(); + } if (this.ioThread != null) { try { - this.ioThread.join(timeoutMs); + this.ioThread.join(closeTimer.remainingMs()); } catch (InterruptedException t) { firstException.compareAndSet(null, new InterruptException(t)); log.error("Interrupted while joining ioThread", t); + } finally { + closeTimer.update(); } } + clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(closeTimer.remainingMs())); } } @@ -1374,6 +1394,7 @@ public class KafkaProducer implements Producer { Utils.closeQuietly(keySerializer, "producer keySerializer", firstException); Utils.closeQuietly(valueSerializer, "producer valueSerializer", firstException); Utils.closeQuietly(partitioner, "producer partitioner", firstException); + clientTelemetryReporter.ifPresent(reporter -> Utils.closeQuietly(reporter, "producer telemetry reporter", firstException)); AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics); Throwable exception = firstException.get(); if (exception != null && !swallowException) { diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java index 91bc6f23b46..d1d62a7efe2 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -194,4 +195,13 @@ public class ClientTelemetryUtils { throw new KafkaException("Unable to parse MetricsData payload", e); } } + + public static Uuid fetchClientInstanceId(ClientTelemetryReporter clientTelemetryReporter, Duration timeout) { + if (timeout.isNegative()) { + throw new IllegalArgumentException("The timeout cannot be negative."); + } + + Optional optionalUuid = clientTelemetryReporter.telemetrySender().clientInstanceId(timeout); + return optionalUuid.orElse(null); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index abf44f4c45b..747049fd192 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -26,20 +26,29 @@ import org.apache.kafka.common.message.ApiMessageType; import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; import org.apache.kafka.common.message.ProduceRequestData; import org.apache.kafka.common.message.ProduceResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractResponse; 
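One subtlety in the ClientTelemetryUtils.fetchClientInstanceId helper added above: on timeout it returns null rather than throwing, so the consumer and producer clientInstanceId methods can surface an absent id. A hedged caller-side sketch of that contract:

    import java.time.Duration;
    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
    import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;

    final class InstanceIdLookup {
        // fetchClientInstanceId rejects negative timeouts up front and returns
        // null when no id was obtained in time, so null means "not known yet".
        static String describe(ClientTelemetryReporter reporter, Duration timeout) {
            Uuid id = ClientTelemetryUtils.fetchClientInstanceId(reporter, timeout);
            return id == null ? "client instance id not yet available" : id.toString();
        }
    }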
import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.RequestTestUtils; import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator; +import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.DelayedReceive; @@ -71,6 +80,12 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class NetworkClientTest { @@ -992,32 +1007,48 @@ public class NetworkClientTest { return (mockHostResolver.useNewAddresses() && newAddresses.contains(inetAddress)) || (!mockHostResolver.useNewAddresses() && initialAddresses.contains(inetAddress)); }); + + ClientTelemetrySender mockClientTelemetrySender = mock(ClientTelemetrySender.class); + when(mockClientTelemetrySender.timeToNextUpdate(anyLong())).thenReturn(0L); + NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, - time, false, new ApiVersions(), null, new LogContext(), mockHostResolver); + time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender); // Connect to one the initial addresses, then change the addresses and disconnect client.ready(node, time.milliseconds()); time.sleep(connectionSetupTimeoutMaxMsTest); client.poll(0, time.milliseconds()); assertTrue(client.isReady(node, time.milliseconds())); + // First poll should try to update the node but couldn't because node remains in connecting state + // i.e. connection handling is completed after telemetry update. 
+ assertNull(client.telemetryConnectedNode()); + + client.poll(0, time.milliseconds()); + assertEquals(node, client.telemetryConnectedNode()); mockHostResolver.changeAddresses(); selector.serverDisconnect(node.idString()); client.poll(0, time.milliseconds()); assertFalse(client.isReady(node, time.milliseconds())); + assertNull(client.telemetryConnectedNode()); time.sleep(reconnectBackoffMaxMsTest); client.ready(node, time.milliseconds()); time.sleep(connectionSetupTimeoutMaxMsTest); client.poll(0, time.milliseconds()); assertTrue(client.isReady(node, time.milliseconds())); + assertNull(client.telemetryConnectedNode()); + + client.poll(0, time.milliseconds()); + assertEquals(node, client.telemetryConnectedNode()); // We should have tried to connect to one initial address and one new address, and resolved DNS twice assertEquals(1, initialAddressConns.get()); assertEquals(1, newAddressConns.get()); assertEquals(2, mockHostResolver.resolutionCount()); + verify(mockClientTelemetrySender, times(5)).timeToNextUpdate(anyLong()); } @Test @@ -1036,16 +1067,21 @@ public class NetworkClientTest { // Refuse first connection attempt return initialAddressConns.get() > 1; }); + + ClientTelemetrySender mockClientTelemetrySender = mock(ClientTelemetrySender.class); + when(mockClientTelemetrySender.timeToNextUpdate(anyLong())).thenReturn(0L); + NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, - time, false, new ApiVersions(), null, new LogContext(), mockHostResolver); + time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender); // First connection attempt should fail client.ready(node, time.milliseconds()); time.sleep(connectionSetupTimeoutMaxMsTest); client.poll(0, time.milliseconds()); assertFalse(client.isReady(node, time.milliseconds())); + assertNull(client.telemetryConnectedNode()); // Second connection attempt should succeed time.sleep(reconnectBackoffMaxMsTest); @@ -1053,12 +1089,18 @@ public class NetworkClientTest { time.sleep(connectionSetupTimeoutMaxMsTest); client.poll(0, time.milliseconds()); assertTrue(client.isReady(node, time.milliseconds())); + assertNull(client.telemetryConnectedNode()); + + // Next client poll after handling connection setup should update telemetry node. 
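These assertions drive the telemetry path with a Mockito ClientTelemetrySender whose timeToNextUpdate always returns 0, making telemetry eligible on every poll; the sticky node is only recorded on the poll after the connection completes, hence the null-then-node pattern. A condensed version of the mock setup used throughout these tests:

    import static org.mockito.ArgumentMatchers.anyLong;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.util.Optional;
    import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
    import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
    import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;

    final class TelemetrySenderMocks {
        // A sender that is always due and always offers a subscriptions request,
        // so every NetworkClient.poll() exercises the telemetry branch.
        static ClientTelemetrySender alwaysDueSender() {
            ClientTelemetrySender sender = mock(ClientTelemetrySender.class);
            when(sender.timeToNextUpdate(anyLong())).thenReturn(0L);
            when(sender.createRequest()).thenReturn(Optional.of(
                    new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true)));
            return sender;
        }
    }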
+ client.poll(0, time.milliseconds()); + assertEquals(node, client.telemetryConnectedNode()); // We should have tried to connect to two of the initial addresses, none of the new address, and should // only have resolved DNS once assertEquals(2, initialAddressConns.get()); assertEquals(0, newAddressConns.get()); assertEquals(1, mockHostResolver.resolutionCount()); + verify(mockClientTelemetrySender, times(3)).timeToNextUpdate(anyLong()); } @Test @@ -1077,21 +1119,30 @@ public class NetworkClientTest { // Refuse first connection attempt to the new addresses return initialAddresses.contains(inetAddress) || newAddressConns.get() > 1; }); + + ClientTelemetrySender mockClientTelemetrySender = mock(ClientTelemetrySender.class); + when(mockClientTelemetrySender.timeToNextUpdate(anyLong())).thenReturn(0L); + NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, - time, false, new ApiVersions(), null, new LogContext(), mockHostResolver); + time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender); // Connect to one the initial addresses, then change the addresses and disconnect client.ready(node, time.milliseconds()); time.sleep(connectionSetupTimeoutMaxMsTest); client.poll(0, time.milliseconds()); assertTrue(client.isReady(node, time.milliseconds())); + assertNull(client.telemetryConnectedNode()); + // Next client poll after handling connection setup should update telemetry node. + client.poll(0, time.milliseconds()); + assertEquals(node, client.telemetryConnectedNode()); mockHostResolver.changeAddresses(); selector.serverDisconnect(node.idString()); client.poll(0, time.milliseconds()); assertFalse(client.isReady(node, time.milliseconds())); + assertNull(client.telemetryConnectedNode()); // First connection attempt to new addresses should fail time.sleep(reconnectBackoffMaxMsTest); @@ -1099,6 +1150,7 @@ public class NetworkClientTest { time.sleep(connectionSetupTimeoutMaxMsTest); client.poll(0, time.milliseconds()); assertFalse(client.isReady(node, time.milliseconds())); + assertNull(client.telemetryConnectedNode()); // Second connection attempt to new addresses should succeed time.sleep(reconnectBackoffMaxMsTest); @@ -1106,12 +1158,18 @@ public class NetworkClientTest { time.sleep(connectionSetupTimeoutMaxMsTest); client.poll(0, time.milliseconds()); assertTrue(client.isReady(node, time.milliseconds())); + assertNull(client.telemetryConnectedNode()); + + // Next client poll after handling connection setup should update telemetry node. 
+ client.poll(0, time.milliseconds()); + assertEquals(node, client.telemetryConnectedNode()); // We should have tried to connect to one of the initial addresses and two of the new addresses (the first one // failed), and resolved DNS twice, once for each set of addresses assertEquals(1, initialAddressConns.get()); assertEquals(2, newAddressConns.get()); assertEquals(2, mockHostResolver.resolutionCount()); + verify(mockClientTelemetrySender, times(6)).timeToNextUpdate(anyLong()); } @Test @@ -1168,6 +1226,61 @@ public class NetworkClientTest { assertTrue(client.connectionFailed(node)); } + @Test + public void testTelemetryRequest() { + ClientTelemetrySender mockClientTelemetrySender = mock(ClientTelemetrySender.class); + when(mockClientTelemetrySender.timeToNextUpdate(anyLong())).thenReturn(0L); + + NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE, + reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, + defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, + time, true, new ApiVersions(), null, new LogContext(), new DefaultHostResolver(), mockClientTelemetrySender); + + // Send the ApiVersionsRequest + client.ready(node, time.milliseconds()); + client.poll(0, time.milliseconds()); + assertNull(client.telemetryConnectedNode()); + assertTrue(client.hasInFlightRequests(node.idString())); + delayedApiVersionsResponse(0, ApiKeys.API_VERSIONS.latestVersion(), TestUtils.defaultApiVersionsResponse( + ApiMessageType.ListenerType.BROKER)); + // handle ApiVersionsResponse + client.poll(0, time.milliseconds()); + // the ApiVersionsRequest is gone + assertFalse(client.hasInFlightRequests(node.idString())); + selector.clear(); + + GetTelemetrySubscriptionsRequest.Builder getRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true); + when(mockClientTelemetrySender.createRequest()).thenReturn(Optional.of(getRequest)); + + GetTelemetrySubscriptionsResponse getResponse = new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData()); + ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(getResponse, ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS.latestVersion(), 1); + selector.completeReceive(new NetworkReceive(node.idString(), buffer)); + + // Initiate poll to send GetTelemetrySubscriptions request + client.poll(0, time.milliseconds()); + assertTrue(client.isReady(node, time.milliseconds())); + assertEquals(node, client.telemetryConnectedNode()); + verify(mockClientTelemetrySender, times(1)).handleResponse(any(GetTelemetrySubscriptionsResponse.class)); + selector.clear(); + + PushTelemetryRequest.Builder pushRequest = new PushTelemetryRequest.Builder( + new PushTelemetryRequestData(), true); + when(mockClientTelemetrySender.createRequest()).thenReturn(Optional.of(pushRequest)); + + PushTelemetryResponse pushResponse = new PushTelemetryResponse(new PushTelemetryResponseData()); + ByteBuffer pushBuffer = RequestTestUtils.serializeResponseWithHeader(pushResponse, ApiKeys.PUSH_TELEMETRY.latestVersion(), 2); + selector.completeReceive(new NetworkReceive(node.idString(), pushBuffer)); + + // Initiate poll to send PushTelemetry request + client.poll(0, time.milliseconds()); + assertTrue(client.isReady(node, time.milliseconds())); + assertEquals(node, client.telemetryConnectedNode()); + verify(mockClientTelemetrySender, times(1)).handleResponse(any(PushTelemetryResponse.class)); + verify(mockClientTelemetrySender, 
     private RequestHeader parseHeader(ByteBuffer buffer) {
         buffer.getInt(); // skip size
         return RequestHeader.parse(buffer.slice());
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index bab1814dfaf..19bff2ea5e8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -33,8 +33,8 @@ import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.TopicCollection;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicCollection;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.Uuid;
@@ -110,10 +110,11 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
 import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
-import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
@@ -177,6 +178,8 @@ import org.apache.kafka.common.requests.DescribeUserScramCredentialsResponse;
 import org.apache.kafka.common.requests.ElectLeadersResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
@@ -225,6 +228,7 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -7063,6 +7067,48 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testClientInstanceId() {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            Uuid expected = Uuid.randomUuid();
+
+            GetTelemetrySubscriptionsResponseData responseData =
+                new GetTelemetrySubscriptionsResponseData().setClientInstanceId(expected).setErrorCode(Errors.NONE.code());
+
+            env.kafkaClient().prepareResponse(
+                request -> request instanceof GetTelemetrySubscriptionsRequest,
+                new GetTelemetrySubscriptionsResponse(responseData));
+
+            Uuid result = env.adminClient().clientInstanceId(Duration.ofMillis(10));
+            assertEquals(expected, result);
+        }
+    }
+
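The happy-path test above exercises the new clientInstanceId API on the admin client end to end against a prepared GetTelemetrySubscriptions response. A hedged usage sketch, assuming the Admin interface exposes clientInstanceId(Duration) once this patch lands; the bootstrap address is a placeholder, and the call blocks up to the supplied timeout:

    import java.time.Duration;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.common.Uuid;

    public class AdminInstanceIdExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            try (Admin admin = Admin.create(props)) {
                // Fetches the broker-assigned KIP-714 client instance id, blocking up to 5s.
                Uuid instanceId = admin.clientInstanceId(Duration.ofSeconds(5));
                System.out.println("client instance id: " + instanceId);
            }
        }
    }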
+    @Test
+    public void testClientInstanceIdInvalidTimeout() {
+        Properties props = new Properties();
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props);
+        Exception exception = assertThrows(IllegalArgumentException.class, () -> admin.clientInstanceId(Duration.ofMillis(-1)));
+        assertEquals("The timeout cannot be negative.", exception.getMessage());
+
+        admin.close();
+    }
+
+    @Test
+    public void testClientInstanceIdNoTelemetryReporterRegistered() {
+        Properties props = new Properties();
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
+
+        KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props);
+        Exception exception = assertThrows(IllegalStateException.class, () -> admin.clientInstanceId(Duration.ofMillis(0)));
+        assertEquals("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", exception.getMessage());
+
+        admin.close();
+    }
+
     private UnregisterBrokerResponse prepareUnregisterBrokerResponse(Errors error, int throttleTimeMs) {
         return new UnregisterBrokerResponse(new UnregisterBrokerResponseData()
             .setErrorCode(error.code())
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 04067c7bf5b..93a11bef11f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -56,6 +56,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData;
 import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
 import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
 import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -86,6 +87,8 @@ import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -96,9 +99,9 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.MockedStatic;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.time.Duration;
@@ -132,6 +135,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
@@ -148,8 +153,13 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Note to future authors in this class. If you close the consumer, close with DURATION.ZERO to reduce the duration of
@@ -220,33 +230,50 @@ public class KafkaConsumerTest {
         props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
         consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
 
-        MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) consumer.metricsRegistry().reporters().get(0);
+        assertEquals(3, consumer.metricsRegistry().reporters().size());
+        MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) consumer.metricsRegistry().reporters().stream()
+            .filter(reporter -> reporter instanceof MockMetricsReporter).findFirst().get();
         assertEquals(consumer.clientId(), mockMetricsReporter.clientId);
-        assertEquals(2, consumer.metricsRegistry().reporters().size());
     }
 
     @ParameterizedTest
     @EnumSource(GroupProtocol.class)
     @SuppressWarnings("deprecation")
-    public void testDisableJmxReporter(GroupProtocol groupProtocol) {
+    public void testDisableJmxAndClientTelemetryReporter(GroupProtocol groupProtocol) {
         Properties props = new Properties();
         props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ConsumerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+        props.setProperty(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
         consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
         assertTrue(consumer.metricsRegistry().reporters().isEmpty());
     }
 
     @ParameterizedTest
     @EnumSource(GroupProtocol.class)
-    public void testExplicitlyEnableJmxReporter(GroupProtocol groupProtocol) {
+    public void testExplicitlyOnlyEnableJmxReporter(GroupProtocol groupProtocol) {
         Properties props = new Properties();
         props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
+        props.setProperty(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
         consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
         assertEquals(1, consumer.metricsRegistry().reporters().size());
+        assertTrue(consumer.metricsRegistry().reporters().get(0) instanceof JmxReporter);
+    }
+
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    @SuppressWarnings("deprecation")
+    public void testExplicitlyOnlyEnableClientTelemetryReporter(GroupProtocol groupProtocol) {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ConsumerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+        consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+        assertEquals(1, consumer.metricsRegistry().reporters().size());
+        assertTrue(consumer.metricsRegistry().reporters().get(0) instanceof ClientTelemetryReporter);
     }
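These reporter tests pin down how three configs interact: metric.reporters, the deprecated auto.include.jmx.reporter, and the new enable.metrics.push (which registers a ClientTelemetryReporter by default). A hedged sketch of the two single-reporter setups the tests assert, expressed as plain client configuration; the bootstrap address is a placeholder:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class ReporterConfigExample {
        // Keep only the telemetry reporter: drop the auto-included JmxReporter and
        // rely on enable.metrics.push defaulting to true.
        public static Properties telemetryOnlyProps() {
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.setProperty(ConsumerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
            return props;
        }

        // Keep only JMX: name JmxReporter explicitly and disable metrics push so no
        // ClientTelemetryReporter is registered.
        public static Properties jmxOnlyProps() {
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.setProperty(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
            props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                "org.apache.kafka.common.metrics.JmxReporter");
            return props;
        }
    }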
 
     // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
@@ -3293,6 +3320,53 @@ public void testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
         );
     }
 
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testClientInstanceId(GroupProtocol groupProtocol) {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        ClientTelemetryReporter clientTelemetryReporter = mock(ClientTelemetryReporter.class);
+        clientTelemetryReporter.configure(any());
+
+        MockedStatic<CommonClientConfigs> mockedCommonClientConfigs = mockStatic(CommonClientConfigs.class, new CallsRealMethods());
+        mockedCommonClientConfigs.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any())).thenReturn(Optional.of(clientTelemetryReporter));
+
+        ClientTelemetrySender clientTelemetrySender = mock(ClientTelemetrySender.class);
+        Uuid expectedUuid = Uuid.randomUuid();
+        when(clientTelemetryReporter.telemetrySender()).thenReturn(clientTelemetrySender);
+        when(clientTelemetrySender.clientInstanceId(any())).thenReturn(Optional.of(expectedUuid));
+
+        consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+        Uuid uuid = consumer.clientInstanceId(Duration.ofMillis(0));
+        assertEquals(expectedUuid, uuid);
+
+        mockedCommonClientConfigs.close();
+    }
+
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testClientInstanceIdInvalidTimeout(GroupProtocol groupProtocol) {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+        Exception exception = assertThrows(IllegalArgumentException.class, () -> consumer.clientInstanceId(Duration.ofMillis(-1)));
+        assertEquals("The timeout cannot be negative.", exception.getMessage());
+    }
+
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testClientInstanceIdNoTelemetryReporterRegistered(GroupProtocol groupProtocol) {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
+
+        consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+        Exception exception = assertThrows(IllegalStateException.class, () -> consumer.clientInstanceId(Duration.ofMillis(0)));
+        assertEquals("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", exception.getMessage());
+    }
+
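The consumer tests above create the MockedStatic up front and close it manually at the end. A hedged sketch of the same test body in Mockito's try-with-resources form, which releases the static mock even when an assertion fails; all calls are reused from the test itself:

    @ParameterizedTest
    @EnumSource(GroupProtocol.class)
    public void testClientInstanceIdScoped(GroupProtocol groupProtocol) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");

        ClientTelemetryReporter clientTelemetryReporter = mock(ClientTelemetryReporter.class);
        ClientTelemetrySender clientTelemetrySender = mock(ClientTelemetrySender.class);
        Uuid expectedUuid = Uuid.randomUuid();
        when(clientTelemetryReporter.telemetrySender()).thenReturn(clientTelemetrySender);
        when(clientTelemetrySender.clientInstanceId(any())).thenReturn(Optional.of(expectedUuid));

        // The static mock is active only inside this block and is closed automatically.
        try (MockedStatic<CommonClientConfigs> mocked = mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
            mocked.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any()))
                .thenReturn(Optional.of(clientTelemetryReporter));
            consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
            assertEquals(expectedUuid, consumer.clientInstanceId(Duration.ofMillis(0)));
        }
    }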
 
     private KafkaConsumer<String, String> consumerForCheckingTimeoutException(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 1bc4c778704..ba0d3bacef4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -3700,7 +3700,8 @@ public abstract class ConsumerCoordinatorTest {
             autoCommitIntervalMs,
             null,
             true,
-            null);
+            null,
+            Optional.empty());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.setNodeApiVersions(NodeApiVersions.create(ApiKeys.OFFSET_FETCH.id, (short) 0, upperVersion));
@@ -3863,7 +3864,8 @@ public abstract class ConsumerCoordinatorTest {
             autoCommitIntervalMs,
             null,
             false,
-            null);
+            null,
+            Optional.empty());
     }
 
     private Collection<TopicPartition> getRevoked(final List<TopicPartition> owned,
@@ -4089,7 +4091,7 @@ public abstract class ConsumerCoordinatorTest {
         coordinator = new ConsumerCoordinator(rebalanceConfig, new LogContext(), consumerClient,
             Collections.singletonList(assignor), metadata, subscriptions,
-            metrics, consumerId + groupId, time, false, autoCommitIntervalMs, null, false, rackId);
+            metrics, consumerId + groupId, time, false, autoCommitIntervalMs, null, false, rackId, Optional.empty());
     }
 
     private static MetadataResponse rackAwareMetadata(int numNodes,
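The widened ConsumerCoordinator constructor gains a trailing Optional argument, passed as Optional.empty() throughout these tests. Based on this PR's scope it presumably carries an optional ClientTelemetryReporter, though the hunks above do not name the parameter type. A minimal sketch of the guarded-use pattern such an optional dependency implies; the class and method names are illustrative, not Kafka's:

    import java.util.Optional;
    import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;

    final class TelemetryAwareComponent {
        private final Optional<ClientTelemetryReporter> telemetryReporter;

        TelemetryAwareComponent(Optional<ClientTelemetryReporter> telemetryReporter) {
            this.telemetryReporter = telemetryReporter;
        }

        void onEvent() {
            // Every call site guards on presence, so Optional.empty() (as passed in the
            // tests above) turns telemetry into a no-op without any null checks.
            telemetryReporter.ifPresent(reporter -> { /* record telemetry-relevant state */ });
        }
    }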
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index df34088427a..9a247dd5e0f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
@@ -50,6 +51,7 @@ import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
 import org.apache.kafka.common.message.EndTxnResponseData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -72,6 +74,8 @@ import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
@@ -86,9 +90,9 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.MockedStatic;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
@@ -114,6 +118,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
@@ -130,9 +136,11 @@ import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.notNull;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -454,31 +462,48 @@ public class KafkaProducerTest {
         KafkaProducer<String, String> producer = new KafkaProducer<>(
             props, new StringSerializer(), new StringSerializer());
 
-        MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().get(0);
+        assertEquals(3, producer.metrics.reporters().size());
+        MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().stream()
+            .filter(reporter -> reporter instanceof MockMetricsReporter).findFirst().get();
         assertEquals(producer.getClientId(), mockMetricsReporter.clientId);
-        assertEquals(2, producer.metrics.reporters().size());
+        producer.close();
     }
 
     @Test
     @SuppressWarnings("deprecation")
-    public void testDisableJmxReporter() {
+    public void testDisableJmxAndClientTelemetryReporter() {
         Properties props = new Properties();
         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ProducerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+        props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
         KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
         assertTrue(producer.metrics.reporters().isEmpty());
         producer.close();
     }
 
     @Test
-    public void testExplicitlyEnableJmxReporter() {
+    public void testExplicitlyOnlyEnableJmxReporter() {
         Properties props = new Properties();
         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
+        props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
         KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
         assertEquals(1, producer.metrics.reporters().size());
+        assertTrue(producer.metrics.reporters().get(0) instanceof JmxReporter);
+        producer.close();
+    }
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testExplicitlyOnlyEnableClientTelemetryReporter() {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ProducerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+        assertEquals(1, producer.metrics.reporters().size());
+        assertTrue(producer.metrics.reporters().get(0) instanceof ClientTelemetryReporter);
         producer.close();
     }
@@ -1656,6 +1681,55 @@ public class KafkaProducerTest {
         verifyInvalidGroupMetadata(new ConsumerGroupMetadata("group", 2, JoinGroupRequest.UNKNOWN_MEMBER_ID, Optional.empty()));
     }
 
+    @Test
+    public void testClientInstanceId() {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        ClientTelemetryReporter clientTelemetryReporter = mock(ClientTelemetryReporter.class);
+        clientTelemetryReporter.configure(any());
+
+        MockedStatic<CommonClientConfigs> mockedCommonClientConfigs = mockStatic(CommonClientConfigs.class, new CallsRealMethods());
+        mockedCommonClientConfigs.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any())).thenReturn(Optional.of(clientTelemetryReporter));
+
+        ClientTelemetrySender clientTelemetrySender = mock(ClientTelemetrySender.class);
+        Uuid expectedUuid = Uuid.randomUuid();
+        when(clientTelemetryReporter.telemetrySender()).thenReturn(clientTelemetrySender);
+        when(clientTelemetrySender.clientInstanceId(any())).thenReturn(Optional.of(expectedUuid));
+
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+        Uuid uuid = producer.clientInstanceId(Duration.ofMillis(0));
+        assertEquals(expectedUuid, uuid);
+
+        mockedCommonClientConfigs.close();
+        producer.close();
+    }
+
+    @Test
+    public void testClientInstanceIdInvalidTimeout() {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+        Exception exception = assertThrows(IllegalArgumentException.class, () -> producer.clientInstanceId(Duration.ofMillis(-1)));
+        assertEquals("The timeout cannot be negative.", exception.getMessage());
+
+        producer.close();
+    }
+
+    @Test
+    public void testClientInstanceIdNoTelemetryReporterRegistered() {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
+
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+        Exception exception = assertThrows(IllegalStateException.class, () -> producer.clientInstanceId(Duration.ofMillis(0)));
+        assertEquals("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", exception.getMessage());
+
+        producer.close();
+    }
+
     private void verifyInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
@@ -2403,7 +2477,8 @@ public class KafkaProducerTest {
             interceptors,
             partitioner,
             time,
-            ioThread
+            ioThread,
+            Optional.empty()
         );
     }
 }
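The three producer tests define the contract of the new clientInstanceId call on KafkaProducer: a negative timeout is rejected with IllegalArgumentException, and a client built with enable.metrics.push=false throws IllegalStateException. A hedged usage sketch against a placeholder broker address; with no broker reachable, the call can also fail with a timeout-related exception:

    import java.time.Duration;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerInstanceIdExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
                try {
                    // Blocks up to the timeout; requires enable.metrics.push=true (the default).
                    Uuid id = producer.clientInstanceId(Duration.ofSeconds(1));
                    System.out.println("client instance id: " + id);
                } catch (IllegalStateException e) {
                    // Telemetry disabled on this client (enable.metrics.push=false).
                }
            }
        }
    }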