mirror of https://github.com/apache/kafka.git
MINOR; Preserve ThrottlingQuotaExceededException when request timeouts after being retried due to a quota violation (KIP-599) (#9344)
This PR adds the logic to preserve the ThrottlingQuotaExceededException when topics are retried. The throttleTimeMs is also adjusted accordingly as the request could remain pending or in-flight for quite a long time. Have run various tests on clusters with enabled quotas and I, indeed, find it better to preserve the exception. Otherwise, the caller does not really understand what is going on. This allows the caller to take the appropriate measure and also to take the throttleTimeMs into consideration. Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
This commit is contained in:
parent
e98b38cb1e
commit
a0fec75d3c
|
|
@ -1445,6 +1445,24 @@ public class KafkaAdminClient extends AdminClient {
|
|||
entry.getValue().completeExceptionally(new ApiException(messageFormatter.apply(entry.getKey()))));
|
||||
}
|
||||
|
||||
/**
|
||||
* Fail futures in the given Map which were retried due to exceeding quota. We propagate
|
||||
* the initial error back to the caller if the request timed out.
|
||||
*/
|
||||
private static <K, V> void maybeCompleteQuotaExceededException(
|
||||
boolean shouldRetryOnQuotaViolation,
|
||||
Throwable throwable,
|
||||
Map<K, KafkaFutureImpl<V>> futures,
|
||||
Map<K, ThrottlingQuotaExceededException> quotaExceededExceptions,
|
||||
int throttleTimeDelta) {
|
||||
if (shouldRetryOnQuotaViolation && throwable instanceof TimeoutException) {
|
||||
quotaExceededExceptions.forEach((key, value) -> futures.get(key).completeExceptionally(
|
||||
new ThrottlingQuotaExceededException(
|
||||
Math.max(0, value.throttleTimeMs() - throttleTimeDelta),
|
||||
value.getMessage())));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
|
||||
final CreateTopicsOptions options) {
|
||||
|
|
@ -1464,7 +1482,8 @@ public class KafkaAdminClient extends AdminClient {
|
|||
if (!topics.isEmpty()) {
|
||||
final long now = time.milliseconds();
|
||||
final long deadline = calcDeadlineMs(now, options.timeoutMs());
|
||||
final Call call = getCreateTopicsCall(options, topicFutures, topics, deadline);
|
||||
final Call call = getCreateTopicsCall(options, topicFutures, topics,
|
||||
Collections.emptyMap(), now, deadline);
|
||||
runnable.call(call, now);
|
||||
}
|
||||
return new CreateTopicsResult(new HashMap<>(topicFutures));
|
||||
|
|
@ -1473,6 +1492,8 @@ public class KafkaAdminClient extends AdminClient {
|
|||
private Call getCreateTopicsCall(final CreateTopicsOptions options,
|
||||
final Map<String, KafkaFutureImpl<TopicMetadataAndConfig>> futures,
|
||||
final CreatableTopicCollection topics,
|
||||
final Map<String, ThrottlingQuotaExceededException> quotaExceededExceptions,
|
||||
final long now,
|
||||
final long deadline) {
|
||||
return new Call("createTopics", deadline, new ControllerNodeProvider()) {
|
||||
@Override
|
||||
|
|
@ -1491,6 +1512,7 @@ public class KafkaAdminClient extends AdminClient {
|
|||
// Handle server responses for particular topics.
|
||||
final CreateTopicsResponse response = (CreateTopicsResponse) abstractResponse;
|
||||
final CreatableTopicCollection retryTopics = new CreatableTopicCollection();
|
||||
final Map<String, ThrottlingQuotaExceededException> retryTopicQuotaExceededExceptions = new HashMap<>();
|
||||
for (CreatableTopicResult result : response.data().topics()) {
|
||||
KafkaFutureImpl<TopicMetadataAndConfig> future = futures.get(result.name());
|
||||
if (future == null) {
|
||||
|
|
@ -1499,11 +1521,13 @@ public class KafkaAdminClient extends AdminClient {
|
|||
ApiError error = new ApiError(result.errorCode(), result.errorMessage());
|
||||
if (error.isFailure()) {
|
||||
if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) {
|
||||
ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException(
|
||||
response.throttleTimeMs(), error.messageWithFallback());
|
||||
if (options.shouldRetryOnQuotaViolation()) {
|
||||
retryTopics.add(topics.find(result.name()).duplicate());
|
||||
retryTopicQuotaExceededExceptions.put(result.name(), quotaExceededException);
|
||||
} else {
|
||||
future.completeExceptionally(new ThrottlingQuotaExceededException(
|
||||
response.throttleTimeMs(), error.messageWithFallback()));
|
||||
future.completeExceptionally(quotaExceededException);
|
||||
}
|
||||
} else {
|
||||
future.completeExceptionally(error.exception());
|
||||
|
|
@ -1535,8 +1559,10 @@ public class KafkaAdminClient extends AdminClient {
|
|||
completeUnrealizedFutures(futures.entrySet().stream(),
|
||||
topic -> "The controller response did not contain a result for topic " + topic);
|
||||
} else {
|
||||
final Call call = getCreateTopicsCall(options, futures, retryTopics, deadline);
|
||||
runnable.call(call, time.milliseconds());
|
||||
final long now = time.milliseconds();
|
||||
final Call call = getCreateTopicsCall(options, futures, retryTopics,
|
||||
retryTopicQuotaExceededExceptions, now, deadline);
|
||||
runnable.call(call, now);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1554,6 +1580,11 @@ public class KafkaAdminClient extends AdminClient {
|
|||
|
||||
@Override
|
||||
void handleFailure(Throwable throwable) {
|
||||
// If there were any topics retries due to a quota exceeded exception, we propagate
|
||||
// the initial error back to the caller if the request timed out.
|
||||
maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
|
||||
throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now));
|
||||
// Fail all the other remaining futures
|
||||
completeAllExceptionally(futures.values(), throwable);
|
||||
}
|
||||
};
|
||||
|
|
@ -1578,7 +1609,8 @@ public class KafkaAdminClient extends AdminClient {
|
|||
if (!validTopicNames.isEmpty()) {
|
||||
final long now = time.milliseconds();
|
||||
final long deadline = calcDeadlineMs(now, options.timeoutMs());
|
||||
final Call call = getDeleteTopicsCall(options, topicFutures, validTopicNames, deadline);
|
||||
final Call call = getDeleteTopicsCall(options, topicFutures, validTopicNames,
|
||||
Collections.emptyMap(), now, deadline);
|
||||
runnable.call(call, now);
|
||||
}
|
||||
return new DeleteTopicsResult(new HashMap<>(topicFutures));
|
||||
|
|
@ -1587,6 +1619,8 @@ public class KafkaAdminClient extends AdminClient {
|
|||
private Call getDeleteTopicsCall(final DeleteTopicsOptions options,
|
||||
final Map<String, KafkaFutureImpl<Void>> futures,
|
||||
final List<String> topics,
|
||||
final Map<String, ThrottlingQuotaExceededException> quotaExceededExceptions,
|
||||
final long now,
|
||||
final long deadline) {
|
||||
return new Call("deleteTopics", deadline, new ControllerNodeProvider()) {
|
||||
@Override
|
||||
|
|
@ -1604,6 +1638,7 @@ public class KafkaAdminClient extends AdminClient {
|
|||
// Handle server responses for particular topics.
|
||||
final DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse;
|
||||
final List<String> retryTopics = new ArrayList<>();
|
||||
final Map<String, ThrottlingQuotaExceededException> retryTopicQuotaExceededExceptions = new HashMap<>();
|
||||
for (DeletableTopicResult result : response.data().responses()) {
|
||||
KafkaFutureImpl<Void> future = futures.get(result.name());
|
||||
if (future == null) {
|
||||
|
|
@ -1612,11 +1647,13 @@ public class KafkaAdminClient extends AdminClient {
|
|||
ApiError error = new ApiError(result.errorCode(), result.errorMessage());
|
||||
if (error.isFailure()) {
|
||||
if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) {
|
||||
ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException(
|
||||
response.throttleTimeMs(), error.messageWithFallback());
|
||||
if (options.shouldRetryOnQuotaViolation()) {
|
||||
retryTopics.add(result.name());
|
||||
retryTopicQuotaExceededExceptions.put(result.name(), quotaExceededException);
|
||||
} else {
|
||||
future.completeExceptionally(new ThrottlingQuotaExceededException(
|
||||
response.throttleTimeMs(), error.messageWithFallback()));
|
||||
future.completeExceptionally(quotaExceededException);
|
||||
}
|
||||
} else {
|
||||
future.completeExceptionally(error.exception());
|
||||
|
|
@ -1632,13 +1669,20 @@ public class KafkaAdminClient extends AdminClient {
|
|||
completeUnrealizedFutures(futures.entrySet().stream(),
|
||||
topic -> "The controller response did not contain a result for topic " + topic);
|
||||
} else {
|
||||
final Call call = getDeleteTopicsCall(options, futures, retryTopics, deadline);
|
||||
runnable.call(call, time.milliseconds());
|
||||
final long now = time.milliseconds();
|
||||
final Call call = getDeleteTopicsCall(options, futures, retryTopics,
|
||||
retryTopicQuotaExceededExceptions, now, deadline);
|
||||
runnable.call(call, now);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
void handleFailure(Throwable throwable) {
|
||||
// If there were any topics retries due to a quota exceeded exception, we propagate
|
||||
// the initial error back to the caller if the request timed out.
|
||||
maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
|
||||
throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now));
|
||||
// Fail all the other remaining futures
|
||||
completeAllExceptionally(futures.values(), throwable);
|
||||
}
|
||||
};
|
||||
|
|
@ -2478,7 +2522,7 @@ public class KafkaAdminClient extends AdminClient {
|
|||
|
||||
@Override
|
||||
public CreatePartitionsResult createPartitions(final Map<String, NewPartitions> newPartitions,
|
||||
final CreatePartitionsOptions options) {
|
||||
final CreatePartitionsOptions options) {
|
||||
final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(newPartitions.size());
|
||||
final CreatePartitionsTopicCollection topics = new CreatePartitionsTopicCollection(newPartitions.size());
|
||||
for (Map.Entry<String, NewPartitions> entry : newPartitions.entrySet()) {
|
||||
|
|
@ -2498,16 +2542,19 @@ public class KafkaAdminClient extends AdminClient {
|
|||
if (!topics.isEmpty()) {
|
||||
final long now = time.milliseconds();
|
||||
final long deadline = calcDeadlineMs(now, options.timeoutMs());
|
||||
final Call call = getCreatePartitionsCall(options, futures, topics, deadline);
|
||||
final Call call = getCreatePartitionsCall(options, futures, topics,
|
||||
Collections.emptyMap(), now, deadline);
|
||||
runnable.call(call, now);
|
||||
}
|
||||
return new CreatePartitionsResult(new HashMap<>(futures));
|
||||
}
|
||||
|
||||
private Call getCreatePartitionsCall(final CreatePartitionsOptions options,
|
||||
final Map<String, KafkaFutureImpl<Void>> futures,
|
||||
final CreatePartitionsTopicCollection topics,
|
||||
final long deadline) {
|
||||
final Map<String, KafkaFutureImpl<Void>> futures,
|
||||
final CreatePartitionsTopicCollection topics,
|
||||
final Map<String, ThrottlingQuotaExceededException> quotaExceededExceptions,
|
||||
final long now,
|
||||
final long deadline) {
|
||||
return new Call("createPartitions", deadline, new ControllerNodeProvider()) {
|
||||
@Override
|
||||
public CreatePartitionsRequest.Builder createRequest(int timeoutMs) {
|
||||
|
|
@ -2525,6 +2572,7 @@ public class KafkaAdminClient extends AdminClient {
|
|||
// Handle server responses for particular topics.
|
||||
final CreatePartitionsResponse response = (CreatePartitionsResponse) abstractResponse;
|
||||
final CreatePartitionsTopicCollection retryTopics = new CreatePartitionsTopicCollection();
|
||||
final Map<String, ThrottlingQuotaExceededException> retryTopicQuotaExceededExceptions = new HashMap<>();
|
||||
for (CreatePartitionsTopicResult result : response.data().results()) {
|
||||
KafkaFutureImpl<Void> future = futures.get(result.name());
|
||||
if (future == null) {
|
||||
|
|
@ -2533,11 +2581,13 @@ public class KafkaAdminClient extends AdminClient {
|
|||
ApiError error = new ApiError(result.errorCode(), result.errorMessage());
|
||||
if (error.isFailure()) {
|
||||
if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) {
|
||||
ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException(
|
||||
response.throttleTimeMs(), error.messageWithFallback());
|
||||
if (options.shouldRetryOnQuotaViolation()) {
|
||||
retryTopics.add(topics.find(result.name()).duplicate());
|
||||
retryTopicQuotaExceededExceptions.put(result.name(), quotaExceededException);
|
||||
} else {
|
||||
future.completeExceptionally(new ThrottlingQuotaExceededException(
|
||||
response.throttleTimeMs(), error.messageWithFallback()));
|
||||
future.completeExceptionally(quotaExceededException);
|
||||
}
|
||||
} else {
|
||||
future.completeExceptionally(error.exception());
|
||||
|
|
@ -2553,13 +2603,20 @@ public class KafkaAdminClient extends AdminClient {
|
|||
completeUnrealizedFutures(futures.entrySet().stream(),
|
||||
topic -> "The controller response did not contain a result for topic " + topic);
|
||||
} else {
|
||||
final Call call = getCreatePartitionsCall(options, futures, retryTopics, deadline);
|
||||
runnable.call(call, time.milliseconds());
|
||||
final long now = time.milliseconds();
|
||||
final Call call = getCreatePartitionsCall(options, futures, retryTopics,
|
||||
retryTopicQuotaExceededExceptions, now, deadline);
|
||||
runnable.call(call, now);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
void handleFailure(Throwable throwable) {
|
||||
// If there were any topics retries due to a quota exceeded exception, we propagate
|
||||
// the initial error back to the caller if the request timed out.
|
||||
maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
|
||||
throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now));
|
||||
// Fail all the other remaining futures
|
||||
completeAllExceptionally(futures.values(), throwable);
|
||||
}
|
||||
};
|
||||
|
|
|
|||
|
|
@ -733,7 +733,9 @@ public class KafkaAdminClientTest {
|
|||
time.sleep(defaultApiTimeout + 1);
|
||||
|
||||
assertNull(result.values().get("topic1").get());
|
||||
TestUtils.assertFutureThrows(result.values().get("topic2"), TimeoutException.class);
|
||||
ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"),
|
||||
ThrottlingQuotaExceededException.class);
|
||||
assertEquals(0, e.throttleTimeMs());
|
||||
TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class);
|
||||
}
|
||||
}
|
||||
|
|
@ -895,7 +897,9 @@ public class KafkaAdminClientTest {
|
|||
time.sleep(defaultApiTimeout + 1);
|
||||
|
||||
assertNull(result.values().get("topic1").get());
|
||||
TestUtils.assertFutureThrows(result.values().get("topic2"), TimeoutException.class);
|
||||
ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"),
|
||||
ThrottlingQuotaExceededException.class);
|
||||
assertEquals(0, e.throttleTimeMs());
|
||||
TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class);
|
||||
}
|
||||
}
|
||||
|
|
@ -1727,7 +1731,9 @@ public class KafkaAdminClientTest {
|
|||
time.sleep(defaultApiTimeout + 1);
|
||||
|
||||
assertNull(result.values().get("topic1").get());
|
||||
TestUtils.assertFutureThrows(result.values().get("topic2"), TimeoutException.class);
|
||||
ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"),
|
||||
ThrottlingQuotaExceededException.class);
|
||||
assertEquals(0, e.throttleTimeMs());
|
||||
TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue