diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 9acd341250e..3f39bbe4696 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -1445,6 +1445,24 @@ public class KafkaAdminClient extends AdminClient { entry.getValue().completeExceptionally(new ApiException(messageFormatter.apply(entry.getKey())))); } + /** + * Fail futures in the given Map which were retried due to exceeding quota. We propagate + * the initial error back to the caller if the request timed out. + */ + private static void maybeCompleteQuotaExceededException( + boolean shouldRetryOnQuotaViolation, + Throwable throwable, + Map> futures, + Map quotaExceededExceptions, + int throttleTimeDelta) { + if (shouldRetryOnQuotaViolation && throwable instanceof TimeoutException) { + quotaExceededExceptions.forEach((key, value) -> futures.get(key).completeExceptionally( + new ThrottlingQuotaExceededException( + Math.max(0, value.throttleTimeMs() - throttleTimeDelta), + value.getMessage()))); + } + } + @Override public CreateTopicsResult createTopics(final Collection newTopics, final CreateTopicsOptions options) { @@ -1464,7 +1482,8 @@ public class KafkaAdminClient extends AdminClient { if (!topics.isEmpty()) { final long now = time.milliseconds(); final long deadline = calcDeadlineMs(now, options.timeoutMs()); - final Call call = getCreateTopicsCall(options, topicFutures, topics, deadline); + final Call call = getCreateTopicsCall(options, topicFutures, topics, + Collections.emptyMap(), now, deadline); runnable.call(call, now); } return new CreateTopicsResult(new HashMap<>(topicFutures)); @@ -1473,6 +1492,8 @@ public class KafkaAdminClient extends AdminClient { private Call getCreateTopicsCall(final CreateTopicsOptions options, final Map> futures, final CreatableTopicCollection topics, + final Map quotaExceededExceptions, + final long now, final long deadline) { return new Call("createTopics", deadline, new ControllerNodeProvider()) { @Override @@ -1491,6 +1512,7 @@ public class KafkaAdminClient extends AdminClient { // Handle server responses for particular topics. final CreateTopicsResponse response = (CreateTopicsResponse) abstractResponse; final CreatableTopicCollection retryTopics = new CreatableTopicCollection(); + final Map retryTopicQuotaExceededExceptions = new HashMap<>(); for (CreatableTopicResult result : response.data().topics()) { KafkaFutureImpl future = futures.get(result.name()); if (future == null) { @@ -1499,11 +1521,13 @@ public class KafkaAdminClient extends AdminClient { ApiError error = new ApiError(result.errorCode(), result.errorMessage()); if (error.isFailure()) { if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) { + ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException( + response.throttleTimeMs(), error.messageWithFallback()); if (options.shouldRetryOnQuotaViolation()) { retryTopics.add(topics.find(result.name()).duplicate()); + retryTopicQuotaExceededExceptions.put(result.name(), quotaExceededException); } else { - future.completeExceptionally(new ThrottlingQuotaExceededException( - response.throttleTimeMs(), error.messageWithFallback())); + future.completeExceptionally(quotaExceededException); } } else { future.completeExceptionally(error.exception()); @@ -1535,8 +1559,10 @@ public class KafkaAdminClient extends AdminClient { completeUnrealizedFutures(futures.entrySet().stream(), topic -> "The controller response did not contain a result for topic " + topic); } else { - final Call call = getCreateTopicsCall(options, futures, retryTopics, deadline); - runnable.call(call, time.milliseconds()); + final long now = time.milliseconds(); + final Call call = getCreateTopicsCall(options, futures, retryTopics, + retryTopicQuotaExceededExceptions, now, deadline); + runnable.call(call, now); } } @@ -1554,6 +1580,11 @@ public class KafkaAdminClient extends AdminClient { @Override void handleFailure(Throwable throwable) { + // If there were any topics retries due to a quota exceeded exception, we propagate + // the initial error back to the caller if the request timed out. + maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(), + throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now)); + // Fail all the other remaining futures completeAllExceptionally(futures.values(), throwable); } }; @@ -1578,7 +1609,8 @@ public class KafkaAdminClient extends AdminClient { if (!validTopicNames.isEmpty()) { final long now = time.milliseconds(); final long deadline = calcDeadlineMs(now, options.timeoutMs()); - final Call call = getDeleteTopicsCall(options, topicFutures, validTopicNames, deadline); + final Call call = getDeleteTopicsCall(options, topicFutures, validTopicNames, + Collections.emptyMap(), now, deadline); runnable.call(call, now); } return new DeleteTopicsResult(new HashMap<>(topicFutures)); @@ -1587,6 +1619,8 @@ public class KafkaAdminClient extends AdminClient { private Call getDeleteTopicsCall(final DeleteTopicsOptions options, final Map> futures, final List topics, + final Map quotaExceededExceptions, + final long now, final long deadline) { return new Call("deleteTopics", deadline, new ControllerNodeProvider()) { @Override @@ -1604,6 +1638,7 @@ public class KafkaAdminClient extends AdminClient { // Handle server responses for particular topics. final DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse; final List retryTopics = new ArrayList<>(); + final Map retryTopicQuotaExceededExceptions = new HashMap<>(); for (DeletableTopicResult result : response.data().responses()) { KafkaFutureImpl future = futures.get(result.name()); if (future == null) { @@ -1612,11 +1647,13 @@ public class KafkaAdminClient extends AdminClient { ApiError error = new ApiError(result.errorCode(), result.errorMessage()); if (error.isFailure()) { if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) { + ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException( + response.throttleTimeMs(), error.messageWithFallback()); if (options.shouldRetryOnQuotaViolation()) { retryTopics.add(result.name()); + retryTopicQuotaExceededExceptions.put(result.name(), quotaExceededException); } else { - future.completeExceptionally(new ThrottlingQuotaExceededException( - response.throttleTimeMs(), error.messageWithFallback())); + future.completeExceptionally(quotaExceededException); } } else { future.completeExceptionally(error.exception()); @@ -1632,13 +1669,20 @@ public class KafkaAdminClient extends AdminClient { completeUnrealizedFutures(futures.entrySet().stream(), topic -> "The controller response did not contain a result for topic " + topic); } else { - final Call call = getDeleteTopicsCall(options, futures, retryTopics, deadline); - runnable.call(call, time.milliseconds()); + final long now = time.milliseconds(); + final Call call = getDeleteTopicsCall(options, futures, retryTopics, + retryTopicQuotaExceededExceptions, now, deadline); + runnable.call(call, now); } } @Override void handleFailure(Throwable throwable) { + // If there were any topics retries due to a quota exceeded exception, we propagate + // the initial error back to the caller if the request timed out. + maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(), + throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now)); + // Fail all the other remaining futures completeAllExceptionally(futures.values(), throwable); } }; @@ -2478,7 +2522,7 @@ public class KafkaAdminClient extends AdminClient { @Override public CreatePartitionsResult createPartitions(final Map newPartitions, - final CreatePartitionsOptions options) { + final CreatePartitionsOptions options) { final Map> futures = new HashMap<>(newPartitions.size()); final CreatePartitionsTopicCollection topics = new CreatePartitionsTopicCollection(newPartitions.size()); for (Map.Entry entry : newPartitions.entrySet()) { @@ -2498,16 +2542,19 @@ public class KafkaAdminClient extends AdminClient { if (!topics.isEmpty()) { final long now = time.milliseconds(); final long deadline = calcDeadlineMs(now, options.timeoutMs()); - final Call call = getCreatePartitionsCall(options, futures, topics, deadline); + final Call call = getCreatePartitionsCall(options, futures, topics, + Collections.emptyMap(), now, deadline); runnable.call(call, now); } return new CreatePartitionsResult(new HashMap<>(futures)); } private Call getCreatePartitionsCall(final CreatePartitionsOptions options, - final Map> futures, - final CreatePartitionsTopicCollection topics, - final long deadline) { + final Map> futures, + final CreatePartitionsTopicCollection topics, + final Map quotaExceededExceptions, + final long now, + final long deadline) { return new Call("createPartitions", deadline, new ControllerNodeProvider()) { @Override public CreatePartitionsRequest.Builder createRequest(int timeoutMs) { @@ -2525,6 +2572,7 @@ public class KafkaAdminClient extends AdminClient { // Handle server responses for particular topics. final CreatePartitionsResponse response = (CreatePartitionsResponse) abstractResponse; final CreatePartitionsTopicCollection retryTopics = new CreatePartitionsTopicCollection(); + final Map retryTopicQuotaExceededExceptions = new HashMap<>(); for (CreatePartitionsTopicResult result : response.data().results()) { KafkaFutureImpl future = futures.get(result.name()); if (future == null) { @@ -2533,11 +2581,13 @@ public class KafkaAdminClient extends AdminClient { ApiError error = new ApiError(result.errorCode(), result.errorMessage()); if (error.isFailure()) { if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) { + ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException( + response.throttleTimeMs(), error.messageWithFallback()); if (options.shouldRetryOnQuotaViolation()) { retryTopics.add(topics.find(result.name()).duplicate()); + retryTopicQuotaExceededExceptions.put(result.name(), quotaExceededException); } else { - future.completeExceptionally(new ThrottlingQuotaExceededException( - response.throttleTimeMs(), error.messageWithFallback())); + future.completeExceptionally(quotaExceededException); } } else { future.completeExceptionally(error.exception()); @@ -2553,13 +2603,20 @@ public class KafkaAdminClient extends AdminClient { completeUnrealizedFutures(futures.entrySet().stream(), topic -> "The controller response did not contain a result for topic " + topic); } else { - final Call call = getCreatePartitionsCall(options, futures, retryTopics, deadline); - runnable.call(call, time.milliseconds()); + final long now = time.milliseconds(); + final Call call = getCreatePartitionsCall(options, futures, retryTopics, + retryTopicQuotaExceededExceptions, now, deadline); + runnable.call(call, now); } } @Override void handleFailure(Throwable throwable) { + // If there were any topics retries due to a quota exceeded exception, we propagate + // the initial error back to the caller if the request timed out. + maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(), + throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now)); + // Fail all the other remaining futures completeAllExceptionally(futures.values(), throwable); } }; diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 80d52947cd3..a4ae6f41da1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -733,7 +733,9 @@ public class KafkaAdminClientTest { time.sleep(defaultApiTimeout + 1); assertNull(result.values().get("topic1").get()); - TestUtils.assertFutureThrows(result.values().get("topic2"), TimeoutException.class); + ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"), + ThrottlingQuotaExceededException.class); + assertEquals(0, e.throttleTimeMs()); TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); } } @@ -895,7 +897,9 @@ public class KafkaAdminClientTest { time.sleep(defaultApiTimeout + 1); assertNull(result.values().get("topic1").get()); - TestUtils.assertFutureThrows(result.values().get("topic2"), TimeoutException.class); + ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"), + ThrottlingQuotaExceededException.class); + assertEquals(0, e.throttleTimeMs()); TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); } } @@ -1727,7 +1731,9 @@ public class KafkaAdminClientTest { time.sleep(defaultApiTimeout + 1); assertNull(result.values().get("topic1").get()); - TestUtils.assertFutureThrows(result.values().get("topic2"), TimeoutException.class); + ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"), + ThrottlingQuotaExceededException.class); + assertEquals(0, e.throttleTimeMs()); TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); } }