diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index c654ba7a2b6..3777d8714a2 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -66,7 +66,7 @@
files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor|AbstractRequest|AbstractResponse).java"/>
+ files="(NetworkClient|FieldSpec|KafkaRaftClient|KafkaProducer).java"/>
+
+ files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest|NetworkClientTest).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index b77fbeda742..6b6b56059c8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
@@ -156,7 +157,8 @@ public final class ClientUtils {
Time time,
int maxInFlightRequestsPerConnection,
Metadata metadata,
- Sensor throttleTimeSensor) {
+ Sensor throttleTimeSensor,
+ ClientTelemetrySender clientTelemetrySender) {
return createNetworkClient(config,
config.getString(CommonClientConfigs.CLIENT_ID_CONFIG),
metrics,
@@ -169,7 +171,8 @@ public final class ClientUtils {
metadata,
null,
new DefaultHostResolver(),
- throttleTimeSensor);
+ throttleTimeSensor,
+ clientTelemetrySender);
}
public static NetworkClient createNetworkClient(AbstractConfig config,
@@ -195,6 +198,7 @@ public final class ClientUtils {
null,
metadataUpdater,
hostResolver,
+ null,
null);
}
@@ -210,7 +214,8 @@ public final class ClientUtils {
Metadata metadata,
MetadataUpdater metadataUpdater,
HostResolver hostResolver,
- Sensor throttleTimeSensor) {
+ Sensor throttleTimeSensor,
+ ClientTelemetrySender clientTelemetrySender) {
ChannelBuilder channelBuilder = null;
Selector selector = null;
@@ -239,7 +244,8 @@ public final class ClientUtils {
apiVersions,
throttleTimeSensor,
logContext,
- hostResolver);
+ hostResolver,
+ clientTelemetrySender);
} catch (Throwable t) {
closeQuietly(selector, "Selector");
closeQuietly(channelBuilder, "ChannelBuilder");
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 872c7210096..a65e2467f1b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +32,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* Configurations shared by Kafka client applications: producer, consumer, connect, etc.
@@ -294,4 +297,14 @@ public class CommonClientConfigs {
}
return reporters;
}
+
+ public static Optional<ClientTelemetryReporter> telemetryReporter(String clientId, AbstractConfig config) {
+ if (!config.getBoolean(CommonClientConfigs.ENABLE_METRICS_PUSH_CONFIG)) {
+ return Optional.empty();
+ }
+
+ ClientTelemetryReporter telemetryReporter = new ClientTelemetryReporter(Time.SYSTEM);
+ telemetryReporter.configure(config.originals(Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId)));
+ return Optional.of(telemetryReporter);
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index ad81d1faed4..a596f660d02 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -38,10 +38,13 @@ import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CorrelationIdMismatchException;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -128,6 +131,8 @@ public class NetworkClient implements KafkaClient {
private final AtomicReference<State> state;
+ private final TelemetrySender telemetrySender;
+
public NetworkClient(Selectable selector,
Metadata metadata,
String clientId,
@@ -194,7 +199,8 @@ public class NetworkClient implements KafkaClient {
apiVersions,
throttleTimeSensor,
logContext,
- new DefaultHostResolver());
+ new DefaultHostResolver(),
+ null);
}
public NetworkClient(Selectable selector,
@@ -229,7 +235,8 @@ public class NetworkClient implements KafkaClient {
apiVersions,
null,
logContext,
- new DefaultHostResolver());
+ new DefaultHostResolver(),
+ null);
}
public NetworkClient(MetadataUpdater metadataUpdater,
@@ -249,7 +256,8 @@ public class NetworkClient implements KafkaClient {
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext,
- HostResolver hostResolver) {
+ HostResolver hostResolver,
+ ClientTelemetrySender clientTelemetrySender) {
/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
* possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
* super constructor is invoked.
@@ -279,6 +287,7 @@ public class NetworkClient implements KafkaClient {
this.throttleTimeSensor = throttleTimeSensor;
this.log = logContext.logger(NetworkClient.class);
this.state = new AtomicReference<>(State.ACTIVE);
+ this.telemetrySender = (clientTelemetrySender != null) ? new TelemetrySender(clientTelemetrySender) : null;
}
/**
@@ -361,6 +370,8 @@ public class NetworkClient implements KafkaClient {
}
} else if (request.header.apiKey() == ApiKeys.METADATA) {
metadataUpdater.handleFailedRequest(now, Optional.empty());
+ } else if (isTelemetryApi(request.header.apiKey()) && telemetrySender != null) {
+ telemetrySender.handleFailedRequest(request.header.apiKey(), null);
}
}
}
@@ -522,6 +533,8 @@ public class NetworkClient implements KafkaClient {
abortedSends.add(clientResponse);
else if (clientRequest.apiKey() == ApiKeys.METADATA)
metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException));
+ else if (isTelemetryApi(clientRequest.apiKey()) && telemetrySender != null)
+ telemetrySender.handleFailedRequest(clientRequest.apiKey(), unsupportedVersionException);
}
}
@@ -567,8 +580,9 @@ public class NetworkClient implements KafkaClient {
}
long metadataTimeout = metadataUpdater.maybeUpdate(now);
+ long telemetryTimeout = telemetrySender != null ? telemetrySender.maybeUpdate(now) : Integer.MAX_VALUE;
try {
- this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
+ this.selector.poll(Utils.min(timeout, metadataTimeout, telemetryTimeout, defaultRequestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
@@ -663,6 +677,8 @@ public class NetworkClient implements KafkaClient {
if (state.compareAndSet(State.CLOSING, State.CLOSED)) {
this.selector.close();
this.metadataUpdater.close();
+ if (telemetrySender != null)
+ telemetrySender.close();
} else {
log.warn("Attempting to close NetworkClient that has already been closed.");
}
@@ -925,6 +941,10 @@ public class NetworkClient implements KafkaClient {
metadataUpdater.handleSuccessfulResponse(req.header, now, (MetadataResponse) response);
else if (req.isInternalRequest && response instanceof ApiVersionsResponse)
handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) response);
+ else if (req.isInternalRequest && response instanceof GetTelemetrySubscriptionsResponse)
+ telemetrySender.handleResponse((GetTelemetrySubscriptionsResponse) response);
+ else if (req.isInternalRequest && response instanceof PushTelemetryResponse)
+ telemetrySender.handleResponse((PushTelemetryResponse) response);
else
responses.add(req.completed(response, now));
}
@@ -1042,6 +1062,25 @@ public class NetworkClient implements KafkaClient {
}
}
+ /**
+ * Return true if there's at least one connection establishment currently underway
+ */
+ private boolean isAnyNodeConnecting() {
+ for (Node node : metadataUpdater.fetchNodes()) {
+ if (connectionStates.isConnecting(node.idString())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Return true if the ApiKey belongs to the Telemetry API.
+ */
+ private boolean isTelemetryApi(ApiKeys apiKey) {
+ return apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY;
+ }
+
class DefaultMetadataUpdater implements MetadataUpdater {
/* the current cluster metadata */
@@ -1161,18 +1200,6 @@ public class NetworkClient implements KafkaClient {
this.metadata.close();
}
- /**
- * Return true if there's at least one connection establishment is currently underway
- */
- private boolean isAnyNodeConnecting() {
- for (Node node : fetchNodes()) {
- if (connectionStates.isConnecting(node.idString())) {
- return true;
- }
- }
- return false;
- }
-
/**
* Add a metadata request to the list of sends if we can make one
*/
@@ -1222,6 +1249,95 @@ public class NetworkClient implements KafkaClient {
}
+ class TelemetrySender {
+
+ private final ClientTelemetrySender clientTelemetrySender;
+ private Node stickyNode;
+
+ public TelemetrySender(ClientTelemetrySender clientTelemetrySender) {
+ this.clientTelemetrySender = clientTelemetrySender;
+ }
+
+ public long maybeUpdate(long now) {
+ long timeToNextUpdate = clientTelemetrySender.timeToNextUpdate(defaultRequestTimeoutMs);
+ if (timeToNextUpdate > 0)
+ return timeToNextUpdate;
+
+ // Per KIP-714, let's continue to re-use the same broker for as long as possible.
+ if (stickyNode == null) {
+ stickyNode = leastLoadedNode(now);
+ if (stickyNode == null) {
+ log.debug("Give up sending telemetry request since no node is available");
+ return reconnectBackoffMs;
+ }
+ }
+
+ return maybeUpdate(now, stickyNode);
+ }
+
+ private long maybeUpdate(long now, Node node) {
+ String nodeConnectionId = node.idString();
+
+ if (canSendRequest(nodeConnectionId, now)) {
+ Optional<AbstractRequest.Builder<?>> requestOpt = clientTelemetrySender.createRequest();
+
+ if (!requestOpt.isPresent())
+ return Long.MAX_VALUE;
+
+ AbstractRequest.Builder<?> request = requestOpt.get();
+ ClientRequest clientRequest = newClientRequest(nodeConnectionId, request, now, true);
+ doSend(clientRequest, true, now);
+ return defaultRequestTimeoutMs;
+ } else {
+ // Per KIP-714, if we can't issue a request to this broker node, let's clear it out
+ // and try another broker on the next loop.
+ stickyNode = null;
+ }
+
+ // If there's any connection establishment underway, wait until it completes. This prevents
+ // the client from unnecessarily connecting to additional nodes while a previous connection
+ // attempt has not been completed.
+ if (isAnyNodeConnecting())
+ return reconnectBackoffMs;
+
+ if (connectionStates.canConnect(nodeConnectionId, now)) {
+ // We don't have a connection to this node right now, make one
+ log.debug("Initialize connection to node {} for sending telemetry request", node);
+ initiateConnect(node, now);
+ return reconnectBackoffMs;
+ }
+
+ // In either case, we just need to wait for a network event to let us know the selected
+ // connection might be usable again.
+ return Long.MAX_VALUE;
+ }
+
+ public void handleResponse(GetTelemetrySubscriptionsResponse response) {
+ clientTelemetrySender.handleResponse(response);
+ }
+
+ public void handleResponse(PushTelemetryResponse response) {
+ clientTelemetrySender.handleResponse(response);
+ }
+
+ public void handleFailedRequest(ApiKeys apiKey, KafkaException maybeFatalException) {
+ if (apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS)
+ clientTelemetrySender.handleFailedGetTelemetrySubscriptionsRequest(maybeFatalException);
+ else if (apiKey == ApiKeys.PUSH_TELEMETRY)
+ clientTelemetrySender.handleFailedPushTelemetryRequest(maybeFatalException);
+ else
+ throw new IllegalStateException("Invalid api key for failed telemetry request");
+ }
+
+ public void close() {
+ try {
+ clientTelemetrySender.close();
+ } catch (Exception exception) {
+ log.error("Failed to close client telemetry sender", exception);
+ }
+ }
+ }
+
@Override
public ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder<?> requestBuilder,
@@ -1239,6 +1355,11 @@ public class NetworkClient implements KafkaClient {
return correlation++;
}
+ // visible for testing
+ Node telemetryConnectedNode() {
+ return telemetrySender.stickyNode;
+ }
+
@Override
public ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder<?> requestBuilder,
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 18bd108a789..f663d6efc60 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -140,6 +140,7 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData;
import org.apache.kafka.common.message.ListGroupsRequestData;
@@ -208,6 +209,8 @@ import org.apache.kafka.common.requests.ElectLeadersRequest;
import org.apache.kafka.common.requests.ElectLeadersResponse;
import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -379,6 +382,12 @@ public class KafkaAdminClient extends AdminClient {
private final long retryBackoffMs;
private final long retryBackoffMaxMs;
private final ExponentialBackoff retryBackoff;
+ private final boolean clientTelemetryEnabled;
+
+ /**
+ * The telemetry requests client instance id.
+ */
+ private Uuid clientInstanceId;
/**
* Get or create a list value from a map.
@@ -533,6 +542,7 @@ public class KafkaAdminClient extends AdminClient {
}
}
+ // Visible for tests
static KafkaAdminClient createInternal(AdminClientConfig config,
AdminMetadataManager metadataManager,
KafkaClient client,
@@ -585,6 +595,7 @@ public class KafkaAdminClient extends AdminClient {
CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
retryBackoffMaxMs,
CommonClientConfigs.RETRY_BACKOFF_JITTER);
+ this.clientTelemetryEnabled = config.getBoolean(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG);
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka admin client initialized");
@@ -4420,7 +4431,52 @@ public class KafkaAdminClient extends AdminClient {
@Override
public Uuid clientInstanceId(Duration timeout) {
- throw new UnsupportedOperationException();
+ if (timeout.isNegative()) {
+ throw new IllegalArgumentException("The timeout cannot be negative.");
+ }
+
+ if (!clientTelemetryEnabled) {
+ throw new IllegalStateException("Telemetry is not enabled. Set config `" + AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
+ }
+
+ if (clientInstanceId != null) {
+ return clientInstanceId;
+ }
+
+ final long now = time.milliseconds();
+ final KafkaFutureImpl<Uuid> future = new KafkaFutureImpl<>();
+ runnable.call(new Call("getTelemetrySubscriptions", calcDeadlineMs(now, (int) timeout.toMillis()),
+ new LeastLoadedNodeProvider()) {
+
+ @Override
+ GetTelemetrySubscriptionsRequest.Builder createRequest(int timeoutMs) {
+ return new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true);
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ GetTelemetrySubscriptionsResponse response = (GetTelemetrySubscriptionsResponse) abstractResponse;
+ if (response.error() != Errors.NONE) {
+ future.completeExceptionally(response.error().exception());
+ } else {
+ future.complete(response.data().clientInstanceId());
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
+ }, now);
+
+ try {
+ clientInstanceId = future.get();
+ } catch (Exception e) {
+ log.error("Error occurred while fetching client instance id", e);
+ throw new KafkaException("Error occurred while fetching client instance id", e);
+ }
+
+ return clientInstanceId;
}
private void invokeDriver(
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 24d05e7d63b..3ab4f2e7e6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -64,6 +64,8 @@ import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
@@ -79,6 +81,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -130,6 +133,7 @@ public abstract class AbstractCoordinator implements Closeable {
private final Heartbeat heartbeat;
private final GroupCoordinatorMetrics sensors;
private final GroupRebalanceConfig rebalanceConfig;
+ private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
protected final Time time;
protected final ConsumerNetworkClient client;
@@ -160,6 +164,16 @@ public abstract class AbstractCoordinator implements Closeable {
Metrics metrics,
String metricGrpPrefix,
Time time) {
+ this(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, time, Optional.empty());
+ }
+
+ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
+ LogContext logContext,
+ ConsumerNetworkClient client,
+ Metrics metrics,
+ String metricGrpPrefix,
+ Time time,
+ Optional<ClientTelemetryReporter> clientTelemetryReporter) {
Objects.requireNonNull(rebalanceConfig.groupId,
"Expected a non-null group id for coordinator construction");
this.rebalanceConfig = rebalanceConfig;
@@ -173,6 +187,7 @@ public abstract class AbstractCoordinator implements Closeable {
CommonClientConfigs.RETRY_BACKOFF_JITTER);
this.heartbeat = new Heartbeat(rebalanceConfig, time);
this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
+ this.clientTelemetryReporter = clientTelemetryReporter;
}
/**
@@ -648,6 +663,8 @@ public abstract class AbstractCoordinator implements Closeable {
joinResponse.data().memberId(), joinResponse.data().protocolName());
log.info("Successfully joined group with generation {}", AbstractCoordinator.this.generation);
+ clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels(
+ Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, joinResponse.data().memberId())));
if (joinResponse.isLeader()) {
onLeaderElected(joinResponse).chain(future);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 196bed50286..7733c45bc4a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -65,9 +65,12 @@ import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@@ -158,6 +161,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
private final int defaultApiTimeoutMs;
private volatile boolean closed = false;
private final List<ConsumerPartitionAssignor> assignors;
+ private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
// to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
private boolean cachedSubscriptionHasAllFetchPositions;
@@ -185,7 +189,10 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
log.debug("Initializing the Kafka consumer");
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.time = Time.SYSTEM;
- this.metrics = createMetrics(config, time);
+ List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
+ this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
+ this.clientTelemetryReporter.ifPresent(reporters::add);
+ this.metrics = createMetrics(config, time, reporters);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
@@ -215,7 +222,8 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
config,
apiVersions,
metrics,
- fetchMetricsManager);
+ fetchMetricsManager,
+ clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
final Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(time,
logContext,
backgroundEventQueue,
@@ -305,6 +313,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
this.applicationEventHandler = applicationEventHandler;
this.assignors = assignors;
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
+ this.clientTelemetryReporter = Optional.empty();
}
// Visible for testing
@@ -330,6 +339,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
this.assignors = assignors;
+ this.clientTelemetryReporter = Optional.empty();
ConsumerMetrics metricsRegistry = new ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
FetchMetricsManager fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
@@ -918,8 +928,12 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
log.trace("Closing the Kafka consumer");
AtomicReference<Throwable> firstException = new AtomicReference<>();
+ final Timer closeTimer = time.timer(timeout);
+ clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis()));
+ closeTimer.update();
+
if (applicationEventHandler != null)
- closeQuietly(() -> applicationEventHandler.close(timeout), "Failed to close application event handler with a timeout(ms)=" + timeout, firstException);
+ closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed to close application event handler with a timeout(ms)=" + closeTimer.remainingMs(), firstException);
// Invoke all callbacks after the background thread exists in case if there are unsent async
// commits
@@ -930,6 +944,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
closeQuietly(metrics, "consumer metrics", firstException);
closeQuietly(deserializers, "consumer deserializers", firstException);
+ clientTelemetryReporter.ifPresent(reporter -> closeQuietly(reporter, "async consumer telemetry reporter", firstException));
AppInfoParser.unregisterAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics);
log.debug("Kafka consumer has been closed");
@@ -979,7 +994,11 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
@Override
public Uuid clientInstanceId(Duration timeout) {
- throw new KafkaException("method not implemented");
+ if (!clientTelemetryReporter.isPresent()) {
+ throw new IllegalStateException("Telemetry is not enabled. Set config `" + ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
+ }
+
+ return ClientTelemetryUtils.fetchClientInstanceId(clientTelemetryReporter.get(), timeout);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 590c8a0976a..e3ce79e373f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -56,6 +56,7 @@ import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
@@ -168,13 +169,15 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
int autoCommitIntervalMs,
ConsumerInterceptors<?, ?> interceptors,
boolean throwOnFetchStableOffsetsUnsupported,
- String rackId) {
+ String rackId,
+ Optional<ClientTelemetryReporter> clientTelemetryReporter) {
super(rebalanceConfig,
logContext,
client,
metrics,
metricGrpPrefix,
- time);
+ time,
+ clientTelemetryReporter);
this.rebalanceConfig = rebalanceConfig;
this.log = logContext.logger(ConsumerCoordinator.class);
this.metadata = metadata;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
index e267293f98b..d599d41a245 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
@@ -81,7 +82,8 @@ public final class ConsumerUtils {
Time time,
Metadata metadata,
Sensor throttleTimeSensor,
- long retryBackoffMs) {
+ long retryBackoffMs,
+ ClientTelemetrySender clientTelemetrySender) {
NetworkClient netClient = ClientUtils.createNetworkClient(config,
metrics,
CONSUMER_METRIC_GROUP_PREFIX,
@@ -90,7 +92,8 @@ public final class ConsumerUtils {
time,
CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
metadata,
- throttleTimeSensor);
+ throttleTimeSensor,
+ clientTelemetrySender);
// Will avoid blocking an extended period of time to prevent heartbeat thread starvation
int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -130,6 +133,11 @@ public final class ConsumerUtils {
}
public static Metrics createMetrics(ConsumerConfig config, Time time) {
+ return createMetrics(config, time, CommonClientConfigs.metricsReporters(
+ config.getString(ConsumerConfig.CLIENT_ID_CONFIG), config));
+ }
+
+ public static Metrics createMetrics(ConsumerConfig config, Time time, List<MetricsReporter> reporters) {
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
Map metricsTags = Collections.singletonMap(CONSUMER_CLIENT_ID_METRIC_TAG, clientId);
MetricConfig metricConfig = new MetricConfig()
@@ -137,7 +145,6 @@ public final class ConsumerUtils {
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricsTags);
- List reporters = CommonClientConfigs.metricsReporters(clientId, config);
MetricsContext metricsContext = new KafkaMetricsContext(CONSUMER_JMX_PREFIX,
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
return new Metrics(metricConfig, reporters, time, metricsContext);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java
index e427322d31b..563b437cba6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java
@@ -48,7 +48,10 @@ import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@@ -130,6 +133,7 @@ public class LegacyKafkaConsumer implements ConsumerDelegate {
private final int defaultApiTimeoutMs;
private volatile boolean closed = false;
private final List<ConsumerPartitionAssignor> assignors;
+ private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
// currentThread holds the threadId of the current thread accessing LegacyKafkaConsumer
// and is used to prevent multi-threaded access
@@ -160,7 +164,10 @@ public class LegacyKafkaConsumer implements ConsumerDelegate {
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.time = Time.SYSTEM;
- this.metrics = createMetrics(config, time);
+ List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
+ this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
+ this.clientTelemetryReporter.ifPresent(reporters::add);
+ this.metrics = createMetrics(config, time, reporters);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
@@ -188,7 +195,8 @@ public class LegacyKafkaConsumer implements ConsumerDelegate {
time,
metadata,
fetchMetricsManager.throttleTimeSensor(),
- retryBackoffMs);
+ retryBackoffMs,
+ clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
@@ -214,7 +222,8 @@ public class LegacyKafkaConsumer implements ConsumerDelegate {
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors,
config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
- config.getString(ConsumerConfig.CLIENT_RACK_CONFIG));
+ config.getString(ConsumerConfig.CLIENT_RACK_CONFIG),
+ clientTelemetryReporter);
}
this.fetcher = new Fetcher<>(
logContext,
@@ -282,6 +291,7 @@ public class LegacyKafkaConsumer implements ConsumerDelegate {
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+ this.clientTelemetryReporter = Optional.empty();
int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
int rebalanceTimeoutMs = config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
@@ -327,7 +337,8 @@ public class LegacyKafkaConsumer implements ConsumerDelegate {
autoCommitIntervalMs,
interceptors,
throwOnStableOffsetNotSupported,
- rackId
+ rackId,
+ clientTelemetryReporter
);
} else {
this.coordinator = null;
@@ -880,7 +891,12 @@ public class LegacyKafkaConsumer implements ConsumerDelegate {
@Override
public Uuid clientInstanceId(Duration timeout) {
- throw new UnsupportedOperationException();
+ if (!clientTelemetryReporter.isPresent()) {
+ throw new IllegalStateException("Telemetry is not enabled. Set config `" + ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
+
+ }
+
+ return ClientTelemetryUtils.fetchClientInstanceId(clientTelemetryReporter.get(), timeout);
}
@Override
@@ -1108,6 +1124,8 @@ public class LegacyKafkaConsumer implements ConsumerDelegate {
AtomicReference<Throwable> firstException = new AtomicReference<>();
final Timer closeTimer = createTimerForRequest(timeout);
+ clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis()));
+ closeTimer.update();
// Close objects with a timeout. The timeout is required because the coordinator & the fetcher send requests to
// the server in the process of closing which may not respect the overall timeout defined for closing the
// consumer.
@@ -1134,6 +1152,7 @@ public class LegacyKafkaConsumer implements ConsumerDelegate {
closeQuietly(metrics, "consumer metrics", firstException);
closeQuietly(client, "consumer network client", firstException);
closeQuietly(deserializers, "consumer deserializers", firstException);
+ clientTelemetryReporter.ifPresent(reporter -> closeQuietly(reporter, "consumer telemetry reporter", firstException));
AppInfoParser.unregisterAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics);
log.debug("Kafka consumer has been closed");
Throwable exception = firstException.get();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index 732de9055c2..141f5f955c8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
@@ -369,7 +370,8 @@ public class NetworkClientDelegate implements AutoCloseable {
final ConsumerConfig config,
final ApiVersions apiVersions,
final Metrics metrics,
- final FetchMetricsManager fetchMetricsManager) {
+ final FetchMetricsManager fetchMetricsManager,
+ final ClientTelemetrySender clientTelemetrySender) {
return new CachedSupplier<NetworkClientDelegate>() {
@Override
protected NetworkClientDelegate create() {
@@ -381,7 +383,8 @@ public class NetworkClientDelegate implements AutoCloseable {
time,
CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
metadata,
- fetchMetricsManager.throttleTimeSensor());
+ fetchMetricsManager.throttleTimeSensor(),
+ clientTelemetrySender);
return new NetworkClientDelegate(time, config, logContext, client);
}
};
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 2b9b4c2b05a..94273358e7d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -66,10 +66,13 @@ import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@@ -80,6 +83,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -257,6 +261,7 @@ public class KafkaProducer implements Producer {
private final ProducerInterceptors<K, V> interceptors;
private final ApiVersions apiVersions;
private final TransactionManager transactionManager;
+ private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -364,6 +369,8 @@ public class KafkaProducer implements Producer {
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
List reporters = CommonClientConfigs.metricsReporters(clientId, config);
+ this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
+ this.clientTelemetryReporter.ifPresent(reporters::add);
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
@@ -480,7 +487,8 @@ public class KafkaProducer implements Producer {
ProducerInterceptors<K, V> interceptors,
Partitioner partitioner,
Time time,
- KafkaThread ioThread) {
+ KafkaThread ioThread,
+ Optional<ClientTelemetryReporter> clientTelemetryReporter) {
this.producerConfig = config;
this.time = time;
this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
@@ -503,6 +511,7 @@ public class KafkaProducer implements Producer {
this.metadata = metadata;
this.sender = sender;
this.ioThread = ioThread;
+ this.clientTelemetryReporter = clientTelemetryReporter;
}
// visible for testing
@@ -519,7 +528,8 @@ public class KafkaProducer implements Producer {
time,
maxInflightRequests,
metadata,
- throttleTimeSensor);
+ throttleTimeSensor,
+ clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG));
return new Sender(logContext,
@@ -1280,7 +1290,11 @@ public class KafkaProducer implements Producer {
*/
@Override
public Uuid clientInstanceId(Duration timeout) {
- throw new UnsupportedOperationException();
+ if (!clientTelemetryReporter.isPresent()) {
+ throw new IllegalStateException("Telemetry is not enabled. Set config `" + ProducerConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
+ }
+
+ return ClientTelemetryUtils.fetchClientInstanceId(clientTelemetryReporter.get(), timeout);
}
/**
@@ -1341,16 +1355,22 @@ public class KafkaProducer implements Producer {
timeoutMs);
} else {
// Try to close gracefully.
- if (this.sender != null)
+ final Timer closeTimer = time.timer(timeout);
+ if (this.sender != null) {
this.sender.initiateClose();
+ closeTimer.update();
+ }
if (this.ioThread != null) {
try {
- this.ioThread.join(timeoutMs);
+ this.ioThread.join(closeTimer.remainingMs());
} catch (InterruptedException t) {
firstException.compareAndSet(null, new InterruptException(t));
log.error("Interrupted while joining ioThread", t);
+ } finally {
+ closeTimer.update();
}
}
+ clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(closeTimer.remainingMs()));
}
}
@@ -1374,6 +1394,7 @@ public class KafkaProducer implements Producer {
Utils.closeQuietly(keySerializer, "producer keySerializer", firstException);
Utils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
Utils.closeQuietly(partitioner, "producer partitioner", firstException);
+ clientTelemetryReporter.ifPresent(reporter -> Utils.closeQuietly(reporter, "producer telemetry reporter", firstException));
AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
Throwable exception = firstException.get();
if (exception != null && !swallowException) {
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
index 91bc6f23b46..d1d62a7efe2 100644
--- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -194,4 +195,13 @@ public class ClientTelemetryUtils {
throw new KafkaException("Unable to parse MetricsData payload", e);
}
}
+
+ public static Uuid fetchClientInstanceId(ClientTelemetryReporter clientTelemetryReporter, Duration timeout) {
+ if (timeout.isNegative()) {
+ throw new IllegalArgumentException("The timeout cannot be negative.");
+ }
+
+ Optional<Uuid> optionalUuid = clientTelemetryReporter.telemetrySender().clientInstanceId(timeout);
+ return optionalUuid.orElse(null);
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index abf44f4c45b..747049fd192 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -26,20 +26,29 @@ import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.DelayedReceive;
@@ -71,6 +80,12 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class NetworkClientTest {
@@ -992,32 +1007,48 @@ public class NetworkClientTest {
return (mockHostResolver.useNewAddresses() && newAddresses.contains(inetAddress)) ||
(!mockHostResolver.useNewAddresses() && initialAddresses.contains(inetAddress));
});
+
+ ClientTelemetrySender mockClientTelemetrySender = mock(ClientTelemetrySender.class);
+ when(mockClientTelemetrySender.timeToNextUpdate(anyLong())).thenReturn(0L);
+
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
- time, false, new ApiVersions(), null, new LogContext(), mockHostResolver);
+ time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender);
// Connect to one the initial addresses, then change the addresses and disconnect
client.ready(node, time.milliseconds());
time.sleep(connectionSetupTimeoutMaxMsTest);
client.poll(0, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()));
+ // First poll should try to update the node but couldn't because node remains in connecting state
+ // i.e. connection handling is completed after telemetry update.
+ assertNull(client.telemetryConnectedNode());
+
+ client.poll(0, time.milliseconds());
+ assertEquals(node, client.telemetryConnectedNode());
mockHostResolver.changeAddresses();
selector.serverDisconnect(node.idString());
client.poll(0, time.milliseconds());
assertFalse(client.isReady(node, time.milliseconds()));
+ assertNull(client.telemetryConnectedNode());
time.sleep(reconnectBackoffMaxMsTest);
client.ready(node, time.milliseconds());
time.sleep(connectionSetupTimeoutMaxMsTest);
client.poll(0, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()));
+ assertNull(client.telemetryConnectedNode());
+
+ client.poll(0, time.milliseconds());
+ assertEquals(node, client.telemetryConnectedNode());
// We should have tried to connect to one initial address and one new address, and resolved DNS twice
assertEquals(1, initialAddressConns.get());
assertEquals(1, newAddressConns.get());
assertEquals(2, mockHostResolver.resolutionCount());
+ verify(mockClientTelemetrySender, times(5)).timeToNextUpdate(anyLong());
}
@Test
@@ -1036,16 +1067,21 @@ public class NetworkClientTest {
// Refuse first connection attempt
return initialAddressConns.get() > 1;
});
+
+ ClientTelemetrySender mockClientTelemetrySender = mock(ClientTelemetrySender.class);
+ when(mockClientTelemetrySender.timeToNextUpdate(anyLong())).thenReturn(0L);
+
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
- time, false, new ApiVersions(), null, new LogContext(), mockHostResolver);
+ time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender);
// First connection attempt should fail
client.ready(node, time.milliseconds());
time.sleep(connectionSetupTimeoutMaxMsTest);
client.poll(0, time.milliseconds());
assertFalse(client.isReady(node, time.milliseconds()));
+ assertNull(client.telemetryConnectedNode());
// Second connection attempt should succeed
time.sleep(reconnectBackoffMaxMsTest);
@@ -1053,12 +1089,18 @@ public class NetworkClientTest {
time.sleep(connectionSetupTimeoutMaxMsTest);
client.poll(0, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()));
+ assertNull(client.telemetryConnectedNode());
+
+ // Next client poll after handling connection setup should update telemetry node.
+ client.poll(0, time.milliseconds());
+ assertEquals(node, client.telemetryConnectedNode());
// We should have tried to connect to two of the initial addresses, none of the new address, and should
// only have resolved DNS once
assertEquals(2, initialAddressConns.get());
assertEquals(0, newAddressConns.get());
assertEquals(1, mockHostResolver.resolutionCount());
+ verify(mockClientTelemetrySender, times(3)).timeToNextUpdate(anyLong());
}
@Test
@@ -1077,21 +1119,30 @@ public class NetworkClientTest {
// Refuse first connection attempt to the new addresses
return initialAddresses.contains(inetAddress) || newAddressConns.get() > 1;
});
+
+ ClientTelemetrySender mockClientTelemetrySender = mock(ClientTelemetrySender.class);
+ when(mockClientTelemetrySender.timeToNextUpdate(anyLong())).thenReturn(0L);
+
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
- time, false, new ApiVersions(), null, new LogContext(), mockHostResolver);
+ time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender);
// Connect to one the initial addresses, then change the addresses and disconnect
client.ready(node, time.milliseconds());
time.sleep(connectionSetupTimeoutMaxMsTest);
client.poll(0, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()));
+ assertNull(client.telemetryConnectedNode());
+ // Next client poll after handling connection setup should update telemetry node.
+ client.poll(0, time.milliseconds());
+ assertEquals(node, client.telemetryConnectedNode());
mockHostResolver.changeAddresses();
selector.serverDisconnect(node.idString());
client.poll(0, time.milliseconds());
assertFalse(client.isReady(node, time.milliseconds()));
+ assertNull(client.telemetryConnectedNode());
// First connection attempt to new addresses should fail
time.sleep(reconnectBackoffMaxMsTest);
@@ -1099,6 +1150,7 @@ public class NetworkClientTest {
time.sleep(connectionSetupTimeoutMaxMsTest);
client.poll(0, time.milliseconds());
assertFalse(client.isReady(node, time.milliseconds()));
+ assertNull(client.telemetryConnectedNode());
// Second connection attempt to new addresses should succeed
time.sleep(reconnectBackoffMaxMsTest);
@@ -1106,12 +1158,18 @@ public class NetworkClientTest {
time.sleep(connectionSetupTimeoutMaxMsTest);
client.poll(0, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()));
+ assertNull(client.telemetryConnectedNode());
+
+ // Next client poll after handling connection setup should update telemetry node.
+ client.poll(0, time.milliseconds());
+ assertEquals(node, client.telemetryConnectedNode());
// We should have tried to connect to one of the initial addresses and two of the new addresses (the first one
// failed), and resolved DNS twice, once for each set of addresses
assertEquals(1, initialAddressConns.get());
assertEquals(2, newAddressConns.get());
assertEquals(2, mockHostResolver.resolutionCount());
+ verify(mockClientTelemetrySender, times(6)).timeToNextUpdate(anyLong());
}
@Test
@@ -1168,6 +1226,61 @@ public class NetworkClientTest {
assertTrue(client.connectionFailed(node));
}
+ @Test
+ public void testTelemetryRequest() {
+ ClientTelemetrySender mockClientTelemetrySender = mock(ClientTelemetrySender.class);
+ when(mockClientTelemetrySender.timeToNextUpdate(anyLong())).thenReturn(0L);
+
+ NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
+ reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
+ defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
+ time, true, new ApiVersions(), null, new LogContext(), new DefaultHostResolver(), mockClientTelemetrySender);
+
+ // Send the ApiVersionsRequest
+ client.ready(node, time.milliseconds());
+ client.poll(0, time.milliseconds());
+ assertNull(client.telemetryConnectedNode());
+ assertTrue(client.hasInFlightRequests(node.idString()));
+ delayedApiVersionsResponse(0, ApiKeys.API_VERSIONS.latestVersion(), TestUtils.defaultApiVersionsResponse(
+ ApiMessageType.ListenerType.BROKER));
+ // handle ApiVersionsResponse
+ client.poll(0, time.milliseconds());
+ // the ApiVersionsRequest is gone
+ assertFalse(client.hasInFlightRequests(node.idString()));
+ selector.clear();
+
+ GetTelemetrySubscriptionsRequest.Builder getRequest = new GetTelemetrySubscriptionsRequest.Builder(
+ new GetTelemetrySubscriptionsRequestData(), true);
+ when(mockClientTelemetrySender.createRequest()).thenReturn(Optional.of(getRequest));
+
+ GetTelemetrySubscriptionsResponse getResponse = new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData());
+ ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(getResponse, ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS.latestVersion(), 1);
+ selector.completeReceive(new NetworkReceive(node.idString(), buffer));
+
+ // Initiate poll to send GetTelemetrySubscriptions request
+ client.poll(0, time.milliseconds());
+ assertTrue(client.isReady(node, time.milliseconds()));
+ assertEquals(node, client.telemetryConnectedNode());
+ verify(mockClientTelemetrySender, times(1)).handleResponse(any(GetTelemetrySubscriptionsResponse.class));
+ selector.clear();
+
+ PushTelemetryRequest.Builder pushRequest = new PushTelemetryRequest.Builder(
+ new PushTelemetryRequestData(), true);
+ when(mockClientTelemetrySender.createRequest()).thenReturn(Optional.of(pushRequest));
+
+ PushTelemetryResponse pushResponse = new PushTelemetryResponse(new PushTelemetryResponseData());
+ ByteBuffer pushBuffer = RequestTestUtils.serializeResponseWithHeader(pushResponse, ApiKeys.PUSH_TELEMETRY.latestVersion(), 2);
+ selector.completeReceive(new NetworkReceive(node.idString(), pushBuffer));
+
+ // Initiate poll to send PushTelemetry request
+ client.poll(0, time.milliseconds());
+ assertTrue(client.isReady(node, time.milliseconds()));
+ assertEquals(node, client.telemetryConnectedNode());
+ verify(mockClientTelemetrySender, times(1)).handleResponse(any(PushTelemetryResponse.class));
+ verify(mockClientTelemetrySender, times(4)).timeToNextUpdate(anyLong());
+ verify(mockClientTelemetrySender, times(2)).createRequest();
+ }
+
private RequestHeader parseHeader(ByteBuffer buffer) {
buffer.getInt(); // skip size
return RequestHeader.parse(buffer.slice());
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index bab1814dfaf..19bff2ea5e8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -33,8 +33,8 @@ import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.Uuid;
@@ -110,10 +110,11 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
-import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
@@ -177,6 +178,8 @@ import org.apache.kafka.common.requests.DescribeUserScramCredentialsResponse;
import org.apache.kafka.common.requests.ElectLeadersResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
@@ -225,6 +228,7 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -7063,6 +7067,48 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testClientInstanceId() {
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ Uuid expected = Uuid.randomUuid();
+
+ GetTelemetrySubscriptionsResponseData responseData =
+ new GetTelemetrySubscriptionsResponseData().setClientInstanceId(expected).setErrorCode(Errors.NONE.code());
+
+ env.kafkaClient().prepareResponse(
+ request -> request instanceof GetTelemetrySubscriptionsRequest,
+ new GetTelemetrySubscriptionsResponse(responseData));
+
+ Uuid result = env.adminClient().clientInstanceId(Duration.ofMillis(10));
+ assertEquals(expected, result);
+ }
+ }
+
+ @Test
+ public void testClientInstanceIdInvalidTimeout() {
+ Properties props = new Properties();
+ props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+ KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props);
+ Exception exception = assertThrows(IllegalArgumentException.class, () -> admin.clientInstanceId(Duration.ofMillis(-1)));
+ assertEquals("The timeout cannot be negative.", exception.getMessage());
+
+ admin.close();
+ }
+
+ @Test
+ public void testClientInstanceIdNoTelemetryReporterRegistered() {
+ Properties props = new Properties();
+ props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
+
+ KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props);
+ Exception exception = assertThrows(IllegalStateException.class, () -> admin.clientInstanceId(Duration.ofMillis(0)));
+ assertEquals("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", exception.getMessage());
+
+ admin.close();
+ }
+
private UnregisterBrokerResponse prepareUnregisterBrokerResponse(Errors error, int throttleTimeMs) {
return new UnregisterBrokerResponse(new UnregisterBrokerResponseData()
.setErrorCode(error.code())
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 04067c7bf5b..93a11bef11f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -56,6 +56,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -86,6 +87,8 @@ import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -96,9 +99,9 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.MockedStatic;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.time.Duration;
@@ -132,6 +135,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
@@ -148,8 +153,13 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Note to future authors in this class. If you close the consumer, close with DURATION.ZERO to reduce the duration of
@@ -220,33 +230,50 @@ public class KafkaConsumerTest {
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
- MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) consumer.metricsRegistry().reporters().get(0);
+ assertEquals(3, consumer.metricsRegistry().reporters().size());
+ MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) consumer.metricsRegistry().reporters().stream()
+ .filter(reporter -> reporter instanceof MockMetricsReporter).findFirst().get();
assertEquals(consumer.clientId(), mockMetricsReporter.clientId);
- assertEquals(2, consumer.metricsRegistry().reporters().size());
}
@ParameterizedTest
@EnumSource(GroupProtocol.class)
@SuppressWarnings("deprecation")
- public void testDisableJmxReporter(GroupProtocol groupProtocol) {
+ public void testDisableJmxAndClientTelemetryReporter(GroupProtocol groupProtocol) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+ props.setProperty(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
assertTrue(consumer.metricsRegistry().reporters().isEmpty());
}
@ParameterizedTest
@EnumSource(GroupProtocol.class)
- public void testExplicitlyEnableJmxReporter(GroupProtocol groupProtocol) {
+ public void testExplicitlyOnlyEnableJmxReporter(GroupProtocol groupProtocol) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
+ props.setProperty(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
assertEquals(1, consumer.metricsRegistry().reporters().size());
+ assertTrue(consumer.metricsRegistry().reporters().get(0) instanceof JmxReporter);
+ }
+
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    @SuppressWarnings("deprecation")
+    public void testExplicitlyOnlyEnableClientTelemetryReporter(GroupProtocol groupProtocol) {
+        // JMX auto-inclusion is disabled and metrics push is left at its default (enabled),
+        // so exactly one reporter -- the ClientTelemetryReporter -- should be registered.
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ConsumerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+        consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+        assertEquals(1, consumer.metricsRegistry().reporters().size());
+        assertTrue(consumer.metricsRegistry().reporters().get(0) instanceof ClientTelemetryReporter);
    }
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
@@ -3293,6 +3320,53 @@ public void testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
);
}
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testClientInstanceId(GroupProtocol groupProtocol) {
+        // Verifies that clientInstanceId() surfaces the Uuid produced by the telemetry sender.
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        ClientTelemetryReporter clientTelemetryReporter = mock(ClientTelemetryReporter.class);
+        ClientTelemetrySender clientTelemetrySender = mock(ClientTelemetrySender.class);
+        Uuid expectedUuid = Uuid.randomUuid();
+        when(clientTelemetryReporter.telemetrySender()).thenReturn(clientTelemetrySender);
+        when(clientTelemetrySender.clientInstanceId(any())).thenReturn(Optional.of(expectedUuid));
+
+        // try-with-resources guarantees the static mock is deregistered even when an assertion
+        // fails, so it cannot leak into other tests running in the same JVM.
+        try (MockedStatic<CommonClientConfigs> mockedCommonClientConfigs = mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
+            mockedCommonClientConfigs.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any())).thenReturn(Optional.of(clientTelemetryReporter));
+
+            consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+            Uuid uuid = consumer.clientInstanceId(Duration.ofMillis(0));
+            assertEquals(expectedUuid, uuid);
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testClientInstanceIdInvalidTimeout(GroupProtocol groupProtocol) {
+        // A negative timeout must be rejected up front with IllegalArgumentException.
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+        Exception exception = assertThrows(IllegalArgumentException.class, () -> consumer.clientInstanceId(Duration.ofMillis(-1)));
+        assertEquals("The timeout cannot be negative.", exception.getMessage());
+    }
+
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testClientInstanceIdNoTelemetryReporterRegistered(GroupProtocol groupProtocol) {
+        // With metrics push disabled no ClientTelemetryReporter is registered, so asking for
+        // the client instance id must fail fast with IllegalStateException.
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
+
+        consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+        Exception exception = assertThrows(IllegalStateException.class, () -> consumer.clientInstanceId(Duration.ofMillis(0)));
+        assertEquals("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", exception.getMessage());
+    }
+
private KafkaConsumer consumerForCheckingTimeoutException(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 1bc4c778704..ba0d3bacef4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -3700,7 +3700,8 @@ public abstract class ConsumerCoordinatorTest {
autoCommitIntervalMs,
null,
true,
- null);
+ null,
+ Optional.empty());
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
client.setNodeApiVersions(NodeApiVersions.create(ApiKeys.OFFSET_FETCH.id, (short) 0, upperVersion));
@@ -3863,7 +3864,8 @@ public abstract class ConsumerCoordinatorTest {
autoCommitIntervalMs,
null,
false,
- null);
+ null,
+ Optional.empty());
}
private Collection getRevoked(final List owned,
@@ -4089,7 +4091,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator = new ConsumerCoordinator(rebalanceConfig, new LogContext(), consumerClient,
Collections.singletonList(assignor), metadata, subscriptions,
- metrics, consumerId + groupId, time, false, autoCommitIntervalMs, null, false, rackId);
+ metrics, consumerId + groupId, time, false, autoCommitIntervalMs, null, false, rackId, Optional.empty());
}
private static MetadataResponse rackAwareMetadata(int numNodes,
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index df34088427a..9a247dd5e0f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
@@ -50,6 +51,7 @@ import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -72,6 +74,8 @@ import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@@ -86,9 +90,9 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.MockedStatic;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@@ -114,6 +118,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
@@ -130,9 +136,11 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -454,31 +462,48 @@ public class KafkaProducerTest {
KafkaProducer producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
- MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().get(0);
+ assertEquals(3, producer.metrics.reporters().size());
+ MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().stream()
+ .filter(reporter -> reporter instanceof MockMetricsReporter).findFirst().get();
assertEquals(producer.getClientId(), mockMetricsReporter.clientId);
- assertEquals(2, producer.metrics.reporters().size());
+
producer.close();
}
@Test
@SuppressWarnings("deprecation")
- public void testDisableJmxReporter() {
+ public void testDisableJmxAndClientTelemetryReporter() {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+ props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
assertTrue(producer.metrics.reporters().isEmpty());
producer.close();
}
@Test
- public void testExplicitlyEnableJmxReporter() {
+ public void testExplicitlyOnlyEnableJmxReporter() {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
+ props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
assertEquals(1, producer.metrics.reporters().size());
+ assertTrue(producer.metrics.reporters().get(0) instanceof JmxReporter);
+ producer.close();
+ }
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testExplicitlyOnlyEnableClientTelemetryReporter() {
+        // JMX auto-inclusion is disabled and metrics push is left at its default (enabled),
+        // so exactly one reporter -- the ClientTelemetryReporter -- should be registered.
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ProducerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+        KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+        assertEquals(1, producer.metrics.reporters().size());
+        assertTrue(producer.metrics.reporters().get(0) instanceof ClientTelemetryReporter);
        producer.close();
    }
@@ -1656,6 +1681,55 @@ public class KafkaProducerTest {
verifyInvalidGroupMetadata(new ConsumerGroupMetadata("group", 2, JoinGroupRequest.UNKNOWN_MEMBER_ID, Optional.empty()));
}
+    @Test
+    public void testClientInstanceId() {
+        // Verifies that clientInstanceId() surfaces the Uuid produced by the telemetry sender.
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        ClientTelemetryReporter clientTelemetryReporter = mock(ClientTelemetryReporter.class);
+        ClientTelemetrySender clientTelemetrySender = mock(ClientTelemetrySender.class);
+        Uuid expectedUuid = Uuid.randomUuid();
+        when(clientTelemetryReporter.telemetrySender()).thenReturn(clientTelemetrySender);
+        when(clientTelemetrySender.clientInstanceId(any())).thenReturn(Optional.of(expectedUuid));
+
+        // try-with-resources deregisters the static mock and closes the producer even when an
+        // assertion fails, preventing leakage into other tests running in the same JVM.
+        try (MockedStatic<CommonClientConfigs> mockedCommonClientConfigs = mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
+            mockedCommonClientConfigs.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any())).thenReturn(Optional.of(clientTelemetryReporter));
+
+            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
+                Uuid uuid = producer.clientInstanceId(Duration.ofMillis(0));
+                assertEquals(expectedUuid, uuid);
+            }
+        }
+    }
+
+    @Test
+    public void testClientInstanceIdInvalidTimeout() {
+        // A negative timeout must be rejected up front with IllegalArgumentException.
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        // try-with-resources closes the producer even if the assertion fails.
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
+            Exception exception = assertThrows(IllegalArgumentException.class, () -> producer.clientInstanceId(Duration.ofMillis(-1)));
+            assertEquals("The timeout cannot be negative.", exception.getMessage());
+        }
+    }
+
+    @Test
+    public void testClientInstanceIdNoTelemetryReporterRegistered() {
+        // With metrics push disabled no ClientTelemetryReporter is registered, so asking for
+        // the client instance id must fail fast with IllegalStateException.
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
+
+        // try-with-resources closes the producer even if the assertion fails.
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
+            Exception exception = assertThrows(IllegalStateException.class, () -> producer.clientInstanceId(Duration.ofMillis(0)));
+            assertEquals("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", exception.getMessage());
+        }
+    }
+
private void verifyInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) {
Map configs = new HashMap<>();
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
@@ -2403,7 +2477,8 @@ public class KafkaProducerTest {
interceptors,
partitioner,
time,
- ioThread
+ ioThread,
+ Optional.empty()
);
}
}