From a0fec75d3cee3d23bd517fe0acc65270a6cb0f88 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Tue, 29 Sep 2020 16:17:37 +0200 Subject: [PATCH] MINOR; Preserve ThrottlingQuotaExceededException when a request times out after being retried due to a quota violation (KIP-599) (#9344) This PR adds the logic to preserve the ThrottlingQuotaExceededException when topics are retried. The throttleTimeMs is also adjusted accordingly as the request could remain pending or in-flight for quite a long time. I have run various tests on clusters with quotas enabled and, indeed, find it better to preserve the exception. Otherwise, the caller does not really understand what is going on. This allows the caller to take appropriate measures and also to take the throttleTimeMs into consideration. Reviewers: Rajini Sivaram --- .../kafka/clients/admin/KafkaAdminClient.java | 95 +++++++++++++++---- .../clients/admin/KafkaAdminClientTest.java | 12 ++- 2 files changed, 85 insertions(+), 22 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 9acd341250e..3f39bbe4696 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -1445,6 +1445,24 @@ public class KafkaAdminClient extends AdminClient { entry.getValue().completeExceptionally(new ApiException(messageFormatter.apply(entry.getKey())))); } + /** + * Fail futures in the given Map which were retried due to exceeding quota. We propagate + * the initial error back to the caller if the request timed out. 
+ */ + private static void maybeCompleteQuotaExceededException( + boolean shouldRetryOnQuotaViolation, + Throwable throwable, + Map> futures, + Map quotaExceededExceptions, + int throttleTimeDelta) { + if (shouldRetryOnQuotaViolation && throwable instanceof TimeoutException) { + quotaExceededExceptions.forEach((key, value) -> futures.get(key).completeExceptionally( + new ThrottlingQuotaExceededException( + Math.max(0, value.throttleTimeMs() - throttleTimeDelta), + value.getMessage()))); + } + } + @Override public CreateTopicsResult createTopics(final Collection newTopics, final CreateTopicsOptions options) { @@ -1464,7 +1482,8 @@ public class KafkaAdminClient extends AdminClient { if (!topics.isEmpty()) { final long now = time.milliseconds(); final long deadline = calcDeadlineMs(now, options.timeoutMs()); - final Call call = getCreateTopicsCall(options, topicFutures, topics, deadline); + final Call call = getCreateTopicsCall(options, topicFutures, topics, + Collections.emptyMap(), now, deadline); runnable.call(call, now); } return new CreateTopicsResult(new HashMap<>(topicFutures)); @@ -1473,6 +1492,8 @@ public class KafkaAdminClient extends AdminClient { private Call getCreateTopicsCall(final CreateTopicsOptions options, final Map> futures, final CreatableTopicCollection topics, + final Map quotaExceededExceptions, + final long now, final long deadline) { return new Call("createTopics", deadline, new ControllerNodeProvider()) { @Override @@ -1491,6 +1512,7 @@ public class KafkaAdminClient extends AdminClient { // Handle server responses for particular topics. 
final CreateTopicsResponse response = (CreateTopicsResponse) abstractResponse; final CreatableTopicCollection retryTopics = new CreatableTopicCollection(); + final Map retryTopicQuotaExceededExceptions = new HashMap<>(); for (CreatableTopicResult result : response.data().topics()) { KafkaFutureImpl future = futures.get(result.name()); if (future == null) { @@ -1499,11 +1521,13 @@ public class KafkaAdminClient extends AdminClient { ApiError error = new ApiError(result.errorCode(), result.errorMessage()); if (error.isFailure()) { if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) { + ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException( + response.throttleTimeMs(), error.messageWithFallback()); if (options.shouldRetryOnQuotaViolation()) { retryTopics.add(topics.find(result.name()).duplicate()); + retryTopicQuotaExceededExceptions.put(result.name(), quotaExceededException); } else { - future.completeExceptionally(new ThrottlingQuotaExceededException( - response.throttleTimeMs(), error.messageWithFallback())); + future.completeExceptionally(quotaExceededException); } } else { future.completeExceptionally(error.exception()); @@ -1535,8 +1559,10 @@ public class KafkaAdminClient extends AdminClient { completeUnrealizedFutures(futures.entrySet().stream(), topic -> "The controller response did not contain a result for topic " + topic); } else { - final Call call = getCreateTopicsCall(options, futures, retryTopics, deadline); - runnable.call(call, time.milliseconds()); + final long now = time.milliseconds(); + final Call call = getCreateTopicsCall(options, futures, retryTopics, + retryTopicQuotaExceededExceptions, now, deadline); + runnable.call(call, now); } } @@ -1554,6 +1580,11 @@ public class KafkaAdminClient extends AdminClient { @Override void handleFailure(Throwable throwable) { + // If there were any topics retries due to a quota exceeded exception, we propagate + // the initial error back to the caller if the request timed out. 
+ maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(), + throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now)); + // Fail all the other remaining futures completeAllExceptionally(futures.values(), throwable); } }; @@ -1578,7 +1609,8 @@ public class KafkaAdminClient extends AdminClient { if (!validTopicNames.isEmpty()) { final long now = time.milliseconds(); final long deadline = calcDeadlineMs(now, options.timeoutMs()); - final Call call = getDeleteTopicsCall(options, topicFutures, validTopicNames, deadline); + final Call call = getDeleteTopicsCall(options, topicFutures, validTopicNames, + Collections.emptyMap(), now, deadline); runnable.call(call, now); } return new DeleteTopicsResult(new HashMap<>(topicFutures)); @@ -1587,6 +1619,8 @@ public class KafkaAdminClient extends AdminClient { private Call getDeleteTopicsCall(final DeleteTopicsOptions options, final Map> futures, final List topics, + final Map quotaExceededExceptions, + final long now, final long deadline) { return new Call("deleteTopics", deadline, new ControllerNodeProvider()) { @Override @@ -1604,6 +1638,7 @@ public class KafkaAdminClient extends AdminClient { // Handle server responses for particular topics. 
final DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse; final List retryTopics = new ArrayList<>(); + final Map retryTopicQuotaExceededExceptions = new HashMap<>(); for (DeletableTopicResult result : response.data().responses()) { KafkaFutureImpl future = futures.get(result.name()); if (future == null) { @@ -1612,11 +1647,13 @@ public class KafkaAdminClient extends AdminClient { ApiError error = new ApiError(result.errorCode(), result.errorMessage()); if (error.isFailure()) { if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) { + ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException( + response.throttleTimeMs(), error.messageWithFallback()); if (options.shouldRetryOnQuotaViolation()) { retryTopics.add(result.name()); + retryTopicQuotaExceededExceptions.put(result.name(), quotaExceededException); } else { - future.completeExceptionally(new ThrottlingQuotaExceededException( - response.throttleTimeMs(), error.messageWithFallback())); + future.completeExceptionally(quotaExceededException); } } else { future.completeExceptionally(error.exception()); @@ -1632,13 +1669,20 @@ public class KafkaAdminClient extends AdminClient { completeUnrealizedFutures(futures.entrySet().stream(), topic -> "The controller response did not contain a result for topic " + topic); } else { - final Call call = getDeleteTopicsCall(options, futures, retryTopics, deadline); - runnable.call(call, time.milliseconds()); + final long now = time.milliseconds(); + final Call call = getDeleteTopicsCall(options, futures, retryTopics, + retryTopicQuotaExceededExceptions, now, deadline); + runnable.call(call, now); } } @Override void handleFailure(Throwable throwable) { + // If there were any topics retries due to a quota exceeded exception, we propagate + // the initial error back to the caller if the request timed out. 
+ maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(), + throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now)); + // Fail all the other remaining futures completeAllExceptionally(futures.values(), throwable); } }; @@ -2478,7 +2522,7 @@ public class KafkaAdminClient extends AdminClient { @Override public CreatePartitionsResult createPartitions(final Map newPartitions, - final CreatePartitionsOptions options) { + final CreatePartitionsOptions options) { final Map> futures = new HashMap<>(newPartitions.size()); final CreatePartitionsTopicCollection topics = new CreatePartitionsTopicCollection(newPartitions.size()); for (Map.Entry entry : newPartitions.entrySet()) { @@ -2498,16 +2542,19 @@ public class KafkaAdminClient extends AdminClient { if (!topics.isEmpty()) { final long now = time.milliseconds(); final long deadline = calcDeadlineMs(now, options.timeoutMs()); - final Call call = getCreatePartitionsCall(options, futures, topics, deadline); + final Call call = getCreatePartitionsCall(options, futures, topics, + Collections.emptyMap(), now, deadline); runnable.call(call, now); } return new CreatePartitionsResult(new HashMap<>(futures)); } private Call getCreatePartitionsCall(final CreatePartitionsOptions options, - final Map> futures, - final CreatePartitionsTopicCollection topics, - final long deadline) { + final Map> futures, + final CreatePartitionsTopicCollection topics, + final Map quotaExceededExceptions, + final long now, + final long deadline) { return new Call("createPartitions", deadline, new ControllerNodeProvider()) { @Override public CreatePartitionsRequest.Builder createRequest(int timeoutMs) { @@ -2525,6 +2572,7 @@ public class KafkaAdminClient extends AdminClient { // Handle server responses for particular topics. 
final CreatePartitionsResponse response = (CreatePartitionsResponse) abstractResponse; final CreatePartitionsTopicCollection retryTopics = new CreatePartitionsTopicCollection(); + final Map retryTopicQuotaExceededExceptions = new HashMap<>(); for (CreatePartitionsTopicResult result : response.data().results()) { KafkaFutureImpl future = futures.get(result.name()); if (future == null) { @@ -2533,11 +2581,13 @@ public class KafkaAdminClient extends AdminClient { ApiError error = new ApiError(result.errorCode(), result.errorMessage()); if (error.isFailure()) { if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) { + ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException( + response.throttleTimeMs(), error.messageWithFallback()); if (options.shouldRetryOnQuotaViolation()) { retryTopics.add(topics.find(result.name()).duplicate()); + retryTopicQuotaExceededExceptions.put(result.name(), quotaExceededException); } else { - future.completeExceptionally(new ThrottlingQuotaExceededException( - response.throttleTimeMs(), error.messageWithFallback())); + future.completeExceptionally(quotaExceededException); } } else { future.completeExceptionally(error.exception()); @@ -2553,13 +2603,20 @@ public class KafkaAdminClient extends AdminClient { completeUnrealizedFutures(futures.entrySet().stream(), topic -> "The controller response did not contain a result for topic " + topic); } else { - final Call call = getCreatePartitionsCall(options, futures, retryTopics, deadline); - runnable.call(call, time.milliseconds()); + final long now = time.milliseconds(); + final Call call = getCreatePartitionsCall(options, futures, retryTopics, + retryTopicQuotaExceededExceptions, now, deadline); + runnable.call(call, now); } } @Override void handleFailure(Throwable throwable) { + // If there were any topics retries due to a quota exceeded exception, we propagate + // the initial error back to the caller if the request timed out. 
+ maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(), + throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now)); + // Fail all the other remaining futures completeAllExceptionally(futures.values(), throwable); } }; diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 80d52947cd3..a4ae6f41da1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -733,7 +733,9 @@ public class KafkaAdminClientTest { time.sleep(defaultApiTimeout + 1); assertNull(result.values().get("topic1").get()); - TestUtils.assertFutureThrows(result.values().get("topic2"), TimeoutException.class); + ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"), + ThrottlingQuotaExceededException.class); + assertEquals(0, e.throttleTimeMs()); TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); } } @@ -895,7 +897,9 @@ public class KafkaAdminClientTest { time.sleep(defaultApiTimeout + 1); assertNull(result.values().get("topic1").get()); - TestUtils.assertFutureThrows(result.values().get("topic2"), TimeoutException.class); + ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"), + ThrottlingQuotaExceededException.class); + assertEquals(0, e.throttleTimeMs()); TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); } } @@ -1727,7 +1731,9 @@ public class KafkaAdminClientTest { time.sleep(defaultApiTimeout + 1); assertNull(result.values().get("topic1").get()); - TestUtils.assertFutureThrows(result.values().get("topic2"), TimeoutException.class); + ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"), + 
ThrottlingQuotaExceededException.class); + assertEquals(0, e.throttleTimeMs()); TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); } }