KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) (#14843)

This PR adds client-side changes to register a ClientTelemetryReporter, when enabled, and to periodically report client metrics. It includes front-facing API changes, with NetworkClient issuing the telemetry RPCs (GetTelemetrySubscriptions and PushTelemetry).

The PR build depends on: #14620, #14724

Reviewers: Philip Nee <pnee@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Kirk True <ktrue@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Walker Carlson <wcarlson@apache.org>
Author: Apoorv Mittal, 2023-12-05 23:20:33 +05:30 (committed by GitHub)
parent f2aeff0026
commit 2b99d0e450
18 changed files with 672 additions and 65 deletions
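
For orientation, a minimal sketch of how an application exercises the feature added here. This is illustrative, not code from the PR: it assumes a broker that supports the KIP-714 telemetry APIs, and uses only names that appear in this diff (ENABLE_METRICS_PUSH_CONFIG, clientInstanceId(Duration)).

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Uuid;

public class TelemetryEnabledProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Registers a ClientTelemetryReporter at construction time
        // (see CommonClientConfigs.telemetryReporter in this diff).
        props.put(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, true);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Fetches the broker-assigned client instance id via GetTelemetrySubscriptions.
            Uuid instanceId = producer.clientInstanceId(Duration.ofSeconds(5));
            System.out.println("Client instance id: " + instanceId);
        }
    }
}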

View File

@ -66,7 +66,7 @@
files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor|AbstractRequest|AbstractResponse).java"/>
<suppress checks="ParameterNumber"
files="(NetworkClient|FieldSpec|KafkaRaftClient).java"/>
files="(NetworkClient|FieldSpec|KafkaRaftClient|KafkaProducer).java"/>
<suppress checks="ParameterNumber"
files="(KafkaConsumer|ConsumerCoordinator).java"/>
<suppress checks="ParameterNumber"
@ -77,11 +77,13 @@
files="DefaultRecordBatch.java"/>
<suppress checks="ParameterNumber"
files="MemoryRecordsBuilder.java"/>
<suppress checks="ParameterNumber"
files="ClientUtils.java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest).java"/>
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest|NetworkClientTest).java"/>
<suppress checks="BooleanExpressionComplexity"
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>

View File

@ -28,6 +28,7 @@ import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
@ -156,7 +157,8 @@ public final class ClientUtils {
Time time,
int maxInFlightRequestsPerConnection,
Metadata metadata,
Sensor throttleTimeSensor) {
Sensor throttleTimeSensor,
ClientTelemetrySender clientTelemetrySender) {
return createNetworkClient(config,
config.getString(CommonClientConfigs.CLIENT_ID_CONFIG),
metrics,
@ -169,7 +171,8 @@ public final class ClientUtils {
metadata,
null,
new DefaultHostResolver(),
throttleTimeSensor);
throttleTimeSensor,
clientTelemetrySender);
}
public static NetworkClient createNetworkClient(AbstractConfig config,
@ -195,6 +198,7 @@ public final class ClientUtils {
null,
metadataUpdater,
hostResolver,
null,
null);
}
@ -210,7 +214,8 @@ public final class ClientUtils {
Metadata metadata,
MetadataUpdater metadataUpdater,
HostResolver hostResolver,
Sensor throttleTimeSensor) {
Sensor throttleTimeSensor,
ClientTelemetrySender clientTelemetrySender) {
ChannelBuilder channelBuilder = null;
Selector selector = null;
@ -239,7 +244,8 @@ public final class ClientUtils {
apiVersions,
throttleTimeSensor,
logContext,
hostResolver);
hostResolver,
clientTelemetrySender);
} catch (Throwable t) {
closeQuietly(selector, "Selector");
closeQuietly(channelBuilder, "ChannelBuilder");

View File

@ -22,6 +22,8 @@ import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -30,6 +32,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* Configurations shared by Kafka client applications: producer, consumer, connect, etc.
@ -294,4 +297,14 @@ public class CommonClientConfigs {
}
return reporters;
}
public static Optional<ClientTelemetryReporter> telemetryReporter(String clientId, AbstractConfig config) {
if (!config.getBoolean(CommonClientConfigs.ENABLE_METRICS_PUSH_CONFIG)) {
return Optional.empty();
}
ClientTelemetryReporter telemetryReporter = new ClientTelemetryReporter(Time.SYSTEM);
telemetryReporter.configure(config.originals(Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId)));
return Optional.of(telemetryReporter);
}
}
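
The producer and consumer constructors changed later in this diff consume this helper in the same way; condensed here as a wiring sketch (names match the KafkaProducer changes below):

    List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
    // Empty Optional when enable.metrics.push=false, so no telemetry reporter is registered.
    Optional<ClientTelemetryReporter> clientTelemetryReporter =
            CommonClientConfigs.telemetryReporter(clientId, config);
    clientTelemetryReporter.ifPresent(reporters::add);
    this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);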

View File

@ -38,10 +38,13 @@ import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CorrelationIdMismatchException;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.PushTelemetryResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@ -128,6 +131,8 @@ public class NetworkClient implements KafkaClient {
private final AtomicReference<State> state;
private final TelemetrySender telemetrySender;
public NetworkClient(Selectable selector,
Metadata metadata,
String clientId,
@ -194,7 +199,8 @@ public class NetworkClient implements KafkaClient {
apiVersions,
throttleTimeSensor,
logContext,
new DefaultHostResolver());
new DefaultHostResolver(),
null);
}
public NetworkClient(Selectable selector,
@ -229,7 +235,8 @@ public class NetworkClient implements KafkaClient {
apiVersions,
null,
logContext,
new DefaultHostResolver());
new DefaultHostResolver(),
null);
}
public NetworkClient(MetadataUpdater metadataUpdater,
@ -249,7 +256,8 @@ public class NetworkClient implements KafkaClient {
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext,
HostResolver hostResolver) {
HostResolver hostResolver,
ClientTelemetrySender clientTelemetrySender) {
/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
* possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
* super constructor is invoked.
@ -279,6 +287,7 @@ public class NetworkClient implements KafkaClient {
this.throttleTimeSensor = throttleTimeSensor;
this.log = logContext.logger(NetworkClient.class);
this.state = new AtomicReference<>(State.ACTIVE);
this.telemetrySender = (clientTelemetrySender != null) ? new TelemetrySender(clientTelemetrySender) : null;
}
/**
@ -361,6 +370,8 @@ public class NetworkClient implements KafkaClient {
}
} else if (request.header.apiKey() == ApiKeys.METADATA) {
metadataUpdater.handleFailedRequest(now, Optional.empty());
} else if (isTelemetryApi(request.header.apiKey()) && telemetrySender != null) {
telemetrySender.handleFailedRequest(request.header.apiKey(), null);
}
}
}
@ -522,6 +533,8 @@ public class NetworkClient implements KafkaClient {
abortedSends.add(clientResponse);
else if (clientRequest.apiKey() == ApiKeys.METADATA)
metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException));
else if (isTelemetryApi(clientRequest.apiKey()) && telemetrySender != null)
telemetrySender.handleFailedRequest(clientRequest.apiKey(), unsupportedVersionException);
}
}
@ -567,8 +580,9 @@ public class NetworkClient implements KafkaClient {
}
long metadataTimeout = metadataUpdater.maybeUpdate(now);
long telemetryTimeout = telemetrySender != null ? telemetrySender.maybeUpdate(now) : Integer.MAX_VALUE;
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
this.selector.poll(Utils.min(timeout, metadataTimeout, telemetryTimeout, defaultRequestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
@ -663,6 +677,8 @@ public class NetworkClient implements KafkaClient {
if (state.compareAndSet(State.CLOSING, State.CLOSED)) {
this.selector.close();
this.metadataUpdater.close();
if (telemetrySender != null)
telemetrySender.close();
} else {
log.warn("Attempting to close NetworkClient that has already been closed.");
}
@ -925,6 +941,10 @@ public class NetworkClient implements KafkaClient {
metadataUpdater.handleSuccessfulResponse(req.header, now, (MetadataResponse) response);
else if (req.isInternalRequest && response instanceof ApiVersionsResponse)
handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) response);
else if (req.isInternalRequest && response instanceof GetTelemetrySubscriptionsResponse)
telemetrySender.handleResponse((GetTelemetrySubscriptionsResponse) response);
else if (req.isInternalRequest && response instanceof PushTelemetryResponse)
telemetrySender.handleResponse((PushTelemetryResponse) response);
else
responses.add(req.completed(response, now));
}
@ -1042,6 +1062,25 @@ public class NetworkClient implements KafkaClient {
}
}
/**
* Return true if at least one connection establishment is currently underway
*/
private boolean isAnyNodeConnecting() {
for (Node node : metadataUpdater.fetchNodes()) {
if (connectionStates.isConnecting(node.idString())) {
return true;
}
}
return false;
}
/**
* Return true if the ApiKey belongs to the Telemetry API.
*/
private boolean isTelemetryApi(ApiKeys apiKey) {
return apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY;
}
class DefaultMetadataUpdater implements MetadataUpdater {
/* the current cluster metadata */
@ -1161,18 +1200,6 @@ public class NetworkClient implements KafkaClient {
this.metadata.close();
}
/**
* Return true if at least one connection establishment is currently underway
*/
private boolean isAnyNodeConnecting() {
for (Node node : fetchNodes()) {
if (connectionStates.isConnecting(node.idString())) {
return true;
}
}
return false;
}
/**
* Add a metadata request to the list of sends if we can make one
*/
@ -1222,6 +1249,95 @@ public class NetworkClient implements KafkaClient {
}
class TelemetrySender {
private final ClientTelemetrySender clientTelemetrySender;
private Node stickyNode;
public TelemetrySender(ClientTelemetrySender clientTelemetrySender) {
this.clientTelemetrySender = clientTelemetrySender;
}
public long maybeUpdate(long now) {
long timeToNextUpdate = clientTelemetrySender.timeToNextUpdate(defaultRequestTimeoutMs);
if (timeToNextUpdate > 0)
return timeToNextUpdate;
// Per KIP-714, let's continue to re-use the same broker for as long as possible.
if (stickyNode == null) {
stickyNode = leastLoadedNode(now);
if (stickyNode == null) {
log.debug("Give up sending telemetry request since no node is available");
return reconnectBackoffMs;
}
}
return maybeUpdate(now, stickyNode);
}
private long maybeUpdate(long now, Node node) {
String nodeConnectionId = node.idString();
if (canSendRequest(nodeConnectionId, now)) {
Optional<AbstractRequest.Builder<?>> requestOpt = clientTelemetrySender.createRequest();
if (!requestOpt.isPresent())
return Long.MAX_VALUE;
AbstractRequest.Builder<?> request = requestOpt.get();
ClientRequest clientRequest = newClientRequest(nodeConnectionId, request, now, true);
doSend(clientRequest, true, now);
return defaultRequestTimeoutMs;
} else {
// Per KIP-714, if we can't issue a request to this broker node, let's clear it out
// and try another broker on the next loop.
stickyNode = null;
}
// If there's any connection establishment underway, wait until it completes. This prevents
// the client from unnecessarily connecting to additional nodes while a previous connection
// attempt has not been completed.
if (isAnyNodeConnecting())
return reconnectBackoffMs;
if (connectionStates.canConnect(nodeConnectionId, now)) {
// We don't have a connection to this node right now, make one
log.debug("Initialize connection to node {} for sending telemetry request", node);
initiateConnect(node, now);
return reconnectBackoffMs;
}
// In either case, we just need to wait for a network event to let us know the selected
// connection might be usable again.
return Long.MAX_VALUE;
}
public void handleResponse(GetTelemetrySubscriptionsResponse response) {
clientTelemetrySender.handleResponse(response);
}
public void handleResponse(PushTelemetryResponse response) {
clientTelemetrySender.handleResponse(response);
}
public void handleFailedRequest(ApiKeys apiKey, KafkaException maybeFatalException) {
if (apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS)
clientTelemetrySender.handleFailedGetTelemetrySubscriptionsRequest(maybeFatalException);
else if (apiKey == ApiKeys.PUSH_TELEMETRY)
clientTelemetrySender.handleFailedPushTelemetryRequest(maybeFatalException);
else
throw new IllegalStateException("Invalid api key for failed telemetry request");
}
public void close() {
try {
clientTelemetrySender.close();
} catch (Exception exception) {
log.error("Failed to close client telemetry sender", exception);
}
}
}
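
As a reading aid (not part of the PR), the long returned by TelemetrySender.maybeUpdate(now) becomes one of the bounds on the next selector.poll(...) wait, alongside the metadata timeout:

    // Summary of maybeUpdate(now) outcomes, as implemented above:
    //   timeToNextUpdate > 0   -> wait that long before the next telemetry attempt
    //   no node available      -> reconnectBackoffMs (retry node selection after backoff)
    //   node ready, no request -> Long.MAX_VALUE (sender has nothing to send yet)
    //   request sent           -> defaultRequestTimeoutMs (bound the in-flight request)
    //   node unusable, another
    //     connect in flight    -> reconnectBackoffMs (let the pending connect finish)
    //   connect initiated      -> reconnectBackoffMs
    //   otherwise              -> Long.MAX_VALUE (wake on the next network event)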
@Override
public ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder<?> requestBuilder,
@ -1239,6 +1355,11 @@ public class NetworkClient implements KafkaClient {
return correlation++;
}
// visible for testing
Node telemetryConnectedNode() {
return telemetrySender.stickyNode;
}
@Override
public ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder<?> requestBuilder,

View File

@ -140,6 +140,7 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData;
import org.apache.kafka.common.message.ListGroupsRequestData;
@ -208,6 +209,8 @@ import org.apache.kafka.common.requests.ElectLeadersRequest;
import org.apache.kafka.common.requests.ElectLeadersResponse;
import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
@ -379,6 +382,12 @@ public class KafkaAdminClient extends AdminClient {
private final long retryBackoffMs;
private final long retryBackoffMaxMs;
private final ExponentialBackoff retryBackoff;
private final boolean clientTelemetryEnabled;
/**
* The telemetry requests client instance id.
*/
private Uuid clientInstanceId;
/**
* Get or create a list value from a map.
@ -533,6 +542,7 @@ public class KafkaAdminClient extends AdminClient {
}
}
// Visible for tests
static KafkaAdminClient createInternal(AdminClientConfig config,
AdminMetadataManager metadataManager,
KafkaClient client,
@ -585,6 +595,7 @@ public class KafkaAdminClient extends AdminClient {
CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
retryBackoffMaxMs,
CommonClientConfigs.RETRY_BACKOFF_JITTER);
this.clientTelemetryEnabled = config.getBoolean(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG);
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka admin client initialized");
@ -4420,7 +4431,52 @@ public class KafkaAdminClient extends AdminClient {
@Override
public Uuid clientInstanceId(Duration timeout) {
throw new UnsupportedOperationException();
if (timeout.isNegative()) {
throw new IllegalArgumentException("The timeout cannot be negative.");
}
if (!clientTelemetryEnabled) {
throw new IllegalStateException("Telemetry is not enabled. Set config `" + AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
}
if (clientInstanceId != null) {
return clientInstanceId;
}
final long now = time.milliseconds();
final KafkaFutureImpl<Uuid> future = new KafkaFutureImpl<>();
runnable.call(new Call("getTelemetrySubscriptions", calcDeadlineMs(now, (int) timeout.toMillis()),
new LeastLoadedNodeProvider()) {
@Override
GetTelemetrySubscriptionsRequest.Builder createRequest(int timeoutMs) {
return new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true);
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
GetTelemetrySubscriptionsResponse response = (GetTelemetrySubscriptionsResponse) abstractResponse;
if (response.error() != Errors.NONE) {
future.completeExceptionally(response.error().exception());
} else {
future.complete(response.data().clientInstanceId());
}
}
@Override
void handleFailure(Throwable throwable) {
future.completeExceptionally(throwable);
}
}, now);
try {
clientInstanceId = future.get();
} catch (Exception e) {
log.error("Error occurred while fetching client instance id", e);
throw new KafkaException("Error occurred while fetching client instance id", e);
}
return clientInstanceId;
}
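
A hedged usage sketch for the new admin path (illustrative; assumes a broker that supports GetTelemetrySubscriptions, and uses config names from this diff):

    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true);
    try (Admin admin = Admin.create(props)) {
        // The first call issues GetTelemetrySubscriptions to the least-loaded node;
        // the resulting id is cached for subsequent calls.
        Uuid instanceId = admin.clientInstanceId(Duration.ofSeconds(5));
    }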
private <K, V> void invokeDriver(

View File

@ -64,6 +64,8 @@ import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
@ -79,6 +81,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -130,6 +133,7 @@ public abstract class AbstractCoordinator implements Closeable {
private final Heartbeat heartbeat;
private final GroupCoordinatorMetrics sensors;
private final GroupRebalanceConfig rebalanceConfig;
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
protected final Time time;
protected final ConsumerNetworkClient client;
@ -160,6 +164,16 @@ public abstract class AbstractCoordinator implements Closeable {
Metrics metrics,
String metricGrpPrefix,
Time time) {
this(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, time, Optional.empty());
}
public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
LogContext logContext,
ConsumerNetworkClient client,
Metrics metrics,
String metricGrpPrefix,
Time time,
Optional<ClientTelemetryReporter> clientTelemetryReporter) {
Objects.requireNonNull(rebalanceConfig.groupId,
"Expected a non-null group id for coordinator construction");
this.rebalanceConfig = rebalanceConfig;
@ -173,6 +187,7 @@ public abstract class AbstractCoordinator implements Closeable {
CommonClientConfigs.RETRY_BACKOFF_JITTER);
this.heartbeat = new Heartbeat(rebalanceConfig, time);
this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
this.clientTelemetryReporter = clientTelemetryReporter;
}
/**
@ -648,6 +663,8 @@ public abstract class AbstractCoordinator implements Closeable {
joinResponse.data().memberId(), joinResponse.data().protocolName());
log.info("Successfully joined group with generation {}", AbstractCoordinator.this.generation);
clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels(
Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, joinResponse.data().memberId())));
if (joinResponse.isLeader()) {
onLeaderElected(joinResponse).chain(future);

View File

@ -65,9 +65,12 @@ import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@ -158,6 +161,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
private final int defaultApiTimeoutMs;
private volatile boolean closed = false;
private final List<ConsumerPartitionAssignor> assignors;
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
// to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
private boolean cachedSubscriptionHasAllFetchPositions;
@ -185,7 +189,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
log.debug("Initializing the Kafka consumer");
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.time = Time.SYSTEM;
this.metrics = createMetrics(config, time);
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
this.clientTelemetryReporter.ifPresent(reporters::add);
this.metrics = createMetrics(config, time, reporters);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
@ -215,7 +222,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
config,
apiVersions,
metrics,
fetchMetricsManager);
fetchMetricsManager,
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
final Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(time,
logContext,
backgroundEventQueue,
@ -305,6 +313,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
this.applicationEventHandler = applicationEventHandler;
this.assignors = assignors;
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
this.clientTelemetryReporter = Optional.empty();
}
// Visible for testing
@ -330,6 +339,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
this.assignors = assignors;
this.clientTelemetryReporter = Optional.empty();
ConsumerMetrics metricsRegistry = new ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
FetchMetricsManager fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
@ -918,8 +928,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
log.trace("Closing the Kafka consumer");
AtomicReference<Throwable> firstException = new AtomicReference<>();
final Timer closeTimer = time.timer(timeout);
clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis()));
closeTimer.update();
if (applicationEventHandler != null)
closeQuietly(() -> applicationEventHandler.close(timeout), "Failed to close application event handler with a timeout(ms)=" + timeout, firstException);
closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed to close application event handler with a timeout(ms)=" + closeTimer.remainingMs(), firstException);
// Invoke all callbacks after the background thread exits, in case there are unsent async
// commits
@ -930,6 +944,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
closeQuietly(metrics, "consumer metrics", firstException);
closeQuietly(deserializers, "consumer deserializers", firstException);
clientTelemetryReporter.ifPresent(reporter -> closeQuietly(reporter, "async consumer telemetry reporter", firstException));
AppInfoParser.unregisterAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics);
log.debug("Kafka consumer has been closed");
@ -979,7 +994,11 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
@Override
public Uuid clientInstanceId(Duration timeout) {
throw new KafkaException("method not implemented");
if (!clientTelemetryReporter.isPresent()) {
throw new IllegalStateException("Telemetry is not enabled. Set config `" + ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
}
return ClientTelemetryUtils.fetchClientInstanceId(clientTelemetryReporter.get(), timeout);
}
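
Consumer usage mirrors the producer and admin paths; a minimal sketch, where consumerProps stands in for a normal consumer configuration with enable.metrics.push enabled:

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
        // Delegates to ClientTelemetryUtils.fetchClientInstanceId(reporter, timeout).
        Uuid instanceId = consumer.clientInstanceId(Duration.ofSeconds(5));
    }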
@Override

View File

@ -56,6 +56,7 @@ import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
@ -168,13 +169,15 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
int autoCommitIntervalMs,
ConsumerInterceptors<?, ?> interceptors,
boolean throwOnFetchStableOffsetsUnsupported,
String rackId) {
String rackId,
Optional<ClientTelemetryReporter> clientTelemetryReporter) {
super(rebalanceConfig,
logContext,
client,
metrics,
metricGrpPrefix,
time);
time,
clientTelemetryReporter);
this.rebalanceConfig = rebalanceConfig;
this.log = logContext.logger(ConsumerCoordinator.class);
this.metadata = metadata;

View File

@ -38,6 +38,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
@ -81,7 +82,8 @@ public final class ConsumerUtils {
Time time,
Metadata metadata,
Sensor throttleTimeSensor,
long retryBackoffMs) {
long retryBackoffMs,
ClientTelemetrySender clientTelemetrySender) {
NetworkClient netClient = ClientUtils.createNetworkClient(config,
metrics,
CONSUMER_METRIC_GROUP_PREFIX,
@ -90,7 +92,8 @@ public final class ConsumerUtils {
time,
CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
metadata,
throttleTimeSensor);
throttleTimeSensor,
clientTelemetrySender);
// Will avoid blocking an extended period of time to prevent heartbeat thread starvation
int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
@ -130,6 +133,11 @@ public final class ConsumerUtils {
}
public static Metrics createMetrics(ConsumerConfig config, Time time) {
return createMetrics(config, time, CommonClientConfigs.metricsReporters(
config.getString(ConsumerConfig.CLIENT_ID_CONFIG), config));
}
public static Metrics createMetrics(ConsumerConfig config, Time time, List<MetricsReporter> reporters) {
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
Map<String, String> metricsTags = Collections.singletonMap(CONSUMER_CLIENT_ID_METRIC_TAG, clientId);
MetricConfig metricConfig = new MetricConfig()
@ -137,7 +145,6 @@ public final class ConsumerUtils {
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricsTags);
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
MetricsContext metricsContext = new KafkaMetricsContext(CONSUMER_JMX_PREFIX,
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
return new Metrics(metricConfig, reporters, time, metricsContext);

View File

@ -48,7 +48,10 @@ import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@ -130,6 +133,7 @@ public class LegacyKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
private final int defaultApiTimeoutMs;
private volatile boolean closed = false;
private final List<ConsumerPartitionAssignor> assignors;
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
// currentThread holds the threadId of the current thread accessing LegacyKafkaConsumer
// and is used to prevent multi-threaded access
@ -160,7 +164,10 @@ public class LegacyKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.time = Time.SYSTEM;
this.metrics = createMetrics(config, time);
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
this.clientTelemetryReporter.ifPresent(reporters::add);
this.metrics = createMetrics(config, time, reporters);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
@ -188,7 +195,8 @@ public class LegacyKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
time,
metadata,
fetchMetricsManager.throttleTimeSensor(),
retryBackoffMs);
retryBackoffMs,
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
@ -214,7 +222,8 @@ public class LegacyKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors,
config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
config.getString(ConsumerConfig.CLIENT_RACK_CONFIG));
config.getString(ConsumerConfig.CLIENT_RACK_CONFIG),
clientTelemetryReporter);
}
this.fetcher = new Fetcher<>(
logContext,
@ -282,6 +291,7 @@ public class LegacyKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.clientTelemetryReporter = Optional.empty();
int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
int rebalanceTimeoutMs = config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
@ -327,7 +337,8 @@ public class LegacyKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
autoCommitIntervalMs,
interceptors,
throwOnStableOffsetNotSupported,
rackId
rackId,
clientTelemetryReporter
);
} else {
this.coordinator = null;
@ -880,7 +891,12 @@ public class LegacyKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
@Override
public Uuid clientInstanceId(Duration timeout) {
throw new UnsupportedOperationException();
if (!clientTelemetryReporter.isPresent()) {
throw new IllegalStateException("Telemetry is not enabled. Set config `" + ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
}
return ClientTelemetryUtils.fetchClientInstanceId(clientTelemetryReporter.get(), timeout);
}
@Override
@ -1108,6 +1124,8 @@ public class LegacyKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
AtomicReference<Throwable> firstException = new AtomicReference<>();
final Timer closeTimer = createTimerForRequest(timeout);
clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis()));
closeTimer.update();
// Close objects with a timeout. The timeout is required because the coordinator & the fetcher send requests to
// the server in the process of closing which may not respect the overall timeout defined for closing the
// consumer.
@ -1134,6 +1152,7 @@ public class LegacyKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
closeQuietly(metrics, "consumer metrics", firstException);
closeQuietly(client, "consumer network client", firstException);
closeQuietly(deserializers, "consumer deserializers", firstException);
clientTelemetryReporter.ifPresent(reporter -> closeQuietly(reporter, "consumer telemetry reporter", firstException));
AppInfoParser.unregisterAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics);
log.debug("Kafka consumer has been closed");
Throwable exception = firstException.get();

View File

@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
@ -369,7 +370,8 @@ public class NetworkClientDelegate implements AutoCloseable {
final ConsumerConfig config,
final ApiVersions apiVersions,
final Metrics metrics,
final FetchMetricsManager fetchMetricsManager) {
final FetchMetricsManager fetchMetricsManager,
final ClientTelemetrySender clientTelemetrySender) {
return new CachedSupplier<NetworkClientDelegate>() {
@Override
protected NetworkClientDelegate create() {
@ -381,7 +383,8 @@ public class NetworkClientDelegate implements AutoCloseable {
time,
CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
metadata,
fetchMetricsManager.throttleTimeSensor());
fetchMetricsManager.throttleTimeSensor(),
clientTelemetrySender);
return new NetworkClientDelegate(time, config, logContext, client);
}
};

View File

@ -66,10 +66,13 @@ import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@ -80,6 +83,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -257,6 +261,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private final ProducerInterceptors<K, V> interceptors;
private final ApiVersions apiVersions;
private final TransactionManager transactionManager;
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@ -364,6 +369,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
this.clientTelemetryReporter.ifPresent(reporters::add);
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
@ -480,7 +487,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
ProducerInterceptors<K, V> interceptors,
Partitioner partitioner,
Time time,
KafkaThread ioThread) {
KafkaThread ioThread,
Optional<ClientTelemetryReporter> clientTelemetryReporter) {
this.producerConfig = config;
this.time = time;
this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
@ -503,6 +511,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.metadata = metadata;
this.sender = sender;
this.ioThread = ioThread;
this.clientTelemetryReporter = clientTelemetryReporter;
}
// visible for testing
@ -519,7 +528,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
time,
maxInflightRequests,
metadata,
throttleTimeSensor);
throttleTimeSensor,
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG));
return new Sender(logContext,
@ -1280,7 +1290,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
*/
@Override
public Uuid clientInstanceId(Duration timeout) {
throw new UnsupportedOperationException();
if (!clientTelemetryReporter.isPresent()) {
throw new IllegalStateException("Telemetry is not enabled. Set config `" + ProducerConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
}
return ClientTelemetryUtils.fetchClientInstanceId(clientTelemetryReporter.get(), timeout);
}
/**
@ -1341,16 +1355,22 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
timeoutMs);
} else {
// Try to close gracefully.
if (this.sender != null)
final Timer closeTimer = time.timer(timeout);
if (this.sender != null) {
this.sender.initiateClose();
closeTimer.update();
}
if (this.ioThread != null) {
try {
this.ioThread.join(timeoutMs);
this.ioThread.join(closeTimer.remainingMs());
} catch (InterruptedException t) {
firstException.compareAndSet(null, new InterruptException(t));
log.error("Interrupted while joining ioThread", t);
} finally {
closeTimer.update();
}
}
clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(closeTimer.remainingMs()));
}
}
@ -1374,6 +1394,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
Utils.closeQuietly(keySerializer, "producer keySerializer", firstException);
Utils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
Utils.closeQuietly(partitioner, "producer partitioner", firstException);
clientTelemetryReporter.ifPresent(reporter -> Utils.closeQuietly(reporter, "producer telemetry reporter", firstException));
AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
Throwable exception = firstException.get();
if (exception != null && !swallowException) {

View File

@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -194,4 +195,13 @@ public class ClientTelemetryUtils {
throw new KafkaException("Unable to parse MetricsData payload", e);
}
}
public static Uuid fetchClientInstanceId(ClientTelemetryReporter clientTelemetryReporter, Duration timeout) {
if (timeout.isNegative()) {
throw new IllegalArgumentException("The timeout cannot be negative.");
}
Optional<Uuid> optionalUuid = clientTelemetryReporter.telemetrySender().clientInstanceId(timeout);
return optionalUuid.orElse(null);
}
}
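
Note the orElse(null): a telemetry sender that cannot produce an instance id within the timeout surfaces as a null return rather than an exception, so callers should null-check. A caller-side sketch, where reporter is assumed to be an initialized ClientTelemetryReporter:

    Uuid instanceId = ClientTelemetryUtils.fetchClientInstanceId(reporter, Duration.ofSeconds(5));
    if (instanceId == null) {
        // No instance id was received within the timeout.
    }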

View File

@ -26,20 +26,29 @@ import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.message.PushTelemetryRequestData;
import org.apache.kafka.common.message.PushTelemetryResponseData;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.PushTelemetryRequest;
import org.apache.kafka.common.requests.PushTelemetryResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.DelayedReceive;
@ -71,6 +80,12 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class NetworkClientTest {
@ -992,32 +1007,48 @@ public class NetworkClientTest {
return (mockHostResolver.useNewAddresses() && newAddresses.contains(inetAddress)) ||
(!mockHostResolver.useNewAddresses() && initialAddresses.contains(inetAddress));
});
ClientTelemetrySender mockClientTelemetrySender = mock(ClientTelemetrySender.class);
when(mockClientTelemetrySender.timeToNextUpdate(anyLong())).thenReturn(0L);
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
time, false, new ApiVersions(), null, new LogContext(), mockHostResolver);
time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender);
// Connect to one of the initial addresses, then change the addresses and disconnect
client.ready(node, time.milliseconds());
time.sleep(connectionSetupTimeoutMaxMsTest);
client.poll(0, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()));
// The first poll tries to update the telemetry node but cannot, because the node is still in
// the connecting state, i.e. connection handling completes only after the telemetry update.
assertNull(client.telemetryConnectedNode());
client.poll(0, time.milliseconds());
assertEquals(node, client.telemetryConnectedNode());
mockHostResolver.changeAddresses();
selector.serverDisconnect(node.idString());
client.poll(0, time.milliseconds());
assertFalse(client.isReady(node, time.milliseconds()));
assertNull(client.telemetryConnectedNode());
time.sleep(reconnectBackoffMaxMsTest);
client.ready(node, time.milliseconds());
time.sleep(connectionSetupTimeoutMaxMsTest);
client.poll(0, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()));
assertNull(client.telemetryConnectedNode());
client.poll(0, time.milliseconds());
assertEquals(node, client.telemetryConnectedNode());
// We should have tried to connect to one initial address and one new address, and resolved DNS twice
assertEquals(1, initialAddressConns.get());
assertEquals(1, newAddressConns.get());
assertEquals(2, mockHostResolver.resolutionCount());
verify(mockClientTelemetrySender, times(5)).timeToNextUpdate(anyLong());
}
@Test
@ -1036,16 +1067,21 @@ public class NetworkClientTest {
// Refuse first connection attempt
return initialAddressConns.get() > 1;
});
ClientTelemetrySender mockClientTelemetrySender = mock(ClientTelemetrySender.class);
when(mockClientTelemetrySender.timeToNextUpdate(anyLong())).thenReturn(0L);
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
time, false, new ApiVersions(), null, new LogContext(), mockHostResolver);
time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender);
// First connection attempt should fail
client.ready(node, time.milliseconds());
time.sleep(connectionSetupTimeoutMaxMsTest);
client.poll(0, time.milliseconds());
assertFalse(client.isReady(node, time.milliseconds()));
assertNull(client.telemetryConnectedNode());
// Second connection attempt should succeed
time.sleep(reconnectBackoffMaxMsTest);
@ -1053,12 +1089,18 @@ public class NetworkClientTest {
time.sleep(connectionSetupTimeoutMaxMsTest);
client.poll(0, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()));
assertNull(client.telemetryConnectedNode());
// Next client poll after handling connection setup should update telemetry node.
client.poll(0, time.milliseconds());
assertEquals(node, client.telemetryConnectedNode());
// We should have tried to connect to two of the initial addresses, none of the new address, and should
// only have resolved DNS once
assertEquals(2, initialAddressConns.get());
assertEquals(0, newAddressConns.get());
assertEquals(1, mockHostResolver.resolutionCount());
verify(mockClientTelemetrySender, times(3)).timeToNextUpdate(anyLong());
}
@Test
@ -1077,21 +1119,30 @@ public class NetworkClientTest {
// Refuse first connection attempt to the new addresses
return initialAddresses.contains(inetAddress) || newAddressConns.get() > 1;
});
ClientTelemetrySender mockClientTelemetrySender = mock(ClientTelemetrySender.class);
when(mockClientTelemetrySender.timeToNextUpdate(anyLong())).thenReturn(0L);
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
time, false, new ApiVersions(), null, new LogContext(), mockHostResolver);
time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender);
// Connect to one of the initial addresses, then change the addresses and disconnect
client.ready(node, time.milliseconds());
time.sleep(connectionSetupTimeoutMaxMsTest);
client.poll(0, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()));
assertNull(client.telemetryConnectedNode());
// Next client poll after handling connection setup should update telemetry node.
client.poll(0, time.milliseconds());
assertEquals(node, client.telemetryConnectedNode());
mockHostResolver.changeAddresses();
selector.serverDisconnect(node.idString());
client.poll(0, time.milliseconds());
assertFalse(client.isReady(node, time.milliseconds()));
assertNull(client.telemetryConnectedNode());
// First connection attempt to new addresses should fail
time.sleep(reconnectBackoffMaxMsTest);
@ -1099,6 +1150,7 @@ public class NetworkClientTest {
time.sleep(connectionSetupTimeoutMaxMsTest);
client.poll(0, time.milliseconds());
assertFalse(client.isReady(node, time.milliseconds()));
assertNull(client.telemetryConnectedNode());
// Second connection attempt to new addresses should succeed
time.sleep(reconnectBackoffMaxMsTest);
@ -1106,12 +1158,18 @@ public class NetworkClientTest {
time.sleep(connectionSetupTimeoutMaxMsTest);
client.poll(0, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()));
assertNull(client.telemetryConnectedNode());
// Next client poll after handling connection setup should update telemetry node.
client.poll(0, time.milliseconds());
assertEquals(node, client.telemetryConnectedNode());
// We should have tried to connect to one of the initial addresses and two of the new addresses (the first one
// failed), and resolved DNS twice, once for each set of addresses
assertEquals(1, initialAddressConns.get());
assertEquals(2, newAddressConns.get());
assertEquals(2, mockHostResolver.resolutionCount());
verify(mockClientTelemetrySender, times(6)).timeToNextUpdate(anyLong());
}
@Test
@ -1168,6 +1226,61 @@ public class NetworkClientTest {
assertTrue(client.connectionFailed(node));
}
@Test
public void testTelemetryRequest() {
ClientTelemetrySender mockClientTelemetrySender = mock(ClientTelemetrySender.class);
when(mockClientTelemetrySender.timeToNextUpdate(anyLong())).thenReturn(0L);
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
time, true, new ApiVersions(), null, new LogContext(), new DefaultHostResolver(), mockClientTelemetrySender);
// Send the ApiVersionsRequest
client.ready(node, time.milliseconds());
client.poll(0, time.milliseconds());
assertNull(client.telemetryConnectedNode());
assertTrue(client.hasInFlightRequests(node.idString()));
delayedApiVersionsResponse(0, ApiKeys.API_VERSIONS.latestVersion(), TestUtils.defaultApiVersionsResponse(
ApiMessageType.ListenerType.BROKER));
// handle ApiVersionsResponse
client.poll(0, time.milliseconds());
// the ApiVersionsRequest is gone
assertFalse(client.hasInFlightRequests(node.idString()));
selector.clear();
GetTelemetrySubscriptionsRequest.Builder getRequest = new GetTelemetrySubscriptionsRequest.Builder(
new GetTelemetrySubscriptionsRequestData(), true);
when(mockClientTelemetrySender.createRequest()).thenReturn(Optional.of(getRequest));
GetTelemetrySubscriptionsResponse getResponse = new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData());
ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(getResponse, ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS.latestVersion(), 1);
selector.completeReceive(new NetworkReceive(node.idString(), buffer));
// Initiate poll to send GetTelemetrySubscriptions request
client.poll(0, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()));
assertEquals(node, client.telemetryConnectedNode());
verify(mockClientTelemetrySender, times(1)).handleResponse(any(GetTelemetrySubscriptionsResponse.class));
selector.clear();
PushTelemetryRequest.Builder pushRequest = new PushTelemetryRequest.Builder(
new PushTelemetryRequestData(), true);
when(mockClientTelemetrySender.createRequest()).thenReturn(Optional.of(pushRequest));
PushTelemetryResponse pushResponse = new PushTelemetryResponse(new PushTelemetryResponseData());
ByteBuffer pushBuffer = RequestTestUtils.serializeResponseWithHeader(pushResponse, ApiKeys.PUSH_TELEMETRY.latestVersion(), 2);
selector.completeReceive(new NetworkReceive(node.idString(), pushBuffer));
// Initiate poll to send PushTelemetry request
client.poll(0, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()));
assertEquals(node, client.telemetryConnectedNode());
verify(mockClientTelemetrySender, times(1)).handleResponse(any(PushTelemetryResponse.class));
verify(mockClientTelemetrySender, times(4)).timeToNextUpdate(anyLong());
verify(mockClientTelemetrySender, times(2)).createRequest();
}
private RequestHeader parseHeader(ByteBuffer buffer) {
buffer.getInt(); // skip size
return RequestHeader.parse(buffer.slice());

View File

@ -33,8 +33,8 @@ import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.Uuid;
@ -110,10 +110,11 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
@ -177,6 +178,8 @@ import org.apache.kafka.common.requests.DescribeUserScramCredentialsResponse;
import org.apache.kafka.common.requests.ElectLeadersResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
@ -225,6 +228,7 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -7063,6 +7067,48 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testClientInstanceId() {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
Uuid expected = Uuid.randomUuid();
GetTelemetrySubscriptionsResponseData responseData =
new GetTelemetrySubscriptionsResponseData().setClientInstanceId(expected).setErrorCode(Errors.NONE.code());
env.kafkaClient().prepareResponse(
request -> request instanceof GetTelemetrySubscriptionsRequest,
new GetTelemetrySubscriptionsResponse(responseData));
Uuid result = env.adminClient().clientInstanceId(Duration.ofMillis(10));
assertEquals(expected, result);
}
}
@Test
public void testClientInstanceIdInvalidTimeout() {
Properties props = new Properties();
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props);
Exception exception = assertThrows(IllegalArgumentException.class, () -> admin.clientInstanceId(Duration.ofMillis(-1)));
assertEquals("The timeout cannot be negative.", exception.getMessage());
admin.close();
}
@Test
public void testClientInstanceIdNoTelemetryReporterRegistered() {
Properties props = new Properties();
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props);
Exception exception = assertThrows(IllegalStateException.class, () -> admin.clientInstanceId(Duration.ofMillis(0)));
assertEquals("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", exception.getMessage());
admin.close();
}
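Taken together, the three admin tests pin down the contract of `clientInstanceId(Duration)`: a negative timeout fails fast, a missing telemetry reporter fails with a pointer to `enable.metrics.push`, and otherwise the id comes from the telemetry sender. A hypothetical condensation of that guard logic, not the actual KafkaAdminClient code; the class and field names are illustrative:

import java.time.Duration;
import java.util.Optional;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;

final class ClientInstanceIdGuard {
    // Empty when the user disabled metrics push or no reporter was registered.
    private final Optional<ClientTelemetryReporter> telemetryReporter;

    ClientInstanceIdGuard(Optional<ClientTelemetryReporter> telemetryReporter) {
        this.telemetryReporter = telemetryReporter;
    }

    Uuid clientInstanceId(Duration timeout) {
        if (timeout.isNegative())
            throw new IllegalArgumentException("The timeout cannot be negative.");
        ClientTelemetryReporter reporter = telemetryReporter.orElseThrow(() ->
            new IllegalStateException("Telemetry is not enabled. Set config `enable.metrics.push` to `true`."));
        // The sender may not have obtained an id from the broker within the timeout.
        return reporter.telemetrySender().clientInstanceId(timeout)
            .orElseThrow(() -> new IllegalStateException("Client instance id not available within timeout."));
    }
}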
private UnregisterBrokerResponse prepareUnregisterBrokerResponse(Errors error, int throttleTimeMs) {
return new UnregisterBrokerResponse(new UnregisterBrokerResponseData()
.setErrorCode(error.code())

View File
@ -56,6 +56,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@ -86,6 +87,8 @@ import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@ -96,9 +99,9 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.MockedStatic;
import org.mockito.internal.stubbing.answers.CallsRealMethods;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.time.Duration;
@ -132,6 +135,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
@ -148,8 +153,13 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Note to future authors in this class. If you close the consumer, close with DURATION.ZERO to reduce the duration of
@ -220,33 +230,50 @@ public class KafkaConsumerTest {
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
assertEquals(3, consumer.metricsRegistry().reporters().size());
MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) consumer.metricsRegistry().reporters().stream()
.filter(reporter -> reporter instanceof MockMetricsReporter).findFirst().get();
assertEquals(consumer.clientId(), mockMetricsReporter.clientId);
}
@ParameterizedTest
@EnumSource(GroupProtocol.class)
@SuppressWarnings("deprecation")
public void testDisableJmxAndClientTelemetryReporter(GroupProtocol groupProtocol) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
props.setProperty(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
assertTrue(consumer.metricsRegistry().reporters().isEmpty());
}
@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testExplicitlyOnlyEnableJmxReporter(GroupProtocol groupProtocol) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
props.setProperty(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
assertEquals(1, consumer.metricsRegistry().reporters().size());
assertTrue(consumer.metricsRegistry().reporters().get(0) instanceof JmxReporter);
}
@ParameterizedTest
@EnumSource(GroupProtocol.class)
@SuppressWarnings("deprecation")
public void testExplicitlyOnlyEnableClientTelemetryReporter(GroupProtocol groupProtocol) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
assertEquals(1, consumer.metricsRegistry().reporters().size());
assertTrue(consumer.metricsRegistry().reporters().get(0) instanceof ClientTelemetryReporter);
}
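The four reporter tests above jointly specify which metrics reporters a client registers: JmxReporter via the deprecated `auto.include.jmx.reporter` flag or an explicit `metric.reporters` entry, and ClientTelemetryReporter whenever `enable.metrics.push` is true. A hypothetical condensation of that rule for reference, not the actual resolution code:

import java.util.ArrayList;
import java.util.List;

final class ReporterResolutionSketch {
    static List<String> reporters(boolean autoIncludeJmx,
                                  boolean jmxListedExplicitly,
                                  boolean enableMetricsPush) {
        List<String> result = new ArrayList<>();
        // JmxReporter: implicit via the deprecated flag, or explicit in metric.reporters.
        if (autoIncludeJmx || jmxListedExplicitly)
            result.add("JmxReporter");
        // ClientTelemetryReporter: registered only when metrics push is enabled.
        if (enableMetricsPush)
            result.add("ClientTelemetryReporter");
        return result;
    }
}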
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
@ -3293,6 +3320,53 @@ public void testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
);
}
@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testClientInstanceId(GroupProtocol groupProtocol) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
ClientTelemetryReporter clientTelemetryReporter = mock(ClientTelemetryReporter.class);
clientTelemetryReporter.configure(any());
MockedStatic<CommonClientConfigs> mockedCommonClientConfigs = mockStatic(CommonClientConfigs.class, new CallsRealMethods());
mockedCommonClientConfigs.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any())).thenReturn(Optional.of(clientTelemetryReporter));
ClientTelemetrySender clientTelemetrySender = mock(ClientTelemetrySender.class);
Uuid expectedUuid = Uuid.randomUuid();
when(clientTelemetryReporter.telemetrySender()).thenReturn(clientTelemetrySender);
when(clientTelemetrySender.clientInstanceId(any())).thenReturn(Optional.of(expectedUuid));
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
Uuid uuid = consumer.clientInstanceId(Duration.ofMillis(0));
assertEquals(expectedUuid, uuid);
mockedCommonClientConfigs.close();
}
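The test above relies on Mockito's static mocking to intercept the `CommonClientConfigs.telemetryReporter(...)` factory before the consumer is constructed. A minimal sketch of the same pattern using try-with-resources, which deregisters the static mock automatically instead of the manual close() used above; the Config class here is a hypothetical stand-in, and static mocking requires Mockito's inline mock maker:

import java.util.Optional;

import org.mockito.MockedStatic;
import org.mockito.internal.stubbing.answers.CallsRealMethods;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mockStatic;

final class MockedStaticPattern {
    // Hypothetical stand-in for CommonClientConfigs.
    static final class Config {
        static Optional<String> factory(String name) {
            return Optional.empty();
        }
    }

    static void example() {
        // Unstubbed static calls fall through to the real methods via CallsRealMethods.
        try (MockedStatic<Config> mocked = mockStatic(Config.class, new CallsRealMethods())) {
            mocked.when(() -> Config.factory(anyString())).thenReturn(Optional.of("stub"));
            assert Config.factory("anything").equals(Optional.of("stub"));
        }
        // Outside the block the real Config.factory is back in effect.
        assert Config.factory("anything").equals(Optional.empty());
    }
}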
@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testClientInstanceIdInvalidTimeout(GroupProtocol groupProtocol) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
Exception exception = assertThrows(IllegalArgumentException.class, () -> consumer.clientInstanceId(Duration.ofMillis(-1)));
assertEquals("The timeout cannot be negative.", exception.getMessage());
}
@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testClientInstanceIdNoTelemetryReporterRegistered(GroupProtocol groupProtocol) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
Exception exception = assertThrows(IllegalStateException.class, () -> consumer.clientInstanceId(Duration.ofMillis(0)));
assertEquals("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", exception.getMessage());
}
private KafkaConsumer<String, String> consumerForCheckingTimeoutException(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);

View File
@ -3700,7 +3700,8 @@ public abstract class ConsumerCoordinatorTest {
autoCommitIntervalMs,
null,
true,
null,
Optional.empty());
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
client.setNodeApiVersions(NodeApiVersions.create(ApiKeys.OFFSET_FETCH.id, (short) 0, upperVersion));
@ -3863,7 +3864,8 @@ public abstract class ConsumerCoordinatorTest {
autoCommitIntervalMs,
null,
false,
null,
Optional.empty());
}
private Collection<TopicPartition> getRevoked(final List<TopicPartition> owned,
@ -4089,7 +4091,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator = new ConsumerCoordinator(rebalanceConfig, new LogContext(), consumerClient,
Collections.singletonList(assignor), metadata, subscriptions,
metrics, consumerId + groupId, time, false, autoCommitIntervalMs, null, false, rackId, Optional.empty());
}
private static MetadataResponse rackAwareMetadata(int numNodes,

View File
@ -37,6 +37,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
@ -50,6 +51,7 @@ import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@ -72,6 +74,8 @@ import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@ -86,9 +90,9 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;
import org.mockito.internal.stubbing.answers.CallsRealMethods;
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@ -114,6 +118,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
@ -130,9 +136,11 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -454,31 +462,48 @@ public class KafkaProducerTest {
KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
assertEquals(3, producer.metrics.reporters().size());
MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().stream()
.filter(reporter -> reporter instanceof MockMetricsReporter).findFirst().get();
assertEquals(producer.getClientId(), mockMetricsReporter.clientId);
producer.close();
}
@Test
@SuppressWarnings("deprecation")
public void testDisableJmxAndClientTelemetryReporter() {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
assertTrue(producer.metrics.reporters().isEmpty());
producer.close();
}
@Test
public void testExplicitlyOnlyEnableJmxReporter() {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
assertEquals(1, producer.metrics.reporters().size());
assertTrue(producer.metrics.reporters().get(0) instanceof JmxReporter);
producer.close();
}
@Test
@SuppressWarnings("deprecation")
public void testExplicitlyOnlyEnableClientTelemetryReporter() {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
assertEquals(1, producer.metrics.reporters().size());
assertTrue(producer.metrics.reporters().get(0) instanceof ClientTelemetryReporter);
producer.close();
}
@ -1656,6 +1681,55 @@ public class KafkaProducerTest {
verifyInvalidGroupMetadata(new ConsumerGroupMetadata("group", 2, JoinGroupRequest.UNKNOWN_MEMBER_ID, Optional.empty()));
}
@Test
public void testClientInstanceId() {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
ClientTelemetryReporter clientTelemetryReporter = mock(ClientTelemetryReporter.class);
clientTelemetryReporter.configure(any());
MockedStatic<CommonClientConfigs> mockedCommonClientConfigs = mockStatic(CommonClientConfigs.class, new CallsRealMethods());
mockedCommonClientConfigs.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any())).thenReturn(Optional.of(clientTelemetryReporter));
ClientTelemetrySender clientTelemetrySender = mock(ClientTelemetrySender.class);
Uuid expectedUuid = Uuid.randomUuid();
when(clientTelemetryReporter.telemetrySender()).thenReturn(clientTelemetrySender);
when(clientTelemetrySender.clientInstanceId(any())).thenReturn(Optional.of(expectedUuid));
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
Uuid uuid = producer.clientInstanceId(Duration.ofMillis(0));
assertEquals(expectedUuid, uuid);
mockedCommonClientConfigs.close();
producer.close();
}
@Test
public void testClientInstanceIdInvalidTimeout() {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
Exception exception = assertThrows(IllegalArgumentException.class, () -> producer.clientInstanceId(Duration.ofMillis(-1)));
assertEquals("The timeout cannot be negative.", exception.getMessage());
producer.close();
}
@Test
public void testClientInstanceIdNoTelemetryReporterRegistered() {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
Exception exception = assertThrows(IllegalStateException.class, () -> producer.clientInstanceId(Duration.ofMillis(0)));
assertEquals("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", exception.getMessage());
producer.close();
}
private void verifyInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
@ -2403,7 +2477,8 @@ public class KafkaProducerTest {
interceptors,
partitioner,
time,
ioThread,
Optional.empty()
);
}
}