From 30d7c71f09d1f87daad03f44313f172f8b944f5a Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Thu, 22 May 2025 10:05:35 -0500 Subject: [PATCH] KAFKA-18904: Add Admin#listConfigResources [2/N] (#19743) * Add new functions `listConfigResources(Set configResourceTypes, ListConfigResourcesOptions options)` and `listConfigResources()` to `Admin` interface. * New functions can list all kind of config resource types. * If input is a set with a type other than `CLIENT_METRICS` and request version is 0, return `UnsupportedVersionException`. * Deprecate functions `listClientMetricsResources(ListClientMetricsResourcesOptions options)` and `listClientMetricsResources()`. * Deprecate classes `ListClientMetricsResourcesResult` and `ClientMetricsResourceListing`. * Change `ClientMetricsCommand` to use `listConfigResources`. * Add integration tests to `PlaintextAdminIntegrationTest.java`. * Add unit tests to `KafkaAdminClientTest.java`. Reviewers: Andrew Schofield --------- Signed-off-by: PoAn Yang --- .../org/apache/kafka/clients/admin/Admin.java | 26 + .../admin/ClientMetricsResourceListing.java | 1 + .../kafka/clients/admin/ForwardingAdmin.java | 6 + .../kafka/clients/admin/KafkaAdminClient.java | 659 ++++++++++-------- .../ListClientMetricsResourcesOptions.java | 2 + .../ListClientMetricsResourcesResult.java | 2 + .../admin/ListConfigResourcesOptions.java | 23 + .../admin/ListConfigResourcesResult.java | 53 ++ .../requests/ListConfigResourcesRequest.java | 11 + .../requests/ListConfigResourcesResponse.java | 9 - .../clients/admin/AdminClientTestUtils.java | 16 +- .../clients/admin/KafkaAdminClientTest.java | 67 ++ .../kafka/clients/admin/MockAdminClient.java | 6 + .../common/requests/RequestResponseTest.java | 26 +- .../api/PlaintextAdminIntegrationTest.scala | 86 +++ .../unit/kafka/server/KafkaApisTest.scala | 3 +- ...TestingMetricsInterceptingAdminClient.java | 8 + .../kafka/tools/ClientMetricsCommand.java | 19 +- .../kafka/tools/ClientMetricsCommandTest.java | 14 +- 19 files changed, 695 insertions(+), 342 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesOptions.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesResult.java diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 84553dbe50a..3f90405bc65 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -1775,12 +1775,36 @@ public interface Admin extends AutoCloseable { FenceProducersResult fenceProducers(Collection transactionalIds, FenceProducersOptions options); + /** + * List the configuration resources available in the cluster which matches config resource type. + * If no config resource types are specified, all configuration resources will be listed. + * + * @param configResourceTypes The set of configuration resource types to list. + * @param options The options to use when listing the configuration resources. + * @return The ListConfigurationResourcesResult. + */ + ListConfigResourcesResult listConfigResources(Set configResourceTypes, ListConfigResourcesOptions options); + + /** + * List all configuration resources available in the cluster with the default options. + *

+ * This is a convenience method for {@link #listConfigResources(Set, ListConfigResourcesOptions)} + * with default options. See the overload for more details. + * + * @return The ListConfigurationResourcesResult. + */ + default ListConfigResourcesResult listConfigResources() { + return listConfigResources(Set.of(), new ListConfigResourcesOptions()); + } + /** * List the client metrics configuration resources available in the cluster. * * @param options The options to use when listing the client metrics resources. * @return The ListClientMetricsResourcesResult. + * @deprecated Since 4.1. Use {@link #listConfigResources(Set, ListConfigResourcesOptions)} instead. */ + @Deprecated(since = "4.1", forRemoval = true) ListClientMetricsResourcesResult listClientMetricsResources(ListClientMetricsResourcesOptions options); /** @@ -1790,7 +1814,9 @@ public interface Admin extends AutoCloseable { * with default options. See the overload for more details. * * @return The ListClientMetricsResourcesResult. + * @deprecated Since 4.1. Use {@link #listConfigResources()} instead. */ + @Deprecated(since = "4.1", forRemoval = true) default ListClientMetricsResourcesResult listClientMetricsResources() { return listClientMetricsResources(new ListClientMetricsResourcesOptions()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ClientMetricsResourceListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/ClientMetricsResourceListing.java index b5c85b58732..d5af97080b0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ClientMetricsResourceListing.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ClientMetricsResourceListing.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.admin; import java.util.Objects; +@Deprecated(since = "4.1") public class ClientMetricsResourceListing { private final String name; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java index 5ac988e0acd..b99e4f6587b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java @@ -300,6 +300,12 @@ public class ForwardingAdmin implements Admin { return delegate.fenceProducers(transactionalIds, options); } + @Override + public ListConfigResourcesResult listConfigResources(Set configResourceTypes, ListConfigResourcesOptions options) { + return delegate.listConfigResources(configResourceTypes, options); + } + + @SuppressWarnings({"deprecation", "removal"}) @Override public ListClientMetricsResourcesResult listClientMetricsResources(ListClientMetricsResourcesOptions options) { return delegate.listClientMetricsResources(options); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 0abad889c85..b283d65cbee 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -419,11 +419,11 @@ public class KafkaAdminClient extends AdminClient { /** * Get or create a list value from a map. * - * @param map The map to get or create the element from. - * @param key The key. - * @param The key type. - * @param The value type. - * @return The list value. + * @param map The map to get or create the element from. + * @param key The key. + * @param The key type. + * @param The value type. + * @return The list value. */ static List getOrCreateListValue(Map> map, K key) { return map.computeIfAbsent(key, k -> new LinkedList<>()); @@ -432,9 +432,9 @@ public class KafkaAdminClient extends AdminClient { /** * Send an exception to every element in a collection of KafkaFutureImpls. * - * @param futures The collection of KafkaFutureImpl objects. - * @param exc The exception - * @param The KafkaFutureImpl result type. + * @param futures The collection of KafkaFutureImpl objects. + * @param exc The exception + * @param The KafkaFutureImpl result type. */ private static void completeAllExceptionally(Collection> futures, Throwable exc) { completeAllExceptionally(futures.stream(), exc); @@ -443,9 +443,9 @@ public class KafkaAdminClient extends AdminClient { /** * Send an exception to all futures in the provided stream * - * @param futures The stream of KafkaFutureImpl objects. - * @param exc The exception - * @param The KafkaFutureImpl result type. + * @param futures The stream of KafkaFutureImpl objects. + * @param exc The exception + * @param The KafkaFutureImpl result type. */ private static void completeAllExceptionally(Stream> futures, Throwable exc) { futures.forEach(future -> future.completeExceptionally(exc)); @@ -454,9 +454,9 @@ public class KafkaAdminClient extends AdminClient { /** * Get the current time remaining before a deadline as an integer. * - * @param now The current time in milliseconds. - * @param deadlineMs The deadline time in milliseconds. - * @return The time delta in milliseconds. + * @param now The current time in milliseconds. + * @param deadlineMs The deadline time in milliseconds. + * @return The time delta in milliseconds. */ static int calcTimeoutMsRemainingAsInt(long now, long deadlineMs) { long deltaMs = deadlineMs - now; @@ -470,9 +470,8 @@ public class KafkaAdminClient extends AdminClient { /** * Generate the client id based on the configuration. * - * @param config The configuration - * - * @return The client id + * @param config The configuration + * @return The client id */ static String generateClientId(AdminClientConfig config) { String clientId = config.getString(AdminClientConfig.CLIENT_ID_CONFIG); @@ -488,10 +487,9 @@ public class KafkaAdminClient extends AdminClient { /** * Get the deadline for a particular call. * - * @param now The current time in milliseconds. - * @param optionTimeoutMs The timeout option given by the user. - * - * @return The deadline in milliseconds. + * @param now The current time in milliseconds. + * @param optionTimeoutMs The timeout option given by the user. + * @return The deadline in milliseconds. */ private long calcDeadlineMs(long now, Integer optionTimeoutMs) { if (optionTimeoutMs != null) @@ -502,9 +500,8 @@ public class KafkaAdminClient extends AdminClient { /** * Pretty-print an exception. * - * @param throwable The exception. - * - * @return A compact human-readable string. + * @param throwable The exception. + * @return A compact human-readable string. */ static String prettyPrintException(Throwable throwable) { if (throwable == null) @@ -550,7 +547,7 @@ public class KafkaAdminClient extends AdminClient { .recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG))) .tags(metricTags); MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, - config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); + config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); metrics = new Metrics(metricConfig, reporters, time, metricsContext); networkClient = ClientUtils.createNetworkClient(config, clientId, @@ -656,11 +653,11 @@ public class KafkaAdminClient extends AdminClient { if (defaultApiTimeoutMs < requestTimeoutMs) { if (config.originals().containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) { throw new ConfigException("The specified value of " + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG + - " must be no smaller than the value of " + AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG + "."); + " must be no smaller than the value of " + AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG + "."); } else { log.warn("Overriding the default value for {} ({}) with the explicitly configured request timeout {}", - AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, this.defaultApiTimeoutMs, - requestTimeoutMs); + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, this.defaultApiTimeoutMs, + requestTimeoutMs); return requestTimeoutMs; } } @@ -718,6 +715,7 @@ public class KafkaAdminClient extends AdminClient { */ private interface NodeProvider { Node provide(); + boolean supportsUseControllers(); } @@ -727,7 +725,7 @@ public class KafkaAdminClient extends AdminClient { long now = time.milliseconds(); LeastLoadedNode leastLoadedNode = client.leastLoadedNode(now); if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP - && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) { + && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) { metadataManager.rebootstrap(now); } @@ -757,7 +755,7 @@ public class KafkaAdminClient extends AdminClient { @Override public Node provide() { if (metadataManager.isReady() && - (metadataManager.nodeById(nodeId) != null)) { + (metadataManager.nodeById(nodeId) != null)) { return metadataManager.nodeById(nodeId); } // If we can't find the node with the given constant ID, we schedule a @@ -791,7 +789,7 @@ public class KafkaAdminClient extends AdminClient { @Override public Node provide() { if (metadataManager.isReady() && - (metadataManager.controller() != null)) { + (metadataManager.controller() != null)) { return metadataManager.controller(); } metadataManager.requestUpdate(); @@ -893,13 +891,13 @@ public class KafkaAdminClient extends AdminClient { /** * Handle a failure. - * + *

* Depending on what the exception is and how many times we have already tried, we may choose to * fail the Call, or retry it. It is important to print the stack traces here in some cases, * since they are not necessarily preserved in ApiVersionException objects. * - * @param now The current time in milliseconds. - * @param throwable The failure exception. + * @param now The current time in milliseconds. + * @param throwable The failure exception. */ final void fail(long now, Throwable throwable) { if (curNode != null) { @@ -915,7 +913,7 @@ public class KafkaAdminClient extends AdminClient { // protocol downgrade will not count against the total number of retries we get for // this RPC. That is why 'tries' is not incremented. if ((throwable instanceof UnsupportedVersionException) && - handleUnsupportedVersionException((UnsupportedVersionException) throwable)) { + handleUnsupportedVersionException((UnsupportedVersionException) throwable)) { log.debug("{} attempting protocol downgrade and then retry.", this); runnable.pendingCalls.add(this); return; @@ -969,16 +967,14 @@ public class KafkaAdminClient extends AdminClient { * Create an AbstractRequest.Builder for this Call. * * @param timeoutMs The timeout in milliseconds. - * - * @return The AbstractRequest builder. + * @return The AbstractRequest builder. */ abstract AbstractRequest.Builder createRequest(int timeoutMs); /** * Process the call response. * - * @param abstractResponse The AbstractResponse. - * + * @param abstractResponse The AbstractResponse. */ abstract void handleResponse(AbstractResponse abstractResponse); @@ -986,16 +982,15 @@ public class KafkaAdminClient extends AdminClient { * Handle a failure. This will only be called if the failure exception was not * retriable, or if we hit a timeout. * - * @param throwable The exception. + * @param throwable The exception. */ abstract void handleFailure(Throwable throwable); /** * Handle an UnsupportedVersionException. * - * @param exception The exception. - * - * @return True if the exception can be handled; false otherwise. + * @param exception The exception. + * @return True if the exception can be handled; false otherwise. */ boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { return false; @@ -1032,7 +1027,7 @@ public class KafkaAdminClient extends AdminClient { /** * Create a new timeout processor. * - * @param now The current time in milliseconds since the epoch. + * @param now The current time in milliseconds since the epoch. */ TimeoutProcessor(long now) { this.now = now; @@ -1044,9 +1039,8 @@ public class KafkaAdminClient extends AdminClient { * Timed out calls will be removed and failed. * The remaining milliseconds until the next timeout will be updated. * - * @param calls The collection of calls. - * - * @return The number of calls which were timed out. + * @param calls The collection of calls. + * @return The number of calls which were timed out. */ int handleTimeouts(Collection calls, String msg) { int numTimedOut = 0; @@ -1068,9 +1062,8 @@ public class KafkaAdminClient extends AdminClient { * Check whether a call should be timed out. * The remaining milliseconds until the next timeout will be updated. * - * @param call The call. - * - * @return True if the call should be timed out. + * @param call The call. + * @return True if the call should be timed out. */ boolean callHasExpired(Call call) { int remainingMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs); @@ -1130,7 +1123,7 @@ public class KafkaAdminClient extends AdminClient { /** * Time out the elements in the pendingCalls list which are expired. * - * @param processor The timeout processor. + * @param processor The timeout processor. */ private void timeoutPendingCalls(TimeoutProcessor processor) { int numTimedOut = processor.handleTimeouts(pendingCalls, "Timed out waiting for a node assignment."); @@ -1141,7 +1134,7 @@ public class KafkaAdminClient extends AdminClient { /** * Time out calls which have been assigned to nodes. * - * @param processor The timeout processor. + * @param processor The timeout processor. */ private int timeoutCallsToSend(TimeoutProcessor processor) { int numTimedOut = 0; @@ -1156,7 +1149,7 @@ public class KafkaAdminClient extends AdminClient { /** * Drain all the calls from newCalls into pendingCalls. - * + *

* This function holds the lock for the minimum amount of time, to avoid blocking * users of AdminClient who will also take the lock to add new calls. */ @@ -1168,7 +1161,7 @@ public class KafkaAdminClient extends AdminClient { * Add some calls to pendingCalls, and then clear the input list. * Also clears Call#curNode. * - * @param calls The calls to add. + * @param calls The calls to add. */ private void transitionToPendingAndClearList(List calls) { for (Call call : calls) { @@ -1181,9 +1174,9 @@ public class KafkaAdminClient extends AdminClient { /** * Choose nodes for the calls in the pendingCalls list. * - * @param now The current time in milliseconds. - * @return The minimum time until a call is ready to be retried if any of the pending - * calls are backing off after a failure + * @param now The current time in milliseconds. + * @return The minimum time until a call is ready to be retried if any of the pending + * calls are backing off after a failure */ private long maybeDrainPendingCalls(long now) { long pollTimeout = Long.MAX_VALUE; @@ -1241,8 +1234,8 @@ public class KafkaAdminClient extends AdminClient { /** * Send the calls which are ready. * - * @param now The current time in milliseconds. - * @return The minimum timeout we need for poll(). + * @param now The current time in milliseconds. + * @return The minimum timeout we need for poll(). */ private long sendEligibleCalls(long now) { long pollTimeout = Long.MAX_VALUE; @@ -1264,7 +1257,7 @@ public class KafkaAdminClient extends AdminClient { if (deadline != null) { if (now >= deadline) { log.info("Disconnecting from {} and revoking {} node assignment(s) " + - "because the node is taking too long to become ready.", + "because the node is taking too long to become ready.", node.idString(), calls.size()); transitionToPendingAndClearList(calls); client.disconnect(node.idString()); @@ -1317,12 +1310,12 @@ public class KafkaAdminClient extends AdminClient { /** * Time out expired calls that are in flight. - * + *

* Calls that are in flight may have been partially or completely sent over the wire. They may * even be in the process of being processed by the remote server. At the moment, our only option * to time them out is to close the entire connection. * - * @param processor The timeout processor. + * @param processor The timeout processor. */ private void timeoutCallsInFlight(TimeoutProcessor processor) { int numTimedOut = 0; @@ -1345,8 +1338,8 @@ public class KafkaAdminClient extends AdminClient { /** * Handle responses from the server. * - * @param now The current time in milliseconds. - * @param responses The latest responses from KafkaClient. + * @param now The current time in milliseconds. + * @param responses The latest responses from KafkaClient. */ private void handleResponses(long now, List responses) { for (ClientResponse response : responses) { @@ -1357,7 +1350,7 @@ public class KafkaAdminClient extends AdminClient { // If the server returns information about a correlation ID we didn't use yet, // an internal server error has occurred. Close the connection and log an error message. log.error("Internal server error on {}: server returned information about unknown " + - "correlation ID {}, requestHeader = {}", response.destination(), correlationId, + "correlation ID {}, requestHeader = {}", response.destination(), correlationId, response.requestHeader()); client.disconnect(response.destination()); continue; @@ -1476,7 +1469,7 @@ public class KafkaAdminClient extends AdminClient { numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls, "The AdminClient thread has exited."); numTimedOut += timeoutCallsToSend(timeoutProcessor); numTimedOut += timeoutProcessor.handleTimeouts(correlationIdToCalls.values(), - "The AdminClient thread has exited."); + "The AdminClient thread has exited."); if (numTimedOut > 0) { log.info("Timed out {} remaining operation(s) during close.", numTimedOut); } @@ -1546,13 +1539,13 @@ public class KafkaAdminClient extends AdminClient { /** * Queue a call for sending. - * + *

* If the AdminClient thread has exited, this will fail. Otherwise, it will succeed (even * if the AdminClient is shutting down). This function should called when retrying an * existing call. * - * @param call The new call object. - * @param now The current time in milliseconds. + * @param call The new call object. + * @param now The current time in milliseconds. */ void enqueue(Call call, long now) { if (call.tries > maxRetries) { @@ -1583,18 +1576,18 @@ public class KafkaAdminClient extends AdminClient { /** * Initiate a new call. - * + *

* This will fail if the AdminClient is scheduled to shut down. * - * @param call The new call object. - * @param now The current time in milliseconds. + * @param call The new call object. + * @param now The current time in milliseconds. */ void call(Call call, long now) { if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) { log.debug("Cannot accept new call {} when AdminClient is closing.", call); call.handleFailure(new IllegalStateException("Cannot accept new calls when AdminClient is closing.")); } else if (metadataManager.usingBootstrapControllers() && - (!call.nodeProvider.supportsUseControllers())) { + (!call.nodeProvider.supportsUseControllers())) { call.fail(now, new UnsupportedEndpointTypeException("This Admin API is not " + "yet supported when communicating directly with the controller quorum.")); } else { @@ -1616,7 +1609,7 @@ public class KafkaAdminClient extends AdminClient { private Call makeControllerMetadataCall(long now) { // Use DescribeCluster here, as specified by KIP-919. return new Call(true, "describeCluster", calcDeadlineMs(now, requestTimeoutMs), - new MetadataUpdateNodeIdProvider()) { + new MetadataUpdateNodeIdProvider()) { @Override public DescribeClusterRequest.Builder createRequest(int timeoutMs) { return new DescribeClusterRequest.Builder(new DescribeClusterRequestData() @@ -1659,7 +1652,7 @@ public class KafkaAdminClient extends AdminClient { // We use MetadataRequest here so that we can continue to support brokers that are too // old to handle DescribeCluster. return new Call(true, "fetchMetadata", calcDeadlineMs(now, requestTimeoutMs), - new MetadataUpdateNodeIdProvider()) { + new MetadataUpdateNodeIdProvider()) { @Override public MetadataRequest.Builder createRequest(int timeoutMs) { // Since this only requests node information, it's safe to pass true @@ -1748,10 +1741,10 @@ public class KafkaAdminClient extends AdminClient { * Used when a response handler expected a result for some entity but no result was present. */ private static void completeUnrealizedFutures( - Stream>> futures, - Function messageFormatter) { + Stream>> futures, + Function messageFormatter) { futures.filter(entry -> !entry.getValue().isDone()).forEach(entry -> - entry.getValue().completeExceptionally(new ApiException(messageFormatter.apply(entry.getKey())))); + entry.getValue().completeExceptionally(new ApiException(messageFormatter.apply(entry.getKey())))); } /** @@ -1759,11 +1752,11 @@ public class KafkaAdminClient extends AdminClient { * the initial error back to the caller if the request timed out. */ private static void maybeCompleteQuotaExceededException( - boolean shouldRetryOnQuotaViolation, - Throwable throwable, - Map> futures, - Map quotaExceededExceptions, - int throttleTimeDelta) { + boolean shouldRetryOnQuotaViolation, + Throwable throwable, + Map> futures, + Map quotaExceededExceptions, + int throttleTimeDelta) { if (shouldRetryOnQuotaViolation && throwable instanceof TimeoutException) { quotaExceededExceptions.forEach((key, value) -> futures.get(key).completeExceptionally( new ThrottlingQuotaExceededException( @@ -2042,10 +2035,10 @@ public class KafkaAdminClient extends AdminClient { @Override DeleteTopicsRequest.Builder createRequest(int timeoutMs) { return new DeleteTopicsRequest.Builder( - new DeleteTopicsRequestData() - .setTopics(topicIds.stream().map( - topic -> new DeleteTopicState().setTopicId(topic)).collect(Collectors.toList())) - .setTimeoutMs(timeoutMs)); + new DeleteTopicsRequestData() + .setTopics(topicIds.stream().map( + topic -> new DeleteTopicState().setTopicId(topic)).collect(Collectors.toList())) + .setTimeoutMs(timeoutMs)); } @Override @@ -2065,7 +2058,7 @@ public class KafkaAdminClient extends AdminClient { if (error.isFailure()) { if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) { ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException( - response.throttleTimeMs(), error.messageWithFallback()); + response.throttleTimeMs(), error.messageWithFallback()); if (options.shouldRetryOnQuotaViolation()) { retryTopics.add(result.topicId()); retryTopicQuotaExceededExceptions.put(result.topicId(), quotaExceededException); @@ -2088,7 +2081,7 @@ public class KafkaAdminClient extends AdminClient { } else { final long now = time.milliseconds(); final Call call = getDeleteTopicsWithIdsCall(options, futures, retryTopics, - retryTopicQuotaExceededExceptions, now, deadline); + retryTopicQuotaExceededExceptions, now, deadline); runnable.call(call, now); } } @@ -2098,7 +2091,7 @@ public class KafkaAdminClient extends AdminClient { // If there were any topics retries due to a quota exceeded exception, we propagate // the initial error back to the caller if the request timed out. maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(), - throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now)); + throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now)); // Fail all the other remaining futures completeAllExceptionally(futures.values(), throwable); } @@ -2285,7 +2278,7 @@ public class KafkaAdminClient extends AdminClient { } if (partiallyFinishedTopicDescription != null && - (responseCursor == null || !responseCursor.topicName().equals(partiallyFinishedTopicDescription.name()))) { + (responseCursor == null || !responseCursor.topicName().equals(partiallyFinishedTopicDescription.name()))) { // We can't simply check nextTopicDescription != null here to close the partiallyFinishedTopicDescription. // Because the responseCursor topic may not show in the response. String topicName = partiallyFinishedTopicDescription.name(); @@ -2368,7 +2361,7 @@ public class KafkaAdminClient extends AdminClient { if (topicIdIsUnrepresentable(topicId)) { KafkaFutureImpl future = new KafkaFutureImpl<>(); future.completeExceptionally(new InvalidTopicException("The given topic id '" + - topicId + "' cannot be represented in a request.")); + topicId + "' cannot be represented in a request.")); topicFutures.put(topicId, future); } else if (!topicFutures.containsKey(topicId)) { topicFutures.put(topicId, new KafkaFutureImpl<>()); @@ -2377,14 +2370,14 @@ public class KafkaAdminClient extends AdminClient { } final long now = time.milliseconds(); Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, options.timeoutMs()), - new LeastLoadedNodeProvider()) { + new LeastLoadedNodeProvider()) { @Override MetadataRequest.Builder createRequest(int timeoutMs) { return new MetadataRequest.Builder(new MetadataRequestData() - .setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList)) - .setAllowAutoTopicCreation(false) - .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations())); + .setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList)) + .setAllowAutoTopicCreation(false) + .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations())); } @Override @@ -2446,8 +2439,8 @@ public class KafkaAdminClient extends AdminClient { List partitions = new ArrayList<>(partitionInfos.size()); for (PartitionInfo partitionInfo : partitionInfos) { TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo( - partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()), - Arrays.asList(partitionInfo.inSyncReplicas())); + partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()), + Arrays.asList(partitionInfo.inSyncReplicas())); partitions.add(topicPartitionInfo); } partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition)); @@ -2482,7 +2475,7 @@ public class KafkaAdminClient extends AdminClient { return new DescribeClusterRequest.Builder(new DescribeClusterRequestData() .setIncludeClusterAuthorizedOperations(options.includeAuthorizedOperations()) .setEndpointType(metadataManager.usingBootstrapControllers() ? - EndpointType.CONTROLLER.id() : EndpointType.BROKER.id()) + EndpointType.CONTROLLER.id() : EndpointType.BROKER.id()) .setIncludeFencedBrokers(options.includeFencedBrokers())); } else { // Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it @@ -2566,7 +2559,7 @@ public class KafkaAdminClient extends AdminClient { if (filter.isUnknown()) { KafkaFutureImpl> future = new KafkaFutureImpl<>(); future.completeExceptionally(new InvalidRequestException("The AclBindingFilter " + - "must not contain UNKNOWN elements.")); + "must not contain UNKNOWN elements.")); return new DescribeAclsResult(future); } final long now = time.milliseconds(); @@ -2766,15 +2759,15 @@ public class KafkaAdminClient extends AdminClient { if (future == null) { if (node != null) { log.warn("The config {} in the response from node {} is not in the request", - configResource, node); + configResource, node); } else { log.warn("The config {} in the response from the least loaded broker is not in the request", - configResource); + configResource); } } else { if (describeConfigsResult.errorCode() != Errors.NONE.code()) { future.completeExceptionally(Errors.forCode(describeConfigsResult.errorCode()) - .exception(describeConfigsResult.errorMessage())); + .exception(describeConfigsResult.errorMessage())); } else { future.complete(describeConfigResult(describeConfigsResult)); } @@ -2810,15 +2803,15 @@ public class KafkaAdminClient extends AdminClient { private Config describeConfigResult(DescribeConfigsResponseData.DescribeConfigsResult describeConfigsResult) { return new Config(describeConfigsResult.configs().stream().map(config -> new ConfigEntry( - config.name(), - config.value(), - DescribeConfigsResponse.ConfigSource.forId(config.configSource()).source(), - config.isSensitive(), - config.readOnly(), - (config.synonyms().stream().map(synonym -> new ConfigEntry.ConfigSynonym(synonym.name(), synonym.value(), - DescribeConfigsResponse.ConfigSource.forId(synonym.source()).source()))).collect(Collectors.toList()), - DescribeConfigsResponse.ConfigType.forId(config.configType()).type(), - config.documentation() + config.name(), + config.value(), + DescribeConfigsResponse.ConfigSource.forId(config.configSource()).source(), + config.isSensitive(), + config.readOnly(), + (config.synonyms().stream().map(synonym -> new ConfigEntry.ConfigSynonym(synonym.name(), synonym.value(), + DescribeConfigsResponse.ConfigSource.forId(synonym.source()).source()))).collect(Collectors.toList()), + DescribeConfigsResponse.ConfigType.forId(config.configType()).type(), + config.documentation() )).collect(Collectors.toList())); } @@ -2930,7 +2923,7 @@ public class KafkaAdminClient extends AdminClient { futures.put(replica, new KafkaFutureImpl<>()); Map replicaAssignmentByBroker = new HashMap<>(); - for (Map.Entry entry: replicaAssignment.entrySet()) { + for (Map.Entry entry : replicaAssignment.entrySet()) { TopicPartitionReplica replica = entry.getKey(); String logDir = entry.getValue(); int brokerId = replica.brokerId(); @@ -2951,7 +2944,7 @@ public class KafkaAdminClient extends AdminClient { } final long now = time.milliseconds(); - for (Map.Entry entry: replicaAssignmentByBroker.entrySet()) { + for (Map.Entry entry : replicaAssignmentByBroker.entrySet()) { final int brokerId = entry.getKey(); final AlterReplicaLogDirsRequestData assignment = entry.getValue(); @@ -2966,15 +2959,15 @@ public class KafkaAdminClient extends AdminClient { @Override public void handleResponse(AbstractResponse abstractResponse) { AlterReplicaLogDirsResponse response = (AlterReplicaLogDirsResponse) abstractResponse; - for (AlterReplicaLogDirTopicResult topicResult: response.data().results()) { - for (AlterReplicaLogDirPartitionResult partitionResult: topicResult.partitions()) { + for (AlterReplicaLogDirTopicResult topicResult : response.data().results()) { + for (AlterReplicaLogDirPartitionResult partitionResult : topicResult.partitions()) { TopicPartitionReplica replica = new TopicPartitionReplica( - topicResult.topicName(), partitionResult.partitionIndex(), brokerId); + topicResult.topicName(), partitionResult.partitionIndex(), brokerId); KafkaFutureImpl future = futures.get(replica); if (future == null) { log.warn("The partition {} in the response from broker {} is not in the request", - new TopicPartition(topicResult.topicName(), partitionResult.partitionIndex()), - brokerId); + new TopicPartition(topicResult.topicName(), partitionResult.partitionIndex()), + brokerId); } else if (partitionResult.errorCode() == Errors.NONE.code()) { future.complete(null); } else { @@ -2986,8 +2979,9 @@ public class KafkaAdminClient extends AdminClient { completeUnrealizedFutures( futures.entrySet().stream().filter(entry -> entry.getKey().brokerId() == brokerId), replica -> "The response from broker " + brokerId + - " did not contain a result for replica " + replica); + " did not contain a result for replica " + replica); } + @Override void handleFailure(Throwable throwable) { // Only completes the futures of brokerId @@ -3030,11 +3024,12 @@ public class KafkaAdminClient extends AdminClient { } else { // Up to v3 DescribeLogDirsResponse did not have an error code field, hence it defaults to None Errors error = response.data().errorCode() == Errors.NONE.code() - ? Errors.CLUSTER_AUTHORIZATION_FAILED - : Errors.forCode(response.data().errorCode()); + ? Errors.CLUSTER_AUTHORIZATION_FAILED + : Errors.forCode(response.data().errorCode()); future.completeExceptionally(error.exception()); } } + @Override void handleFailure(Throwable throwable) { future.completeExceptionally(throwable); @@ -3052,15 +3047,15 @@ public class KafkaAdminClient extends AdminClient { for (DescribeLogDirsResponseData.DescribeLogDirsTopic t : logDirResult.topics()) { for (DescribeLogDirsResponseData.DescribeLogDirsPartition p : t.partitions()) { replicaInfoMap.put( - new TopicPartition(t.name(), p.partitionIndex()), - new ReplicaInfo(p.partitionSize(), p.offsetLag(), p.isFutureKey())); + new TopicPartition(t.name(), p.partitionIndex()), + new ReplicaInfo(p.partitionSize(), p.offsetLag(), p.isFutureKey())); } } result.put(logDirResult.logDir(), new LogDirDescription( - Errors.forCode(logDirResult.errorCode()).exception(), - replicaInfoMap, - logDirResult.totalBytes(), - logDirResult.usableBytes())); + Errors.forCode(logDirResult.errorCode()).exception(), + replicaInfoMap, + logDirResult.totalBytes(), + logDirResult.usableBytes())); } return result; } @@ -3075,7 +3070,7 @@ public class KafkaAdminClient extends AdminClient { Map partitionsByBroker = new HashMap<>(); - for (TopicPartitionReplica replica: replicas) { + for (TopicPartitionReplica replica : replicas) { DescribeLogDirsRequestData requestData = partitionsByBroker.computeIfAbsent(replica.brokerId(), brokerId -> new DescribeLogDirsRequestData()); DescribableLogDirTopic describableLogDirTopic = requestData.topics().find(replica.topic()); @@ -3083,7 +3078,7 @@ public class KafkaAdminClient extends AdminClient { List partitions = new ArrayList<>(); partitions.add(replica.partition()); describableLogDirTopic = new DescribableLogDirTopic().setTopic(replica.topic()) - .setPartitions(partitions); + .setPartitions(partitions); requestData.topics().add(describableLogDirTopic); } else { describableLogDirTopic.partitions().add(replica.partition()); @@ -3091,11 +3086,11 @@ public class KafkaAdminClient extends AdminClient { } final long now = time.milliseconds(); - for (Map.Entry entry: partitionsByBroker.entrySet()) { + for (Map.Entry entry : partitionsByBroker.entrySet()) { final int brokerId = entry.getKey(); final DescribeLogDirsRequestData topicPartitions = entry.getValue(); final Map replicaDirInfoByPartition = new HashMap<>(); - for (DescribableLogDirTopic topicPartition: topicPartitions.topics()) { + for (DescribableLogDirTopic topicPartition : topicPartitions.topics()) { for (Integer partitionId : topicPartition.partitions()) { replicaDirInfoByPartition.put(new TopicPartition(topicPartition.topic(), partitionId), new ReplicaLogDirInfo()); } @@ -3113,7 +3108,7 @@ public class KafkaAdminClient extends AdminClient { @Override public void handleResponse(AbstractResponse abstractResponse) { DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse; - for (Map.Entry responseEntry: logDirDescriptions(response).entrySet()) { + for (Map.Entry responseEntry : logDirDescriptions(response).entrySet()) { String logDir = responseEntry.getKey(); LogDirDescription logDirInfo = responseEntry.getValue(); @@ -3124,7 +3119,7 @@ public class KafkaAdminClient extends AdminClient { handleFailure(new IllegalStateException( "The error " + logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal")); - for (Map.Entry replicaInfoEntry: logDirInfo.replicaInfos().entrySet()) { + for (Map.Entry replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) { TopicPartition tp = replicaInfoEntry.getKey(); ReplicaInfo replicaInfo = replicaInfoEntry.getValue(); ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp); @@ -3132,24 +3127,25 @@ public class KafkaAdminClient extends AdminClient { log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp); } else if (replicaInfo.isFuture()) { replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(), - replicaLogDirInfo.getCurrentReplicaOffsetLag(), - logDir, - replicaInfo.offsetLag())); + replicaLogDirInfo.getCurrentReplicaOffsetLag(), + logDir, + replicaInfo.offsetLag())); } else { replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir, - replicaInfo.offsetLag(), - replicaLogDirInfo.getFutureReplicaLogDir(), - replicaLogDirInfo.getFutureReplicaOffsetLag())); + replicaInfo.offsetLag(), + replicaLogDirInfo.getFutureReplicaLogDir(), + replicaLogDirInfo.getFutureReplicaOffsetLag())); } } } - for (Map.Entry entry: replicaDirInfoByPartition.entrySet()) { + for (Map.Entry entry : replicaDirInfoByPartition.entrySet()) { TopicPartition tp = entry.getKey(); KafkaFutureImpl future = futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId)); future.complete(entry.getValue()); } } + @Override void handleFailure(Throwable throwable) { completeAllExceptionally(futures.values(), throwable); @@ -3285,8 +3281,8 @@ public class KafkaAdminClient extends AdminClient { List renewers = new ArrayList<>(); for (KafkaPrincipal principal : options.renewers()) { renewers.add(new CreatableRenewers() - .setPrincipalName(principal.getName()) - .setPrincipalType(principal.getPrincipalType())); + .setPrincipalName(principal.getName()) + .setPrincipalType(principal.getPrincipalType())); } runnable.call(new Call("createDelegationToken", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @@ -3310,7 +3306,7 @@ public class KafkaAdminClient extends AdminClient { delegationTokenFuture.completeExceptionally(response.error().exception()); } else { CreateDelegationTokenResponseData data = response.data(); - TokenInformation tokenInfo = new TokenInformation(data.tokenId(), new KafkaPrincipal(data.principalType(), data.principalName()), + TokenInformation tokenInfo = new TokenInformation(data.tokenId(), new KafkaPrincipal(data.principalType(), data.principalName()), new KafkaPrincipal(data.tokenRequesterPrincipalType(), data.tokenRequesterPrincipalName()), options.renewers(), data.issueTimestampMs(), data.maxTimestampMs(), data.expiryTimestampMs()); DelegationToken token = new DelegationToken(tokenInfo, data.hmac()); @@ -3329,7 +3325,7 @@ public class KafkaAdminClient extends AdminClient { @Override public RenewDelegationTokenResult renewDelegationToken(final byte[] hmac, final RenewDelegationTokenOptions options) { - final KafkaFutureImpl expiryTimeFuture = new KafkaFutureImpl<>(); + final KafkaFutureImpl expiryTimeFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); runnable.call(new Call("renewDelegationToken", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @@ -3337,7 +3333,7 @@ public class KafkaAdminClient extends AdminClient { @Override RenewDelegationTokenRequest.Builder createRequest(int timeoutMs) { return new RenewDelegationTokenRequest.Builder( - new RenewDelegationTokenRequestData() + new RenewDelegationTokenRequestData() .setHmac(hmac) .setRenewPeriodMs(options.renewTimePeriodMs())); } @@ -3363,7 +3359,7 @@ public class KafkaAdminClient extends AdminClient { @Override public ExpireDelegationTokenResult expireDelegationToken(final byte[] hmac, final ExpireDelegationTokenOptions options) { - final KafkaFutureImpl expiryTimeFuture = new KafkaFutureImpl<>(); + final KafkaFutureImpl expiryTimeFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); runnable.call(new Call("expireDelegationToken", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @@ -3371,9 +3367,9 @@ public class KafkaAdminClient extends AdminClient { @Override ExpireDelegationTokenRequest.Builder createRequest(int timeoutMs) { return new ExpireDelegationTokenRequest.Builder( - new ExpireDelegationTokenRequestData() - .setHmac(hmac) - .setExpiryTimePeriodMs(options.expiryTimePeriodMs())); + new ExpireDelegationTokenRequestData() + .setHmac(hmac) + .setExpiryTimePeriodMs(options.expiryTimePeriodMs())); } @Override @@ -3397,7 +3393,7 @@ public class KafkaAdminClient extends AdminClient { @Override public DescribeDelegationTokenResult describeDelegationToken(final DescribeDelegationTokenOptions options) { - final KafkaFutureImpl> tokensFuture = new KafkaFutureImpl<>(); + final KafkaFutureImpl> tokensFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); runnable.call(new Call("describeDelegationToken", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @@ -3579,11 +3575,11 @@ public class KafkaAdminClient extends AdminClient { public DescribeConsumerGroupsResult describeConsumerGroups(final Collection groupIds, final DescribeConsumerGroupsOptions options) { SimpleAdminApiFuture future = - DescribeConsumerGroupsHandler.newFuture(groupIds); + DescribeConsumerGroupsHandler.newFuture(groupIds); DescribeConsumerGroupsHandler handler = new DescribeConsumerGroupsHandler(options.includeAuthorizedOperations(), logContext); invokeDriver(handler, future, options.timeoutMs); return new DescribeConsumerGroupsResult(future.all().entrySet().stream() - .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); + .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); } @Deprecated @@ -3660,13 +3656,13 @@ public class KafkaAdminClient extends AdminClient { @Override ListGroupsRequest.Builder createRequest(int timeoutMs) { List states = options.groupStates() - .stream() - .map(GroupState::toString) - .collect(Collectors.toList()); + .stream() + .map(GroupState::toString) + .collect(Collectors.toList()); List groupTypes = options.types() - .stream() - .map(GroupType::toString) - .collect(Collectors.toList()); + .stream() + .map(GroupType::toString) + .collect(Collectors.toList()); return new ListGroupsRequest.Builder(new ListGroupsRequestData() .setStatesFilter(states) .setTypesFilter(groupTypes) @@ -3678,17 +3674,17 @@ public class KafkaAdminClient extends AdminClient { if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) { final String groupId = group.groupId(); final Optional groupState = group.groupState().isEmpty() - ? Optional.empty() - : Optional.of(GroupState.parse(group.groupState())); + ? Optional.empty() + : Optional.of(GroupState.parse(group.groupState())); final Optional type = group.groupType().isEmpty() - ? Optional.empty() - : Optional.of(GroupType.parse(group.groupType())); + ? Optional.empty() + : Optional.of(GroupType.parse(group.groupType())); final ConsumerGroupListing groupListing = new ConsumerGroupListing( - groupId, - groupState, - type, - protocolType.isEmpty() - ); + groupId, + groupState, + type, + protocolType.isEmpty() + ); results.addListing(groupListing); } } @@ -3736,7 +3732,7 @@ public class KafkaAdminClient extends AdminClient { public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map groupSpecs, ListConsumerGroupOffsetsOptions options) { SimpleAdminApiFuture> future = - ListConsumerGroupOffsetsHandler.newFuture(groupSpecs.keySet()); + ListConsumerGroupOffsetsHandler.newFuture(groupSpecs.keySet()); ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupSpecs, options.requireStable(), logContext); invokeDriver(handler, future, options.timeoutMs); @@ -3745,7 +3741,7 @@ public class KafkaAdminClient extends AdminClient { @Override public ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map groupSpecs, - ListStreamsGroupOffsetsOptions options) { + ListStreamsGroupOffsetsOptions options) { Map consumerGroupSpecs = groupSpecs.entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, @@ -3760,11 +3756,11 @@ public class KafkaAdminClient extends AdminClient { @Override public DeleteConsumerGroupsResult deleteConsumerGroups(Collection groupIds, DeleteConsumerGroupsOptions options) { SimpleAdminApiFuture future = - DeleteConsumerGroupsHandler.newFuture(groupIds); + DeleteConsumerGroupsHandler.newFuture(groupIds); DeleteConsumerGroupsHandler handler = new DeleteConsumerGroupsHandler(logContext); invokeDriver(handler, future, options.timeoutMs); return new DeleteConsumerGroupsResult(future.all().entrySet().stream() - .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); + .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); } @Override @@ -3776,11 +3772,11 @@ public class KafkaAdminClient extends AdminClient { @Override public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets( - String groupId, - Set partitions, - DeleteConsumerGroupOffsetsOptions options) { + String groupId, + Set partitions, + DeleteConsumerGroupOffsetsOptions options) { SimpleAdminApiFuture> future = - DeleteConsumerGroupOffsetsHandler.newFuture(groupId); + DeleteConsumerGroupOffsetsHandler.newFuture(groupId); DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, partitions, logContext); invokeDriver(handler, future, options.timeoutMs); return new DeleteConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)), partitions); @@ -3800,11 +3796,11 @@ public class KafkaAdminClient extends AdminClient { public DescribeShareGroupsResult describeShareGroups(final Collection groupIds, final DescribeShareGroupsOptions options) { SimpleAdminApiFuture future = - DescribeShareGroupsHandler.newFuture(groupIds); + DescribeShareGroupsHandler.newFuture(groupIds); DescribeShareGroupsHandler handler = new DescribeShareGroupsHandler(options.includeAuthorizedOperations(), logContext); invokeDriver(handler, future, options.timeoutMs); return new DescribeShareGroupsResult(future.all().entrySet().stream() - .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); + .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); } @Override @@ -3871,13 +3867,13 @@ public class KafkaAdminClient extends AdminClient { @Override public ElectLeadersResult electLeaders( - final ElectionType electionType, - final Set topicPartitions, - ElectLeadersOptions options) { + final ElectionType electionType, + final Set topicPartitions, + ElectLeadersOptions options) { final KafkaFutureImpl>> electionFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); runnable.call(new Call("electLeaders", calcDeadlineMs(now, options.timeoutMs()), - new ControllerNodeProvider()) { + new ControllerNodeProvider()) { @Override public ElectLeadersRequest.Builder createRequest(int timeoutMs) { @@ -3910,8 +3906,8 @@ public class KafkaAdminClient extends AdminClient { @Override public AlterPartitionReassignmentsResult alterPartitionReassignments( - Map> reassignments, - AlterPartitionReassignmentsOptions options) { + Map> reassignments, + AlterPartitionReassignmentsOptions options) { final Map> futures = new HashMap<>(); final Map>> topicsToReassignments = new TreeMap<>(); for (Map.Entry> entry : reassignments.entrySet()) { @@ -3924,13 +3920,13 @@ public class KafkaAdminClient extends AdminClient { if (topicNameIsUnrepresentable(topic)) { future.completeExceptionally(new InvalidTopicException("The given topic name '" + - topic + "' cannot be represented in a request.")); + topic + "' cannot be represented in a request.")); } else if (topicPartition.partition() < 0) { future.completeExceptionally(new InvalidTopicException("The given partition index " + - topicPartition.partition() + " is not valid.")); + topicPartition.partition() + " is not valid.")); } else { Map> partitionReassignments = - topicsToReassignments.get(topicPartition.topic()); + topicsToReassignments.get(topicPartition.topic()); if (partitionReassignments == null) { partitionReassignments = new TreeMap<>(); topicsToReassignments.put(topic, partitionReassignments); @@ -3942,32 +3938,32 @@ public class KafkaAdminClient extends AdminClient { final long now = time.milliseconds(); Call call = new Call("alterPartitionReassignments", calcDeadlineMs(now, options.timeoutMs()), - new ControllerNodeProvider(true)) { + new ControllerNodeProvider(true)) { @Override public AlterPartitionReassignmentsRequest.Builder createRequest(int timeoutMs) { AlterPartitionReassignmentsRequestData data = - new AlterPartitionReassignmentsRequestData(); + new AlterPartitionReassignmentsRequestData(); for (Map.Entry>> entry : - topicsToReassignments.entrySet()) { + topicsToReassignments.entrySet()) { String topicName = entry.getKey(); Map> partitionsToReassignments = entry.getValue(); List reassignablePartitions = new ArrayList<>(); for (Map.Entry> partitionEntry : - partitionsToReassignments.entrySet()) { + partitionsToReassignments.entrySet()) { int partitionIndex = partitionEntry.getKey(); Optional reassignment = partitionEntry.getValue(); ReassignablePartition reassignablePartition = new ReassignablePartition() - .setPartitionIndex(partitionIndex) - .setReplicas(reassignment.map(NewPartitionReassignment::targetReplicas).orElse(null)); + .setPartitionIndex(partitionIndex) + .setReplicas(reassignment.map(NewPartitionReassignment::targetReplicas).orElse(null)); reassignablePartitions.add(reassignablePartition); } ReassignableTopic reassignableTopic = new ReassignableTopic() - .setName(topicName) - .setPartitions(reassignablePartitions); + .setName(topicName) + .setPartitions(reassignablePartitions); data.topics().add(reassignableTopic); } data.setTimeoutMs(timeoutMs); @@ -3994,8 +3990,8 @@ public class KafkaAdminClient extends AdminClient { String topicName = topicResponse.name(); for (ReassignablePartitionResponse partition : topicResponse.partitions()) { errors.put( - new TopicPartition(topicName, partition.partitionIndex()), - new ApiError(topLevelError, response.data().errorMessage()).exception() + new TopicPartition(topicName, partition.partitionIndex()), + new ApiError(topLevelError, response.data().errorMessage()).exception() ); receivedResponsesCount += 1; } @@ -4067,10 +4063,10 @@ public class KafkaAdminClient extends AdminClient { int partition = tp.partition(); if (topicNameIsUnrepresentable(topic)) { partitionReassignmentsFuture.completeExceptionally(new InvalidTopicException("The given topic name '" - + topic + "' cannot be represented in a request.")); + + topic + "' cannot be represented in a request.")); } else if (partition < 0) { partitionReassignmentsFuture.completeExceptionally(new InvalidTopicException("The given partition index " + - partition + " is not valid.")); + partition + " is not valid.")); } if (partitionReassignmentsFuture.isCompletedExceptionally()) return new ListPartitionReassignmentsResult(partitionReassignmentsFuture); @@ -4159,7 +4155,7 @@ public class KafkaAdminClient extends AdminClient { */ private Integer nodeFor(ConfigResource resource) { if ((resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) - || resource.type() == ConfigResource.Type.BROKER_LOGGER) { + || resource.type() == ConfigResource.Type.BROKER_LOGGER) { return Integer.valueOf(resource.name()); } else { return null; @@ -4175,8 +4171,8 @@ public class KafkaAdminClient extends AdminClient { } else { List membersToRemove = res.members().stream().map(member -> member.groupInstanceId().map(id -> new MemberIdentity().setGroupInstanceId(id)) - .orElseGet(() -> new MemberIdentity().setMemberId(member.consumerId())) - .setReason(reason) + .orElseGet(() -> new MemberIdentity().setMemberId(member.consumerId())) + .setReason(reason) ).collect(Collectors.toList()); future.complete(membersToRemove); @@ -4209,7 +4205,7 @@ public class KafkaAdminClient extends AdminClient { DEFAULT_LEAVE_GROUP_REASON : JoinGroupRequest.maybeTruncateReason(options.reason()); final SimpleAdminApiFuture> adminFuture = - RemoveMembersFromConsumerGroupHandler.newFuture(groupId); + RemoveMembersFromConsumerGroupHandler.newFuture(groupId); KafkaFutureImpl> memFuture; if (options.removeAll()) { @@ -4217,8 +4213,8 @@ public class KafkaAdminClient extends AdminClient { } else { memFuture = new KafkaFutureImpl<>(); memFuture.complete(options.members().stream() - .map(m -> m.toMemberIdentity().setReason(reason)) - .collect(Collectors.toList())); + .map(m -> m.toMemberIdentity().setReason(reason)) + .collect(Collectors.toList())); } memFuture.whenComplete((members, ex) -> { @@ -4240,7 +4236,7 @@ public class KafkaAdminClient extends AdminClient { AlterConsumerGroupOffsetsOptions options ) { SimpleAdminApiFuture> future = - AlterConsumerGroupOffsetsHandler.newFuture(groupId); + AlterConsumerGroupOffsetsHandler.newFuture(groupId); AlterConsumerGroupOffsetsHandler handler = new AlterConsumerGroupOffsetsHandler(groupId, offsets, logContext); invokeDriver(handler, future, options.timeoutMs); return new AlterConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId))); @@ -4275,24 +4271,24 @@ public class KafkaAdminClient extends AdminClient { final long now = time.milliseconds(); runnable.call(new Call("describeClientQuotas", calcDeadlineMs(now, options.timeoutMs()), - new LeastLoadedNodeProvider()) { + new LeastLoadedNodeProvider()) { - @Override - DescribeClientQuotasRequest.Builder createRequest(int timeoutMs) { - return new DescribeClientQuotasRequest.Builder(filter); - } + @Override + DescribeClientQuotasRequest.Builder createRequest(int timeoutMs) { + return new DescribeClientQuotasRequest.Builder(filter); + } - @Override - void handleResponse(AbstractResponse abstractResponse) { - DescribeClientQuotasResponse response = (DescribeClientQuotasResponse) abstractResponse; - response.complete(future); - } + @Override + void handleResponse(AbstractResponse abstractResponse) { + DescribeClientQuotasResponse response = (DescribeClientQuotasResponse) abstractResponse; + response.complete(future); + } - @Override - void handleFailure(Throwable throwable) { - future.completeExceptionally(throwable); - } - }, now); + @Override + void handleFailure(Throwable throwable) { + future.completeExceptionally(throwable); + } + }, now); return new DescribeClientQuotasResult(future); } @@ -4306,24 +4302,24 @@ public class KafkaAdminClient extends AdminClient { final long now = time.milliseconds(); runnable.call(new Call("alterClientQuotas", calcDeadlineMs(now, options.timeoutMs()), - new LeastLoadedNodeProvider()) { + new LeastLoadedNodeProvider()) { - @Override - AlterClientQuotasRequest.Builder createRequest(int timeoutMs) { - return new AlterClientQuotasRequest.Builder(entries, options.validateOnly()); - } + @Override + AlterClientQuotasRequest.Builder createRequest(int timeoutMs) { + return new AlterClientQuotasRequest.Builder(entries, options.validateOnly()); + } - @Override - void handleResponse(AbstractResponse abstractResponse) { - AlterClientQuotasResponse response = (AlterClientQuotasResponse) abstractResponse; - response.complete(futures); - } + @Override + void handleResponse(AbstractResponse abstractResponse) { + AlterClientQuotasResponse response = (AlterClientQuotasResponse) abstractResponse; + response.complete(futures); + } - @Override - void handleFailure(Throwable throwable) { - completeAllExceptionally(futures.values(), throwable); - } - }, now); + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(futures.values(), throwable); + } + }, now); return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } @@ -4333,7 +4329,7 @@ public class KafkaAdminClient extends AdminClient { final KafkaFutureImpl dataFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), - new LeastLoadedNodeProvider()) { + new LeastLoadedNodeProvider()) { @Override public DescribeUserScramCredentialsRequest.Builder createRequest(final int timeoutMs) { final DescribeUserScramCredentialsRequestData requestData = new DescribeUserScramCredentialsRequestData(); @@ -4379,7 +4375,7 @@ public class KafkaAdminClient extends AdminClient { AlterUserScramCredentialsOptions options) { final long now = time.milliseconds(); final Map> futures = new HashMap<>(); - for (UserScramCredentialAlteration alteration: alterations) { + for (UserScramCredentialAlteration alteration : alterations) { futures.put(alteration.user(), new KafkaFutureImpl<>()); } final Map userIllegalAlterationExceptions = new HashMap<>(); @@ -4403,55 +4399,55 @@ public class KafkaAdminClient extends AdminClient { // so keep track of which users are affected by such a failure so we can fail all their alterations later final Map> userInsertions = new HashMap<>(); alterations.stream().filter(a -> a instanceof UserScramCredentialUpsertion) - .filter(alteration -> !userIllegalAlterationExceptions.containsKey(alteration.user())) - .forEach(alteration -> { - final String user = alteration.user(); - if (user == null || user.isEmpty()) { - userIllegalAlterationExceptions.put(alteration.user(), new UnacceptableCredentialException(usernameMustNotBeEmptyMsg)); - } else { - UserScramCredentialUpsertion upsertion = (UserScramCredentialUpsertion) alteration; - try { - byte[] password = upsertion.password(); - if (password == null || password.length == 0) { - userIllegalAlterationExceptions.put(user, new UnacceptableCredentialException(passwordMustNotBeEmptyMsg)); + .filter(alteration -> !userIllegalAlterationExceptions.containsKey(alteration.user())) + .forEach(alteration -> { + final String user = alteration.user(); + if (user == null || user.isEmpty()) { + userIllegalAlterationExceptions.put(alteration.user(), new UnacceptableCredentialException(usernameMustNotBeEmptyMsg)); + } else { + UserScramCredentialUpsertion upsertion = (UserScramCredentialUpsertion) alteration; + try { + byte[] password = upsertion.password(); + if (password == null || password.length == 0) { + userIllegalAlterationExceptions.put(user, new UnacceptableCredentialException(passwordMustNotBeEmptyMsg)); + } else { + ScramMechanism mechanism = upsertion.credentialInfo().mechanism(); + if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) { + userIllegalAlterationExceptions.put(user, new UnsupportedSaslMechanismException(unknownScramMechanismMsg)); } else { - ScramMechanism mechanism = upsertion.credentialInfo().mechanism(); - if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) { - userIllegalAlterationExceptions.put(user, new UnsupportedSaslMechanismException(unknownScramMechanismMsg)); - } else { - userInsertions.putIfAbsent(user, new HashMap<>()); - userInsertions.get(user).put(mechanism, getScramCredentialUpsertion(upsertion)); - } + userInsertions.putIfAbsent(user, new HashMap<>()); + userInsertions.get(user).put(mechanism, getScramCredentialUpsertion(upsertion)); } - } catch (NoSuchAlgorithmException e) { - // we might overwrite an exception from a previous alteration, but we don't really care - // since we just need to mark this user as having at least one illegal alteration - // and make an exception instance available for completing the corresponding future exceptionally - userIllegalAlterationExceptions.put(user, new UnsupportedSaslMechanismException(unknownScramMechanismMsg)); - } catch (InvalidKeyException e) { - // generally shouldn't happen since we deal with the empty password case above, - // but we still need to catch/handle it - userIllegalAlterationExceptions.put(user, new UnacceptableCredentialException(e.getMessage(), e)); } + } catch (NoSuchAlgorithmException e) { + // we might overwrite an exception from a previous alteration, but we don't really care + // since we just need to mark this user as having at least one illegal alteration + // and make an exception instance available for completing the corresponding future exceptionally + userIllegalAlterationExceptions.put(user, new UnsupportedSaslMechanismException(unknownScramMechanismMsg)); + } catch (InvalidKeyException e) { + // generally shouldn't happen since we deal with the empty password case above, + // but we still need to catch/handle it + userIllegalAlterationExceptions.put(user, new UnacceptableCredentialException(e.getMessage(), e)); } - }); + } + }); // submit alterations only for users that do not have an illegal alteration as identified above Call call = new Call("alterUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), - new ControllerNodeProvider()) { + new ControllerNodeProvider()) { @Override public AlterUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { return new AlterUserScramCredentialsRequest.Builder( - new AlterUserScramCredentialsRequestData().setUpsertions(alterations.stream() - .filter(a -> a instanceof UserScramCredentialUpsertion) - .filter(a -> !userIllegalAlterationExceptions.containsKey(a.user())) - .map(a -> userInsertions.get(a.user()).get(((UserScramCredentialUpsertion) a).credentialInfo().mechanism())) - .collect(Collectors.toList())) + new AlterUserScramCredentialsRequestData().setUpsertions(alterations.stream() + .filter(a -> a instanceof UserScramCredentialUpsertion) + .filter(a -> !userIllegalAlterationExceptions.containsKey(a.user())) + .map(a -> userInsertions.get(a.user()).get(((UserScramCredentialUpsertion) a).credentialInfo().mechanism())) + .collect(Collectors.toList())) .setDeletions(alterations.stream() - .filter(a -> a instanceof UserScramCredentialDeletion) - .filter(a -> !userIllegalAlterationExceptions.containsKey(a.user())) - .map(d -> getScramCredentialDeletion((UserScramCredentialDeletion) d)) - .collect(Collectors.toList()))); + .filter(a -> a instanceof UserScramCredentialDeletion) + .filter(a -> !userIllegalAlterationExceptions.containsKey(a.user())) + .map(d -> getScramCredentialDeletion((UserScramCredentialDeletion) d)) + .collect(Collectors.toList()))); } @Override @@ -4501,10 +4497,10 @@ public class KafkaAdminClient extends AdminClient { private static AlterUserScramCredentialsRequestData.ScramCredentialUpsertion getScramCredentialUpsertion(UserScramCredentialUpsertion u) throws InvalidKeyException, NoSuchAlgorithmException { AlterUserScramCredentialsRequestData.ScramCredentialUpsertion retval = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion(); return retval.setName(u.user()) - .setMechanism(u.credentialInfo().mechanism().type()) - .setIterations(u.credentialInfo().iterations()) - .setSalt(u.salt()) - .setSaltedPassword(getSaltedPassword(u.credentialInfo().mechanism(), u.password(), u.salt(), u.credentialInfo().iterations())); + .setMechanism(u.credentialInfo().mechanism().type()) + .setIterations(u.credentialInfo().iterations()) + .setSalt(u.salt()) + .setSaltedPassword(getSaltedPassword(u.credentialInfo().mechanism(), u.password(), u.salt(), u.credentialInfo().iterations())); } private static AlterUserScramCredentialsRequestData.ScramCredentialDeletion getScramCredentialDeletion(UserScramCredentialDeletion d) { @@ -4513,7 +4509,7 @@ public class KafkaAdminClient extends AdminClient { private static byte[] getSaltedPassword(ScramMechanism publicScramMechanism, byte[] password, byte[] salt, int iterations) throws NoSuchAlgorithmException, InvalidKeyException { return new ScramFormatter(org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(publicScramMechanism.mechanismName())) - .hi(password, salt, iterations); + .hi(password, salt, iterations); } @Override @@ -4639,7 +4635,7 @@ public class KafkaAdminClient extends AdminClient { } // The server should send back a response for every feature, but we do a sanity check anyway. completeUnrealizedFutures(updateFutures.entrySet().stream(), - feature -> "The controller response did not contain a result for feature " + feature); + feature -> "The controller response did not contain a result for feature " + feature); } break; case NOT_CONTROLLER: @@ -4670,15 +4666,15 @@ public class KafkaAdminClient extends AdminClient { final KafkaFutureImpl future = new KafkaFutureImpl<>(); final long now = time.milliseconds(); final Call call = new Call( - "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { + "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { private QuorumInfo.ReplicaState translateReplicaState(DescribeQuorumResponseData.ReplicaState replica) { return new QuorumInfo.ReplicaState( - replica.replicaId(), - replica.replicaDirectoryId() == null ? Uuid.ZERO_UUID : replica.replicaDirectoryId(), - replica.logEndOffset(), - replica.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(replica.lastFetchTimestamp()), - replica.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(replica.lastCaughtUpTimestamp())); + replica.replicaId(), + replica.replicaDirectoryId() == null ? Uuid.ZERO_UUID : replica.replicaDirectoryId(), + replica.logEndOffset(), + replica.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(replica.lastFetchTimestamp()), + replica.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(replica.lastCaughtUpTimestamp())); } private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition, DescribeQuorumResponseData.NodeCollection nodeCollection) { @@ -4711,7 +4707,7 @@ public class KafkaAdminClient extends AdminClient { @Override DescribeQuorumRequest.Builder createRequest(int timeoutMs) { return new Builder(DescribeQuorumRequest.singletonRequest( - new TopicPartition(CLUSTER_METADATA_TOPIC_NAME, CLUSTER_METADATA_TOPIC_PARTITION.partition()))); + new TopicPartition(CLUSTER_METADATA_TOPIC_NAME, CLUSTER_METADATA_TOPIC_PARTITION.partition()))); } @Override @@ -4723,27 +4719,27 @@ public class KafkaAdminClient extends AdminClient { } if (quorumResponse.data().topics().size() != 1) { String msg = String.format("DescribeMetadataQuorum received %d topics when 1 was expected", - quorumResponse.data().topics().size()); + quorumResponse.data().topics().size()); log.debug(msg); throw new UnknownServerException(msg); } DescribeQuorumResponseData.TopicData topic = quorumResponse.data().topics().get(0); if (!topic.topicName().equals(CLUSTER_METADATA_TOPIC_NAME)) { String msg = String.format("DescribeMetadataQuorum received a topic with name %s when %s was expected", - topic.topicName(), CLUSTER_METADATA_TOPIC_NAME); + topic.topicName(), CLUSTER_METADATA_TOPIC_NAME); log.debug(msg); throw new UnknownServerException(msg); } if (topic.partitions().size() != 1) { String msg = String.format("DescribeMetadataQuorum received a topic %s with %d partitions when 1 was expected", - topic.topicName(), topic.partitions().size()); + topic.topicName(), topic.partitions().size()); log.debug(msg); throw new UnknownServerException(msg); } DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0); if (partition.partitionIndex() != CLUSTER_METADATA_TOPIC_PARTITION.partition()) { String msg = String.format("DescribeMetadataQuorum received a single partition with index %d when %d was expected", - partition.partitionIndex(), CLUSTER_METADATA_TOPIC_PARTITION.partition()); + partition.partitionIndex(), CLUSTER_METADATA_TOPIC_PARTITION.partition()); log.debug(msg); throw new UnknownServerException(msg); } @@ -4768,19 +4764,19 @@ public class KafkaAdminClient extends AdminClient { final KafkaFutureImpl future = new KafkaFutureImpl<>(); final long now = time.milliseconds(); final Call call = new Call("unregisterBroker", calcDeadlineMs(now, options.timeoutMs()), - new LeastLoadedBrokerOrActiveKController()) { + new LeastLoadedBrokerOrActiveKController()) { @Override UnregisterBrokerRequest.Builder createRequest(int timeoutMs) { UnregisterBrokerRequestData data = - new UnregisterBrokerRequestData().setBrokerId(brokerId); + new UnregisterBrokerRequestData().setBrokerId(brokerId); return new UnregisterBrokerRequest.Builder(data); } @Override void handleResponse(AbstractResponse abstractResponse) { final UnregisterBrokerResponse response = - (UnregisterBrokerResponse) abstractResponse; + (UnregisterBrokerResponse) abstractResponse; Errors error = Errors.forCode(response.data().errorCode()); switch (error) { case NONE: @@ -4840,7 +4836,7 @@ public class KafkaAdminClient extends AdminClient { * where a coordinator may need to unilaterally terminate a participant transaction that hasn't completed. *

* - * @param transactionalId The transactional ID whose active transaction should be forcefully terminated. + * @param transactionalId The transactional ID whose active transaction should be forcefully terminated. * @return a {@link TerminateTransactionResult} that can be used to await the operation result. */ @Override @@ -4879,6 +4875,45 @@ public class KafkaAdminClient extends AdminClient { return new FenceProducersResult(future.all()); } + @Override + public ListConfigResourcesResult listConfigResources(Set configResourceTypes, ListConfigResourcesOptions options) { + final long now = time.milliseconds(); + final KafkaFutureImpl> future = new KafkaFutureImpl<>(); + final Call call = new Call("listConfigResources", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { + + @Override + ListConfigResourcesRequest.Builder createRequest(int timeoutMs) { + return new ListConfigResourcesRequest.Builder( + new ListConfigResourcesRequestData() + .setResourceTypes( + configResourceTypes + .stream() + .map(ConfigResource.Type::id) + .collect(Collectors.toList()) + ) + ); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + ListConfigResourcesResponse response = (ListConfigResourcesResponse) abstractResponse; + if (response.error().isFailure()) { + future.completeExceptionally(response.error().exception()); + } else { + future.complete(response.configResources()); + } + } + + @Override + void handleFailure(Throwable throwable) { + future.completeExceptionally(throwable); + } + }; + runnable.call(call, now); + return new ListConfigResourcesResult(future); + } + + @SuppressWarnings({"deprecation", "removal"}) @Override public ListClientMetricsResourcesResult listClientMetricsResources(ListClientMetricsResourcesOptions options) { final long now = time.milliseconds(); @@ -4900,7 +4935,13 @@ public class KafkaAdminClient extends AdminClient { if (response.error().isFailure()) { future.completeExceptionally(response.error().exception()); } else { - future.complete(response.clientMetricsResources()); + future.complete(response + .data() + .configResources() + .stream() + .filter(entry -> entry.resourceType() == ConfigResource.Type.CLIENT_METRICS.id()) + .map(entry -> new ClientMetricsResourceListing(entry.resourceName())) + .collect(Collectors.toList())); } } @@ -4924,7 +4965,7 @@ public class KafkaAdminClient extends AdminClient { final KafkaFutureImpl future = new KafkaFutureImpl<>(); final long now = time.milliseconds(); final Call call = new Call( - "addRaftVoter", calcDeadlineMs(now, options.timeoutMs()), provider) { + "addRaftVoter", calcDeadlineMs(now, options.timeoutMs()), provider) { @Override AddRaftVoterRequest.Builder createRequest(int timeoutMs) { @@ -4936,12 +4977,12 @@ public class KafkaAdminClient extends AdminClient { setHost(endpoint.host()). setPort(endpoint.port()))); return new AddRaftVoterRequest.Builder( - new AddRaftVoterRequestData(). - setClusterId(options.clusterId().orElse(null)). - setTimeoutMs(timeoutMs). - setVoterId(voterId) . - setVoterDirectoryId(voterDirectoryId). - setListeners(listeners)); + new AddRaftVoterRequestData(). + setClusterId(options.clusterId().orElse(null)). + setTimeoutMs(timeoutMs). + setVoterId(voterId). + setVoterDirectoryId(voterDirectoryId). + setListeners(listeners)); } @Override @@ -4978,14 +5019,14 @@ public class KafkaAdminClient extends AdminClient { final KafkaFutureImpl future = new KafkaFutureImpl<>(); final long now = time.milliseconds(); final Call call = new Call( - "removeRaftVoter", calcDeadlineMs(now, options.timeoutMs()), provider) { + "removeRaftVoter", calcDeadlineMs(now, options.timeoutMs()), provider) { @Override RemoveRaftVoterRequest.Builder createRequest(int timeoutMs) { return new RemoveRaftVoterRequest.Builder( new RemoveRaftVoterRequestData(). setClusterId(options.clusterId().orElse(null)). - setVoterId(voterId) . + setVoterId(voterId). setVoterDirectoryId(voterDirectoryId)); } @@ -4995,8 +5036,8 @@ public class KafkaAdminClient extends AdminClient { RemoveRaftVoterResponse addResponse = (RemoveRaftVoterResponse) response; if (addResponse.data().errorCode() != Errors.NONE.code()) { ApiError error = new ApiError( - addResponse.data().errorCode(), - addResponse.data().errorMessage()); + addResponse.data().errorCode(), + addResponse.data().errorMessage()); future.completeExceptionally(error.exception()); } else { future.complete(null); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java index 7b6dbf302c6..f90778db12c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java @@ -19,6 +19,8 @@ package org.apache.kafka.clients.admin; /** * Options for {@link Admin#listClientMetricsResources()}. + * @deprecated Since 4.1. Use {@link ListConfigResourcesOptions} instead. */ +@Deprecated(since = "4.1") public class ListClientMetricsResourcesOptions extends AbstractOptions { } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java index 4a63e31c238..a4d0ed3cecb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java @@ -25,7 +25,9 @@ import java.util.Collection; /** * The result of the {@link Admin#listClientMetricsResources()} call. *

+ * @deprecated Since 4.1. Use {@link ListConfigResourcesResult} instead. */ +@Deprecated(since = "4.1") public class ListClientMetricsResourcesResult { private final KafkaFuture> future; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesOptions.java new file mode 100644 index 00000000000..dbd8581c795 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesOptions.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +/** + * Options for {@link Admin#listConfigResources()}. + */ +public class ListConfigResourcesOptions extends AbstractOptions { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesResult.java new file mode 100644 index 00000000000..fa9ad46a72c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesResult.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.internals.KafkaFutureImpl; + +import java.util.Collection; + +/** + * The result of the {@link Admin#listConfigResources()} call. + *

+ */ +public class ListConfigResourcesResult { + private final KafkaFuture> future; + + ListConfigResourcesResult(KafkaFuture> future) { + this.future = future; + } + + /** + * Returns a future that yields either an exception, or the full set of config resources. + * + * In the event of a failure, the future yields nothing but the first exception which + * occurred. + */ + public KafkaFuture> all() { + final KafkaFutureImpl> result = new KafkaFutureImpl<>(); + future.whenComplete((resources, throwable) -> { + if (throwable != null) { + result.completeExceptionally(throwable); + } else { + result.complete(resources); + } + }); + return result; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java index 3af70938843..436d08c4909 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java @@ -17,12 +17,14 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ListConfigResourcesRequestData; import org.apache.kafka.common.message.ListConfigResourcesResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; +import java.util.HashSet; import java.util.Set; public class ListConfigResourcesRequest extends AbstractRequest { @@ -36,6 +38,15 @@ public class ListConfigResourcesRequest extends AbstractRequest { @Override public ListConfigResourcesRequest build(short version) { + if (version == 0) { + // The v0 only supports CLIENT_METRICS resource type. + Set resourceTypes = new HashSet<>(data.resourceTypes()); + if (resourceTypes.size() != 1 || !resourceTypes.contains(ConfigResource.Type.CLIENT_METRICS.id())) { + throw new UnsupportedVersionException("The v0 ListConfigResources only supports CLIENT_METRICS"); + } + // The v0 request does not have resource types field, so creating a new request data. + return new ListConfigResourcesRequest(new ListConfigResourcesRequestData(), version); + } return new ListConfigResourcesRequest(data, version); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java index 36a4a807f7f..f9fa50d02a9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.clients.admin.ClientMetricsResourceListing; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.message.ListConfigResourcesResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -78,12 +77,4 @@ public class ListConfigResourcesResponse extends AbstractResponse { ) ).collect(Collectors.toList()); } - - public Collection clientMetricsResources() { - return data.configResources() - .stream() - .filter(entry -> entry.resourceType() == ConfigResource.Type.CLIENT_METRICS.id()) - .map(entry -> new ClientMetricsResourceListing(entry.resourceName())) - .collect(Collectors.toList()); - } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java index c98ffb9483f..36e7571d8dd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java @@ -163,17 +163,17 @@ public class AdminClientTestUtils { return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future)); } - public static ListClientMetricsResourcesResult listClientMetricsResourcesResult(String... names) { - return new ListClientMetricsResourcesResult( - KafkaFuture.completedFuture(Arrays.stream(names) - .map(ClientMetricsResourceListing::new) - .collect(Collectors.toList()))); + public static ListConfigResourcesResult listConfigResourcesResult(String... names) { + return new ListConfigResourcesResult( + KafkaFuture.completedFuture(Arrays.stream(names) + .map(name -> new ConfigResource(ConfigResource.Type.CLIENT_METRICS, name)) + .collect(Collectors.toList()))); } - public static ListClientMetricsResourcesResult listClientMetricsResourcesResult(KafkaException exception) { - final KafkaFutureImpl> future = new KafkaFutureImpl<>(); + public static ListConfigResourcesResult listConfigResourcesResult(KafkaException exception) { + final KafkaFutureImpl> future = new KafkaFutureImpl<>(); future.completeExceptionally(exception); - return new ListClientMetricsResourcesResult(future); + return new ListConfigResourcesResult(future); } public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map>> groupOffsets) { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 83356b68ff5..d7139be1698 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -10664,6 +10664,7 @@ public class KafkaAdminClientTest { member.memberEpoch()); } + @SuppressWarnings({"deprecation", "removal"}) @Test public void testListClientMetricsResources() throws Exception { try (AdminClientUnitTestEnv env = mockClientEnv()) { @@ -10697,6 +10698,7 @@ public class KafkaAdminClientTest { } } + @SuppressWarnings({"deprecation", "removal"}) @Test public void testListClientMetricsResourcesEmpty() throws Exception { try (AdminClientUnitTestEnv env = mockClientEnv()) { @@ -10714,6 +10716,7 @@ public class KafkaAdminClientTest { } } + @SuppressWarnings({"deprecation", "removal"}) @Test public void testListClientMetricsResourcesNotSupported() { try (AdminClientUnitTestEnv env = mockClientEnv()) { @@ -10729,6 +10732,70 @@ public class KafkaAdminClientTest { } } + @Test + public void testListConfigResources() throws Exception { + try (AdminClientUnitTestEnv env = mockClientEnv()) { + List expected = List.of( + new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "client-metrics"), + new ConfigResource(ConfigResource.Type.BROKER, "1"), + new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "1"), + new ConfigResource(ConfigResource.Type.TOPIC, "topic"), + new ConfigResource(ConfigResource.Type.GROUP, "group") + ); + + ListConfigResourcesResponseData responseData = + new ListConfigResourcesResponseData().setErrorCode(Errors.NONE.code()); + + expected.forEach(c -> + responseData.configResources() + .add(new ListConfigResourcesResponseData + .ConfigResource() + .setResourceName(c.name()) + .setResourceType(c.type().id()) + ) + ); + + env.kafkaClient().prepareResponse( + request -> request instanceof ListConfigResourcesRequest, + new ListConfigResourcesResponse(responseData)); + + ListConfigResourcesResult result = env.adminClient().listConfigResources(); + assertEquals(expected.size(), result.all().get().size()); + assertEquals(new HashSet<>(expected), new HashSet<>(result.all().get())); + } + } + + @Test + public void testListConfigResourcesEmpty() throws Exception { + try (AdminClientUnitTestEnv env = mockClientEnv()) { + ListConfigResourcesResponseData responseData = + new ListConfigResourcesResponseData().setErrorCode(Errors.NONE.code()); + + env.kafkaClient().prepareResponse( + request -> request instanceof ListConfigResourcesRequest, + new ListConfigResourcesResponse(responseData)); + + ListConfigResourcesResult result = env.adminClient().listConfigResources(); + assertTrue(result.all().get().isEmpty()); + } + } + + @Test + public void testListConfigResourcesNotSupported() { + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().prepareResponse( + request -> request instanceof ListConfigResourcesRequest, + new ListConfigResourcesResponse(new ListConfigResourcesResponseData() + .setErrorCode(Errors.UNSUPPORTED_VERSION.code()))); + + ListConfigResourcesResult result = env.adminClient().listConfigResources( + Set.of(ConfigResource.Type.UNKNOWN), new ListConfigResourcesOptions()); + + assertNotNull(result.all()); + TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + } + } + @Test public void testCallFailWithUnsupportedVersionExceptionDoesNotHaveConcurrentModificationException() throws InterruptedException { Cluster cluster = mockCluster(1, 0); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 02a9e628b7e..37e18e15983 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -1394,6 +1394,12 @@ public class MockAdminClient extends AdminClient { throw new UnsupportedOperationException("Not implemented yet"); } + @Override + public ListConfigResourcesResult listConfigResources(Set configResourceTypes, ListConfigResourcesOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @SuppressWarnings("deprecation") @Override public ListClientMetricsResourcesResult listClientMetricsResources(ListClientMetricsResourcesOptions options) { KafkaFutureImpl> future = new KafkaFutureImpl<>(); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index aa50f9db018..7a3be68ff78 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -3637,7 +3637,10 @@ public class RequestResponseTest { } private ListConfigResourcesRequest createListConfigResourcesRequest(short version) { - return new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(version); + return version == 0 ? + new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData() + .setResourceTypes(List.of(ConfigResource.Type.CLIENT_METRICS.id()))).build(version) : + new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(version); } private ListConfigResourcesResponse createListConfigResourcesResponse() { @@ -3951,4 +3954,25 @@ public class RequestResponseTest { parseRequest(SASL_AUTHENTICATE, SASL_AUTHENTICATE.latestVersion(), accessor)).getMessage(); assertEquals("Error reading byte array of 32767 byte(s): only 3 byte(s) available", msg); } + + @Test + public void testListConfigResourcesRequestV0FailsWithConfigResourceTypeOtherThanClientMetrics() { + // One type which is not CLIENT_METRICS + Arrays.stream(ConfigResource.Type.values()) + .filter(t -> t != ConfigResource.Type.CLIENT_METRICS) + .forEach(t -> { + ListConfigResourcesRequestData data = new ListConfigResourcesRequestData() + .setResourceTypes(List.of(t.id())); + assertThrows(UnsupportedVersionException.class, () -> new ListConfigResourcesRequest.Builder(data).build((short) 0)); + }); + + // Multiple types with CLIENT_METRICS + Arrays.stream(ConfigResource.Type.values()) + .filter(t -> t != ConfigResource.Type.CLIENT_METRICS) + .forEach(t -> { + ListConfigResourcesRequestData data = new ListConfigResourcesRequestData() + .setResourceTypes(List.of(t.id(), ConfigResource.Type.CLIENT_METRICS.id())); + assertThrows(UnsupportedVersionException.class, () -> new ListConfigResourcesRequest.Builder(data).build((short) 0)); + }); + } } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 423e869ffd0..be0c9dea17b 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -3866,6 +3866,92 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } finally client.close(time.Duration.ZERO) } + @Test + def testListConfigResources(): Unit = { + client = createAdminClient + + // Alter group and client metric config to add group and client metric config resource + val clientMetric = "client-metrics" + val group = "group" + val clientMetricResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetric) + val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group) + val alterResult = client.incrementalAlterConfigs(util.Map.of( + clientMetricResource, + util.Set.of(new AlterConfigOp(new ConfigEntry("interval.ms", "111"), AlterConfigOp.OpType.SET)), + groupResource, + util.Set.of(new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "50000"), AlterConfigOp.OpType.SET)) + )) + assertEquals(util.Set.of(clientMetricResource, groupResource), alterResult.values.keySet) + alterResult.all.get(15, TimeUnit.SECONDS) + + ensureConsistentKRaftMetadata() + + // non-specified config resource type retrieves all config resources + var configResources = client.listConfigResources().all().get() + assertEquals(9, configResources.size()) + brokerServers.foreach(b => { + assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER, b.config.nodeId.toString))) + assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, b.config.nodeId.toString))) + }) + assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME))) + assertTrue(configResources.contains(groupResource)) + assertTrue(configResources.contains(clientMetricResource)) + + // BROKER config resource type retrieves only broker config resources + configResources = client.listConfigResources(util.Set.of(ConfigResource.Type.BROKER), new ListConfigResourcesOptions()).all().get() + assertEquals(3, configResources.size()) + brokerServers.foreach(b => { + assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER, b.config.nodeId.toString))) + assertFalse(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, b.config.nodeId.toString))) + }) + assertFalse(configResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME))) + assertFalse(configResources.contains(groupResource)) + assertFalse(configResources.contains(clientMetricResource)) + + // BROKER_LOGGER config resource type retrieves only broker logger config resources + configResources = client.listConfigResources(util.Set.of(ConfigResource.Type.BROKER_LOGGER), new ListConfigResourcesOptions()).all().get() + assertEquals(3, configResources.size()) + brokerServers.foreach(b => { + assertFalse(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER, b.config.nodeId.toString))) + assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, b.config.nodeId.toString))) + }) + assertFalse(configResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME))) + assertFalse(configResources.contains(groupResource)) + assertFalse(configResources.contains(clientMetricResource)) + + // TOPIC config resource type retrieves only topic config resources + configResources = client.listConfigResources(util.Set.of(ConfigResource.Type.TOPIC), new ListConfigResourcesOptions()).all().get() + assertEquals(1, configResources.size()) + assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME))) + + // GROUP config resource type retrieves only group config resources + configResources = client.listConfigResources(util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions()).all().get() + assertEquals(1, configResources.size()) + assertTrue(configResources.contains(groupResource)) + + // CLIENT_METRICS config resource type retrieves only client metric config resources + configResources = client.listConfigResources(util.Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions()).all().get() + assertEquals(1, configResources.size()) + assertTrue(configResources.contains(clientMetricResource)) + + // UNKNOWN config resource type gets UNSUPPORTED_VERSION error + assertThrows(classOf[ExecutionException], () => { + client.listConfigResources(util.Set.of(ConfigResource.Type.UNKNOWN), new ListConfigResourcesOptions()).all().get() + }) + } + + @Test + @Timeout(30) + def testListConfigResourcesTimeoutMs(): Unit = { + client = createInvalidAdminClient() + try { + val timeoutOption = new ListConfigResourcesOptions().timeoutMs(0) + val exception = assertThrows(classOf[ExecutionException], () => + client.listConfigResources(util.Set.of(), timeoutOption).all().get()) + assertInstanceOf(classOf[TimeoutException], exception.getCause) + } finally client.close(time.Duration.ZERO) + } + /** * Test that createTopics returns the dynamic configurations of the topics that were created. * diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index f698c8ef9ef..93d138e8946 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -11207,7 +11207,8 @@ class KafkaApisTest extends Logging { @Test def testListConfigResourcesV0(): Unit = { - val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(0)) + val request = buildRequest(new ListConfigResourcesRequest.Builder( + new ListConfigResourcesRequestData().setResourceTypes(util.List.of(ConfigResource.Type.CLIENT_METRICS.id))).build(0)) metadataCache = mock(classOf[KRaftMetadataCache]) val resources = util.Set.of("client-metric1", "client-metric2") diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java index 91871133d68..ae8e9f0924c 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java @@ -110,6 +110,8 @@ import org.apache.kafka.clients.admin.FenceProducersOptions; import org.apache.kafka.clients.admin.FenceProducersResult; import org.apache.kafka.clients.admin.ListClientMetricsResourcesOptions; import org.apache.kafka.clients.admin.ListClientMetricsResourcesResult; +import org.apache.kafka.clients.admin.ListConfigResourcesOptions; +import org.apache.kafka.clients.admin.ListConfigResourcesResult; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec; @@ -436,6 +438,12 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient { return adminDelegate.fenceProducers(transactionalIds, options); } + @Override + public ListConfigResourcesResult listConfigResources(final Set configResourceTypes, final ListConfigResourcesOptions options) { + return adminDelegate.listConfigResources(configResourceTypes, options); + } + + @SuppressWarnings({"deprecation", "removal"}) @Override public ListClientMetricsResourcesResult listClientMetricsResources(final ListClientMetricsResourcesOptions options) { return adminDelegate.listClientMetricsResources(options); diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java index f9b02f78397..8dcb5e5a750 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java @@ -21,9 +21,9 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.AlterConfigsOptions; -import org.apache.kafka.clients.admin.ClientMetricsResourceListing; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.ListConfigResourcesOptions; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.utils.Exit; @@ -43,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -157,9 +158,11 @@ public class ClientMetricsCommand { if (entityNameOpt.isPresent()) { entities = Collections.singletonList(entityNameOpt.get()); } else { - Collection resources = adminClient.listClientMetricsResources() - .all().get(30, TimeUnit.SECONDS); - entities = resources.stream().map(ClientMetricsResourceListing::name).collect(Collectors.toList()); + Collection resources = adminClient + .listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions()) + .all() + .get(30, TimeUnit.SECONDS); + entities = resources.stream().map(ConfigResource::name).toList(); } for (String entity : entities) { @@ -170,9 +173,11 @@ public class ClientMetricsCommand { } public void listClientMetrics() throws Exception { - Collection resources = adminClient.listClientMetricsResources() - .all().get(30, TimeUnit.SECONDS); - String results = resources.stream().map(ClientMetricsResourceListing::name).collect(Collectors.joining("\n")); + Collection resources = adminClient + .listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions()) + .all() + .get(30, TimeUnit.SECONDS); + String results = resources.stream().map(ConfigResource::name).collect(Collectors.joining("\n")); System.out.println(results); } diff --git a/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java index c58748bf3c0..2fcf082f0a0 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java @@ -24,7 +24,7 @@ import org.apache.kafka.clients.admin.AlterConfigsResult; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.DescribeConfigsResult; -import org.apache.kafka.clients.admin.ListClientMetricsResourcesResult; +import org.apache.kafka.clients.admin.ListConfigResourcesResult; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.Exit; @@ -254,8 +254,8 @@ public class ClientMetricsCommandTest { Admin adminClient = mock(Admin.class); ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient); - ListClientMetricsResourcesResult result = AdminClientTestUtils.listClientMetricsResourcesResult(clientMetricsName); - when(adminClient.listClientMetricsResources()).thenReturn(result); + ListConfigResourcesResult result = AdminClientTestUtils.listConfigResourcesResult(clientMetricsName); + when(adminClient.listConfigResources(any(), any())).thenReturn(result); ConfigResource cr = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetricsName); Config cfg = new Config(Collections.singleton(new ConfigEntry("metrics", "org.apache.kafka.producer."))); DescribeConfigsResult describeResult = AdminClientTestUtils.describeConfigsResult(cr, cfg); @@ -278,8 +278,8 @@ public class ClientMetricsCommandTest { Admin adminClient = mock(Admin.class); ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient); - ListClientMetricsResourcesResult result = AdminClientTestUtils.listClientMetricsResourcesResult("one", "two"); - when(adminClient.listClientMetricsResources()).thenReturn(result); + ListConfigResourcesResult result = AdminClientTestUtils.listConfigResourcesResult("one", "two"); + when(adminClient.listConfigResources(any(), any())).thenReturn(result); String capturedOutput = ToolsTestUtils.captureStandardOut(() -> { try { @@ -296,8 +296,8 @@ public class ClientMetricsCommandTest { Admin adminClient = mock(Admin.class); ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient); - ListClientMetricsResourcesResult result = AdminClientTestUtils.listClientMetricsResourcesResult(Errors.UNSUPPORTED_VERSION.exception()); - when(adminClient.listClientMetricsResources()).thenReturn(result); + ListConfigResourcesResult result = AdminClientTestUtils.listConfigResourcesResult(Errors.UNSUPPORTED_VERSION.exception()); + when(adminClient.listConfigResources(any(), any())).thenReturn(result); assertThrows(ExecutionException.class, () -> service.listClientMetrics()); }