KAFKA-14821 Implement the listOffsets API with AdminApiDriver (#13432)

The ListOffsets workflow is currently handled by chaining together MetadataCall and ListOffsetsCall instances, which involves a lot of complex and error-prone logic. In this PR we rewrote it on top of the `AdminApiDriver` infrastructure. Notable improvements over the old logic:
1. The lookup stage is retried on `NOT_LEADER_OR_FOLLOWER` and `LEADER_NOT_AVAILABLE`, whereas previously the partition was failed directly without a retry.
2. The class field `supportsMaxTimestamp` is removed and the value is computed on the fly, avoiding mutable state; this does not change any client behavior.
3. The fulfillment stage is retried on any `RetriableException`, whereas previously it was only retried on `InvalidMetadataException`; this means we now also retry on `TimeoutException` and other retriable exceptions.

We also added `handleUnsupportedVersionException` to `AdminApiHandler` and `AdminApiLookupStrategy`. These hooks keep the behavior consistent with the old logic, and we can continue to improve them in follow-up work.
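For reference, the public `Admin#listOffsets` API is unchanged by this rewrite; only the internal call path moves onto `AdminApiDriver` and the new `ListOffsetsHandler`. A minimal usage sketch (the bootstrap address and topic name are illustrative placeholders, not taken from this PR):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class ListOffsetsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder bootstrap address.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // "foo" is a placeholder topic; request the latest offset for partition 0
            // and the offset of the record with the highest timestamp for partition 1.
            Map<TopicPartition, OffsetSpec> request = new HashMap<>();
            request.put(new TopicPartition("foo", 0), OffsetSpec.latest());
            request.put(new TopicPartition("foo", 1), OffsetSpec.maxTimestamp());

            // After this PR, the call below is fulfilled by ListOffsetsHandler driven
            // through AdminApiDriver (leader lookup, per-broker batching, retries).
            ListOffsetsResult result = admin.listOffsets(request);

            ListOffsetsResultInfo info =
                result.partitionResult(new TopicPartition("foo", 0)).get();
            System.out.println("offset=" + info.offset() + ", leaderEpoch=" + info.leaderEpoch());
        }
    }
}
```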

Reviewers: Ziming Deng <dengziming1993@gmail.com>, David Jacot <djacot@confluent.io>
Dimitar Dimitrov 2023-04-20 05:29:27 +02:00 committed by GitHub
parent f5de4daa71
commit e14dd8024a
12 changed files with 770 additions and 482 deletions


@ -49,8 +49,8 @@ import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler;
import org.apache.kafka.clients.admin.internals.FenceProducersHandler;
import org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.ListOffsetsHandler;
import org.apache.kafka.clients.admin.internals.ListTransactionsHandler;
import org.apache.kafka.clients.admin.internals.MetadataOperationContext;
import org.apache.kafka.clients.admin.internals.RemoveMembersFromConsumerGroupHandler;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
@ -143,10 +143,6 @@ import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
@ -221,7 +217,6 @@ import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest;
import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
@ -269,7 +264,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -1818,7 +1812,7 @@ public class KafkaAdminClient extends AdminClient {
}
};
}
private Call getDeleteTopicsWithIdsCall(final DeleteTopicsOptions options,
final Map<Uuid, KafkaFutureImpl<Void>> futures,
final List<Uuid> topicIds,
@ -3198,14 +3192,6 @@ public class KafkaAdminClient extends AdminClient {
return new DescribeDelegationTokenResult(tokensFuture);
}
private void rescheduleMetadataTask(MetadataOperationContext<?, ?> context, Supplier<List<Call>> nextCalls) {
log.info("Retrying to fetch metadata.");
// Requeue the task so that we can re-attempt fetching metadata
context.setResponse(Optional.empty());
Call metadataCall = getMetadataCall(context, nextCalls);
runnable.call(metadataCall, time.milliseconds());
}
@Override
public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<String> groupIds,
final DescribeConsumerGroupsOptions options) {
@ -3217,46 +3203,6 @@ public class KafkaAdminClient extends AdminClient {
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}
/**
* Returns a {@code Call} object to fetch the cluster metadata. Takes a List of Calls
* parameter to schedule actions that need to be taken using the metadata. The param is a Supplier
* so that it can be lazily created, so that it can use the results of the metadata call in its
* construction.
*
* @param <T> The type of return value of the KafkaFuture, like ListOffsetsResultInfo, etc.
* @param <O> The type of configuration option, like ListOffsetsOptions, etc
*/
private <T, O extends AbstractOptions<O>> Call getMetadataCall(MetadataOperationContext<T, O> context,
Supplier<List<Call>> nextCalls) {
return new Call("metadata", context.deadline(), new LeastLoadedNodeProvider()) {
@Override
MetadataRequest.Builder createRequest(int timeoutMs) {
return new MetadataRequest.Builder(new MetadataRequestData()
.setTopics(convertToMetadataRequestTopic(context.topics()))
.setAllowAutoTopicCreation(false));
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse response = (MetadataResponse) abstractResponse;
MetadataOperationContext.handleMetadataErrors(response);
context.setResponse(Optional.of(response));
for (Call call : nextCalls.get()) {
runnable.call(call, time.milliseconds());
}
}
@Override
void handleFailure(Throwable throwable) {
for (KafkaFutureImpl<T> future : context.futures().values()) {
future.completeExceptionally(throwable);
}
}
};
}
private Set<AclOperation> validAclOperations(final int authorizedOperations) {
if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) {
return null;
@ -3790,164 +3736,13 @@ public class KafkaAdminClient extends AdminClient {
@Override
public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
ListOffsetsOptions options) {
// preparing topics list for asking metadata about them
final Map<TopicPartition, KafkaFutureImpl<ListOffsetsResultInfo>> futures = new HashMap<>(topicPartitionOffsets.size());
final Set<String> topics = new HashSet<>();
for (TopicPartition topicPartition : topicPartitionOffsets.keySet()) {
topics.add(topicPartition.topic());
futures.put(topicPartition, new KafkaFutureImpl<>());
}
final long nowMetadata = time.milliseconds();
final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
MetadataOperationContext<ListOffsetsResultInfo, ListOffsetsOptions> context =
new MetadataOperationContext<>(topics, options, deadline, futures);
Call metadataCall = getMetadataCall(context,
() -> KafkaAdminClient.this.getListOffsetsCalls(context, topicPartitionOffsets, futures));
runnable.call(metadataCall, nowMetadata);
return new ListOffsetsResult(new HashMap<>(futures));
}
// visible for benchmark
List<Call> getListOffsetsCalls(MetadataOperationContext<ListOffsetsResultInfo, ListOffsetsOptions> context,
Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
Map<TopicPartition, KafkaFutureImpl<ListOffsetsResultInfo>> futures) {
MetadataResponse mr = context.response().orElseThrow(() -> new IllegalStateException("No Metadata response"));
Cluster clusterSnapshot = mr.buildCluster();
List<Call> calls = new ArrayList<>();
// grouping topic partitions per leader
Map<Node, Map<String, ListOffsetsTopic>> leaders = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetSpec> entry: topicPartitionOffsets.entrySet()) {
OffsetSpec offsetSpec = entry.getValue();
TopicPartition tp = entry.getKey();
KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
long offsetQuery = getOffsetFromOffsetSpec(offsetSpec);
// avoid sending listOffsets request for topics with errors
if (!mr.errors().containsKey(tp.topic())) {
Node node = clusterSnapshot.leaderFor(tp);
if (node != null) {
Map<String, ListOffsetsTopic> leadersOnNode = leaders.computeIfAbsent(node, k -> new HashMap<>());
ListOffsetsTopic topic = leadersOnNode.computeIfAbsent(tp.topic(), k -> new ListOffsetsTopic().setName(tp.topic()));
topic.partitions().add(new ListOffsetsPartition().setPartitionIndex(tp.partition()).setTimestamp(offsetQuery));
} else {
future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception());
}
} else {
future.completeExceptionally(mr.errors().get(tp.topic()).exception());
}
}
for (final Map.Entry<Node, Map<String, ListOffsetsTopic>> entry : leaders.entrySet()) {
final int brokerId = entry.getKey().id();
calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
private boolean supportsMaxTimestamp = partitionsToQuery.stream()
.flatMap(t -> t.partitions().stream())
.anyMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP);
@Override
ListOffsetsRequest.Builder createRequest(int timeoutMs) {
return ListOffsetsRequest.Builder
.forConsumer(true, context.options().isolationLevel(), supportsMaxTimestamp)
.setTargetTimes(partitionsToQuery);
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
ListOffsetsResponse response = (ListOffsetsResponse) abstractResponse;
Map<TopicPartition, OffsetSpec> retryTopicPartitionOffsets = new HashMap<>();
for (ListOffsetsTopicResponse topic : response.topics()) {
for (ListOffsetsPartitionResponse partition : topic.partitions()) {
TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
Errors error = Errors.forCode(partition.errorCode());
OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp);
if (offsetRequestSpec == null) {
log.warn("Server response mentioned unknown topic partition {}", tp);
} else if (MetadataOperationContext.shouldRefreshMetadata(error)) {
retryTopicPartitionOffsets.put(tp, offsetRequestSpec);
} else if (error == Errors.NONE) {
Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetsResponse.UNKNOWN_EPOCH)
? Optional.empty()
: Optional.of(partition.leaderEpoch());
future.complete(new ListOffsetsResultInfo(partition.offset(), partition.timestamp(), leaderEpoch));
} else {
future.completeExceptionally(error.exception());
}
}
}
if (retryTopicPartitionOffsets.isEmpty()) {
// The server should send back a response for every topic partition. But do a sanity check anyway.
for (ListOffsetsTopic topic : partitionsToQuery) {
for (ListOffsetsPartition partition : topic.partitions()) {
TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
ApiException error = new ApiException("The response from broker " + brokerId +
" did not contain a result for topic partition " + tp);
futures.get(tp).completeExceptionally(error);
}
}
} else {
Set<String> retryTopics = retryTopicPartitionOffsets.keySet().stream().map(
TopicPartition::topic).collect(Collectors.toSet());
MetadataOperationContext<ListOffsetsResultInfo, ListOffsetsOptions> retryContext =
new MetadataOperationContext<>(retryTopics, context.options(), context.deadline(), futures);
rescheduleMetadataTask(retryContext, () -> getListOffsetsCalls(retryContext, retryTopicPartitionOffsets, futures));
}
}
@Override
void handleFailure(Throwable throwable) {
for (ListOffsetsTopic topic : entry.getValue().values()) {
for (ListOffsetsPartition partition : topic.partitions()) {
TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
future.completeExceptionally(throwable);
}
}
}
@Override
boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
if (supportsMaxTimestamp) {
supportsMaxTimestamp = false;
// fail any unsupported futures and remove partitions from the downgraded retry
Iterator<ListOffsetsTopic> topicIterator = partitionsToQuery.iterator();
while (topicIterator.hasNext()) {
ListOffsetsTopic topic = topicIterator.next();
Iterator<ListOffsetsPartition> partitionIterator = topic.partitions().iterator();
while (partitionIterator.hasNext()) {
ListOffsetsPartition partition = partitionIterator.next();
if (partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) {
futures.get(new TopicPartition(topic.name(), partition.partitionIndex()))
.completeExceptionally(new UnsupportedVersionException(
"Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec"));
partitionIterator.remove();
}
}
if (topic.partitions().isEmpty()) {
topicIterator.remove();
}
}
return !partitionsToQuery.isEmpty();
}
return false;
}
});
}
return calls;
AdminApiFuture.SimpleAdminApiFuture<TopicPartition, ListOffsetsResultInfo> future =
ListOffsetsHandler.newFuture(topicPartitionOffsets.keySet());
Map<TopicPartition, Long> offsetQueriesByPartition = topicPartitionOffsets.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> getOffsetFromSpec(e.getValue())));
ListOffsetsHandler handler = new ListOffsetsHandler(offsetQueriesByPartition, options, logContext);
invokeDriver(handler, future, options.timeoutMs);
return new ListOffsetsResult(future.all());
}
@Override
@ -4579,7 +4374,7 @@ public class KafkaAdminClient extends AdminClient {
};
}
private long getOffsetFromOffsetSpec(OffsetSpec offsetSpec) {
private static long getOffsetFromSpec(OffsetSpec offsetSpec) {
if (offsetSpec instanceof TimestampSpec) {
return ((TimestampSpec) offsetSpec).timestamp();
} else if (offsetSpec instanceof OffsetSpec.EarliestSpec) {


@ -18,6 +18,7 @@ package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest.NoBatchedFindCoordinatorsException;
@ -260,12 +261,31 @@ public class AdminApiDriver<K, V> {
.filter(future.lookupKeys()::contains)
.collect(Collectors.toSet());
retryLookup(keysToUnmap);
} else if (t instanceof UnsupportedVersionException) {
if (spec.scope instanceof FulfillmentScope) {
int brokerId = ((FulfillmentScope) spec.scope).destinationBrokerId;
Map<K, Throwable> unrecoverableFailures =
handler.handleUnsupportedVersionException(
brokerId,
(UnsupportedVersionException) t,
spec.keys);
completeExceptionally(unrecoverableFailures);
} else {
Map<K, Throwable> unrecoverableLookupFailures =
handler.lookupStrategy().handleUnsupportedVersionException(
(UnsupportedVersionException) t,
spec.keys);
completeLookupExceptionally(unrecoverableLookupFailures);
Set<K> keysToUnmap = spec.keys.stream()
.filter(k -> !unrecoverableLookupFailures.containsKey(k))
.collect(Collectors.toSet());
retryLookup(keysToUnmap);
}
} else {
Map<K, Throwable> errors = spec.keys.stream().collect(Collectors.toMap(
Function.identity(),
key -> t
));
if (spec.scope instanceof FulfillmentScope) {
completeExceptionally(errors);
} else {


@ -17,6 +17,7 @@
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
@ -70,6 +71,23 @@ public interface AdminApiHandler<K, V> {
*/
ApiResult<K, V> handleResponse(Node broker, Set<K> keys, AbstractResponse response);
/**
* Callback that is invoked when a fulfillment request hits an UnsupportedVersionException.
* Keys for which the exception cannot be handled and the request shouldn't be retried must be mapped
* to an error and returned. The request will then be retried for the remainder of the keys.
*
* @return The failure mappings for the keys for which the exception cannot be handled and the
* request shouldn't be retried. If the exception cannot be handled all initial keys will be in
* the returned map.
*/
default Map<K, Throwable> handleUnsupportedVersionException(
int brokerId,
UnsupportedVersionException exception,
Set<K> keys
) {
return keys.stream().collect(Collectors.toMap(k -> k, k -> exception));
}
/**
* Get the lookup strategy that is responsible for finding the brokerId
* which will handle each respective key.


@ -16,6 +16,8 @@
*/
package org.apache.kafka.clients.admin.internals;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
@ -81,6 +83,23 @@ public interface AdminApiLookupStrategy<T> {
*/
LookupResult<T> handleResponse(Set<T> keys, AbstractResponse response);
/**
* Callback that is invoked when a lookup request hits an UnsupportedVersionException.
* Keys for which the exception cannot be handled and the request shouldn't be retried must be mapped
* to an error and returned. The remainder of the keys will then be unmapped and the lookup request will
* be retried for them.
*
* @return The failure mappings for the keys for which the exception cannot be handled and the
* request shouldn't be retried. If the exception cannot be handled all initial keys will be in
* the returned map.
*/
default Map<T, Throwable> handleUnsupportedVersionException(
UnsupportedVersionException exception,
Set<T> keys
) {
return keys.stream().collect(Collectors.toMap(k -> k, k -> exception));
}
class LookupResult<K> {
// This is the set of keys that have been completed by the lookup phase itself.
// The driver will not attempt lookup or fulfillment for completed keys.


@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin.internals;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.utils.CollectionUtils;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
public final class ListOffsetsHandler extends Batched<TopicPartition, ListOffsetsResultInfo> {
private final Map<TopicPartition, Long> offsetTimestampsByPartition;
private final ListOffsetsOptions options;
private final Logger log;
private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;
public ListOffsetsHandler(
Map<TopicPartition, Long> offsetTimestampsByPartition,
ListOffsetsOptions options,
LogContext logContext
) {
this.offsetTimestampsByPartition = offsetTimestampsByPartition;
this.options = options;
this.log = logContext.logger(ListOffsetsHandler.class);
this.lookupStrategy = new PartitionLeaderStrategy(logContext);
}
@Override
public String apiName() {
return "listOffsets";
}
@Override
public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
return this.lookupStrategy;
}
@Override
ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) {
Map<String, ListOffsetsTopic> topicsByName = CollectionUtils.groupPartitionsByTopic(
keys,
topicName -> new ListOffsetsTopic().setName(topicName),
(listOffsetsTopic, partitionId) -> {
TopicPartition topicPartition = new TopicPartition(listOffsetsTopic.name(), partitionId);
long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition);
listOffsetsTopic.partitions().add(
new ListOffsetsPartition()
.setPartitionIndex(partitionId)
.setTimestamp(offsetTimestamp));
});
boolean supportsMaxTimestamp = keys
.stream()
.anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.MAX_TIMESTAMP);
return ListOffsetsRequest.Builder
.forConsumer(true, options.isolationLevel(), supportsMaxTimestamp)
.setTargetTimes(new ArrayList<>(topicsByName.values()));
}
@Override
public ApiResult<TopicPartition, ListOffsetsResultInfo> handleResponse(
Node broker,
Set<TopicPartition> keys,
AbstractResponse abstractResponse
) {
ListOffsetsResponse response = (ListOffsetsResponse) abstractResponse;
Map<TopicPartition, ListOffsetsResultInfo> completed = new HashMap<>();
Map<TopicPartition, Throwable> failed = new HashMap<>();
List<TopicPartition> unmapped = new ArrayList<>();
Set<TopicPartition> retriable = new HashSet<>();
for (ListOffsetsTopicResponse topic : response.topics()) {
for (ListOffsetsPartitionResponse partition : topic.partitions()) {
TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
Errors error = Errors.forCode(partition.errorCode());
if (!offsetTimestampsByPartition.containsKey(topicPartition)) {
log.warn("ListOffsets response includes unknown topic partition {}", topicPartition);
} else if (error == Errors.NONE) {
Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetsResponse.UNKNOWN_EPOCH)
? Optional.empty()
: Optional.of(partition.leaderEpoch());
completed.put(
topicPartition,
new ListOffsetsResultInfo(partition.offset(), partition.timestamp(), leaderEpoch));
} else {
handlePartitionError(topicPartition, error, failed, unmapped, retriable);
}
}
}
// Sanity-check if the current leader for these partitions returned results for all of them
for (TopicPartition topicPartition : keys) {
if (unmapped.isEmpty()
&& !completed.containsKey(topicPartition)
&& !failed.containsKey(topicPartition)
&& !retriable.contains(topicPartition)
) {
ApiException sanityCheckException = new ApiException(
"The response from broker " + broker.id() +
" did not contain a result for topic partition " + topicPartition);
log.error(
"ListOffsets request for topic partition {} failed sanity check",
topicPartition,
sanityCheckException);
failed.put(topicPartition, sanityCheckException);
}
}
return new ApiResult<>(completed, failed, unmapped);
}
private void handlePartitionError(
TopicPartition topicPartition,
Errors error,
Map<TopicPartition, Throwable> failed,
List<TopicPartition> unmapped,
Set<TopicPartition> retriable
) {
if (error == Errors.NOT_LEADER_OR_FOLLOWER || error == Errors.LEADER_NOT_AVAILABLE) {
log.debug(
"ListOffsets lookup request for topic partition {} will be retried due to invalid leader metadata {}",
topicPartition,
error);
unmapped.add(topicPartition);
} else if (error.exception() instanceof RetriableException) {
log.debug(
"ListOffsets fulfillment request for topic partition {} will be retried due to {}",
topicPartition,
error);
retriable.add(topicPartition);
} else {
log.error(
"ListOffsets request for topic partition {} failed due to an unexpected error {}",
topicPartition,
error);
failed.put(topicPartition, error.exception());
}
}
@Override
public Map<TopicPartition, Throwable> handleUnsupportedVersionException(
int brokerId, UnsupportedVersionException exception, Set<TopicPartition> keys
) {
log.warn("Broker " + brokerId + " does not support MAX_TIMESTAMP offset specs");
Map<TopicPartition, Throwable> maxTimestampPartitions = new HashMap<>();
for (TopicPartition topicPartition : keys) {
Long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition);
if (offsetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
maxTimestampPartitions.put(topicPartition, exception);
}
}
// If there are no partitions with MAX_TIMESTAMP specs the UnsupportedVersionException cannot be handled
// and all partitions should be failed here.
// Otherwise, just the partitions with MAX_TIMESTAMP specs should be failed here and the fulfillment stage
// will later be retried for the potentially empty set of partitions with non-MAX_TIMESTAMP specs.
if (maxTimestampPartitions.isEmpty()) {
return keys.stream().collect(Collectors.toMap(k -> k, k -> exception));
} else {
return maxTimestampPartitions;
}
}
public static SimpleAdminApiFuture<TopicPartition, ListOffsetsResultInfo> newFuture(
Collection<TopicPartition> topicPartitions
) {
return AdminApiFuture.forKeys(new HashSet<>(topicPartitions));
}
}


@ -1,96 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin.internals;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.admin.AbstractOptions;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
/**
* Context class to encapsulate parameters of a call to fetch and use cluster metadata.
* Some of the parameters are provided at construction and are immutable whereas others are provided
* as "Call" are completed and values are available.
*
* @param <T> The type of return value of the KafkaFuture
* @param <O> The type of configuration option.
*/
public final class MetadataOperationContext<T, O extends AbstractOptions<O>> {
final private Collection<String> topics;
final private O options;
final private long deadline;
final private Map<TopicPartition, KafkaFutureImpl<T>> futures;
private Optional<MetadataResponse> response;
public MetadataOperationContext(Collection<String> topics,
O options,
long deadline,
Map<TopicPartition, KafkaFutureImpl<T>> futures) {
this.topics = topics;
this.options = options;
this.deadline = deadline;
this.futures = futures;
this.response = Optional.empty();
}
public void setResponse(Optional<MetadataResponse> response) {
this.response = response;
}
public Optional<MetadataResponse> response() {
return response;
}
public O options() {
return options;
}
public long deadline() {
return deadline;
}
public Map<TopicPartition, KafkaFutureImpl<T>> futures() {
return futures;
}
public Collection<String> topics() {
return topics;
}
public static void handleMetadataErrors(MetadataResponse response) {
for (TopicMetadata tm : response.topicMetadata()) {
for (PartitionMetadata pm : tm.partitionMetadata()) {
if (shouldRefreshMetadata(pm.error)) {
throw pm.error.exception();
}
}
}
}
public static boolean shouldRefreshMetadata(Errors error) {
return error.exception() instanceof InvalidMetadataException;
}
}


@ -17,13 +17,11 @@
package org.apache.kafka.clients.admin;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.HostResolver;
import org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
import org.apache.kafka.clients.admin.internals.MetadataOperationContext;
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
@ -143,17 +141,6 @@ public class AdminClientTestUtils {
return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future));
}
/**
* Used for benchmark. KafkaAdminClient.getListOffsetsCalls is only accessible
* from within the admin package.
*/
public static List<KafkaAdminClient.Call> getListOffsetsCalls(KafkaAdminClient adminClient,
MetadataOperationContext<ListOffsetsResult.ListOffsetsResultInfo, ListOffsetsOptions> context,
Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
Map<TopicPartition, KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo>> futures) {
return adminClient.getListOffsetsCalls(context, topicPartitionOffsets, futures);
}
/**
* Helper to create a KafkaAdminClient with a custom HostResolver accessible to tests outside this package.
*/


@ -4964,6 +4964,93 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testListOffsetsHandlesFulfillmentTimeouts() throws Exception {
Node node = new Node(0, "localhost", 8120);
List<Node> nodes = Collections.singletonList(node);
List<PartitionInfo> pInfos = new ArrayList<>();
pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
final Cluster cluster = new Cluster(
"mockClusterId",
nodes,
pInfos,
Collections.emptySet(),
Collections.emptySet(),
node);
final TopicPartition tp0 = new TopicPartition("foo", 0);
final TopicPartition tp1 = new TopicPartition("foo", 1);
int numRetries = 2;
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
AdminClientConfig.RETRIES_CONFIG, Integer.toString(numRetries))) {
ListOffsetsTopicResponse tp0ErrorResponse =
ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.REQUEST_TIMED_OUT, -1L, -1L, -1);
ListOffsetsTopicResponse tp1Response =
ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
ListOffsetsResponseData responseDataWithError = new ListOffsetsResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(tp0ErrorResponse, tp1Response));
ListOffsetsTopicResponse tp0Response =
ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.NONE, -1L, 789L, 987);
ListOffsetsResponseData responseData = new ListOffsetsResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(tp0Response, tp1Response));
// Test that one-too-many timeouts for partition 0 result in partial success overall -
// timeout for partition 0 and success for partition 1.
// It might be desirable to have the AdminApiDriver mechanism also handle all retriable
// exceptions like TimeoutException during the lookup stage (it currently doesn't).
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
for (int i = 0; i < numRetries + 1; i++) {
env.kafkaClient().prepareResponseFrom(
request -> request instanceof ListOffsetsRequest,
new ListOffsetsResponse(responseDataWithError), node);
}
ListOffsetsResult result = env.adminClient().listOffsets(
new HashMap<TopicPartition, OffsetSpec>() {
{
put(tp0, OffsetSpec.latest());
put(tp1, OffsetSpec.latest());
}
});
TestUtils.assertFutureThrows(result.partitionResult(tp0), TimeoutException.class);
ListOffsetsResultInfo tp1Result = result.partitionResult(tp1).get();
assertEquals(345L, tp1Result.offset());
assertEquals(543, tp1Result.leaderEpoch().get().intValue());
assertEquals(-1L, tp1Result.timestamp());
// Now test that only numRetries timeouts for partition 0 result in success for both
// partition 0 and partition 1.
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
for (int i = 0; i < numRetries; i++) {
env.kafkaClient().prepareResponseFrom(
request -> request instanceof ListOffsetsRequest,
new ListOffsetsResponse(responseDataWithError), node);
}
env.kafkaClient().prepareResponseFrom(
request -> request instanceof ListOffsetsRequest, new ListOffsetsResponse(responseData), node);
result = env.adminClient().listOffsets(
new HashMap<TopicPartition, OffsetSpec>() {
{
put(tp0, OffsetSpec.latest());
put(tp1, OffsetSpec.latest());
}
});
ListOffsetsResultInfo tp0Result = result.partitionResult(tp0).get();
assertEquals(789L, tp0Result.offset());
assertEquals(987, tp0Result.leaderEpoch().get().intValue());
assertEquals(-1L, tp0Result.timestamp());
tp1Result = result.partitionResult(tp1).get();
assertEquals(345L, tp1Result.offset());
assertEquals(543, tp1Result.leaderEpoch().get().intValue());
assertEquals(-1L, tp1Result.timestamp());
}
}
@Test
public void testListOffsetsUnsupportedNonMaxTimestamp() {
Node node = new Node(0, "localhost", 8120);


@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.admin.internals;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.admin.internals.AdminApiDriver.RequestSpec;
import org.apache.kafka.clients.admin.internals.AdminApiHandler.ApiResult;
import org.apache.kafka.clients.admin.internals.AdminApiLookupStrategy.LookupResult;
@ -23,6 +24,7 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
@ -107,25 +109,32 @@ class AdminApiDriverTest {
@Test
public void testKeyLookupFailure() {
TestContext ctx = TestContext.dynamicMapped(map(
"foo", "c1",
"bar", "c2"
));
// Ensure that both generic failures and unhandled UnsupportedVersionExceptions (which could be specifically
// handled in both the lookup and the fulfillment stages) result in the expected lookup failures.
Exception[] keyLookupExceptions = new Exception[] {
new UnknownServerException(), new UnsupportedVersionException("")
};
for (Exception keyLookupException : keyLookupExceptions) {
TestContext ctx = TestContext.dynamicMapped(map(
"foo", "c1",
"bar", "c2"
));
Map<Set<String>, LookupResult<String>> lookupRequests = map(
mkSet("foo"), failedLookup("foo", new UnknownServerException()),
mkSet("bar"), mapped("bar", 1)
);
Map<Set<String>, LookupResult<String>> lookupRequests = map(
mkSet("foo"), failedLookup("foo", keyLookupException),
mkSet("bar"), mapped("bar", 1)
);
ctx.poll(lookupRequests, emptyMap());
ctx.poll(lookupRequests, emptyMap());
Map<Set<String>, ApiResult<String, Long>> fulfillmentResults = map(
mkSet("bar"), completed("bar", 30L)
);
Map<Set<String>, ApiResult<String, Long>> fulfillmentResults = map(
mkSet("bar"), completed("bar", 30L)
);
ctx.poll(emptyMap(), fulfillmentResults);
ctx.poll(emptyMap(), fulfillmentResults);
ctx.poll(emptyMap(), emptyMap());
ctx.poll(emptyMap(), emptyMap());
}
}
@Test
@ -257,6 +266,59 @@ class AdminApiDriverTest {
ctx.poll(emptyMap(), emptyMap());
}
@Test
public void testFulfillmentFailureUnsupportedVersion() {
TestContext ctx = TestContext.staticMapped(map(
"foo", 0,
"bar", 1,
"baz", 1
));
Map<Set<String>, ApiResult<String, Long>> fulfillmentResults = map(
mkSet("foo"), failed("foo", new UnsupportedVersionException("")),
mkSet("bar", "baz"), completed("bar", 30L, "baz", 45L)
);
ctx.poll(emptyMap(), fulfillmentResults);
ctx.poll(emptyMap(), emptyMap());
}
@Test
public void testFulfillmentRetriableUnsupportedVersion() {
TestContext ctx = TestContext.staticMapped(map(
"foo", 0,
"bar", 1,
"baz", 2
));
ctx.handler.addRetriableUnsupportedVersionKey("foo");
// The mapped ApiResults are only used in the onResponse/handleResponse path - anything that needs
// to be handled in the onFailure path needs to be manually set up later.
ctx.handler.expectRequest(mkSet("foo"), failed("foo", new UnsupportedVersionException("")));
ctx.handler.expectRequest(mkSet("bar"), failed("bar", new UnsupportedVersionException("")));
ctx.handler.expectRequest(mkSet("baz"), completed("baz", 45L));
// Setting up specific fulfillment stage executions requires polling the driver in order to obtain
// the request specs needed for the onResponse/onFailure callbacks.
List<RequestSpec<String>> requestSpecs = ctx.driver.poll();
requestSpecs.forEach(requestSpec -> {
if (requestSpec.keys.contains("foo") || requestSpec.keys.contains("bar")) {
ctx.driver.onFailure(ctx.time.milliseconds(), requestSpec, new UnsupportedVersionException(""));
} else {
ctx.driver.onResponse(
ctx.time.milliseconds(),
requestSpec,
new MetadataResponse(new MetadataResponseData(), ApiKeys.METADATA.latestVersion()),
Node.noNode());
}
});
// Verify retry for "foo" but not for "bar" or "baz"
ctx.poll(emptyMap(), map(
mkSet("foo"), failed("foo", new UnsupportedVersionException(""))
));
ctx.poll(emptyMap(), emptyMap());
}
@Test
public void testRecoalescedLookup() {
TestContext ctx = TestContext.dynamicMapped(map(
@ -737,9 +799,11 @@ class AdminApiDriverTest {
private static class MockAdminApiHandler<K, V> extends AdminApiHandler.Batched<K, V> {
private final Map<Set<K>, ApiResult<K, V>> expectedRequests = new HashMap<>();
private final MockLookupStrategy<K> lookupStrategy;
private final Map<K, Boolean> retriableUnsupportedVersionKeys;
private MockAdminApiHandler(MockLookupStrategy<K> lookupStrategy) {
this.lookupStrategy = lookupStrategy;
this.retriableUnsupportedVersionKeys = new ConcurrentHashMap<>();
}
@Override
@ -770,9 +834,25 @@ class AdminApiDriverTest {
);
}
@Override
public Map<K, Throwable> handleUnsupportedVersionException(
int brokerId,
UnsupportedVersionException exception,
Set<K> keys
) {
return keys
.stream()
.filter(k -> !retriableUnsupportedVersionKeys.containsKey(k))
.collect(Collectors.toMap(k -> k, k -> exception));
}
public void reset() {
expectedRequests.clear();
}
public void addRetriableUnsupportedVersionKey(K key) {
retriableUnsupportedVersionKeys.put(key, Boolean.TRUE);
}
}
private static <K, V> Map<K, V> map(K key, V value) {


@ -0,0 +1,311 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin.internals;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.internals.AdminApiHandler.ApiResult;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Test;
public final class ListOffsetsHandlerTest {
private final LogContext logContext = new LogContext();
private final TopicPartition t0p0 = new TopicPartition("t0", 0);
private final TopicPartition t0p1 = new TopicPartition("t0", 1);
private final TopicPartition t1p0 = new TopicPartition("t1", 0);
private final TopicPartition t1p1 = new TopicPartition("t1", 1);
private final Node node = new Node(1, "host", 1234);
private final Map<TopicPartition, Long> offsetTimestampsByPartition = new HashMap<TopicPartition, Long>() {
{
put(t0p0, ListOffsetsRequest.LATEST_TIMESTAMP);
put(t0p1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
put(t1p0, 123L);
put(t1p1, ListOffsetsRequest.MAX_TIMESTAMP);
}
};
@Test
public void testBuildRequestSimple() {
ListOffsetsHandler handler =
new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext);
ListOffsetsRequest request = handler.buildBatchedRequest(node.id(), mkSet(t0p0, t0p1)).build();
List<ListOffsetsTopic> topics = request.topics();
assertEquals(1, topics.size());
ListOffsetsTopic topic = topics.get(0);
assertEquals(2, topic.partitions().size());
for (ListOffsetsPartition partition : topic.partitions()) {
TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
assertExpectedTimestamp(topicPartition, partition.timestamp());
}
assertEquals(IsolationLevel.READ_UNCOMMITTED, request.isolationLevel());
}
@Test
public void testBuildRequestMultipleTopicsWithReadCommitted() {
ListOffsetsHandler handler =
new ListOffsetsHandler(
offsetTimestampsByPartition, new ListOffsetsOptions(IsolationLevel.READ_COMMITTED), logContext);
ListOffsetsRequest request =
handler.buildBatchedRequest(node.id(), offsetTimestampsByPartition.keySet()).build();
List<ListOffsetsTopic> topics = request.topics();
assertEquals(2, topics.size());
Map<TopicPartition, ListOffsetsPartition> partitions = new HashMap<>();
for (ListOffsetsTopic topic : topics) {
for (ListOffsetsPartition partition : topic.partitions()) {
partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partition);
}
}
assertEquals(4, partitions.size());
for (Map.Entry<TopicPartition, ListOffsetsPartition> entry : partitions.entrySet()) {
assertExpectedTimestamp(entry.getKey(), entry.getValue().timestamp());
}
assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
}
@Test
public void testBuildRequestAllowedVersions() {
ListOffsetsHandler defaultOptionsHandler =
new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext);
ListOffsetsRequest.Builder builder =
defaultOptionsHandler.buildBatchedRequest(node.id(), mkSet(t0p0, t0p1, t1p0));
assertEquals(1, builder.oldestAllowedVersion());
ListOffsetsHandler readCommittedHandler =
new ListOffsetsHandler(
offsetTimestampsByPartition, new ListOffsetsOptions(IsolationLevel.READ_COMMITTED), logContext);
builder = readCommittedHandler.buildBatchedRequest(node.id(), mkSet(t0p0, t0p1, t1p0));
assertEquals(2, builder.oldestAllowedVersion());
builder = readCommittedHandler.buildBatchedRequest(node.id(), mkSet(t0p0, t0p1, t1p0, t1p1));
assertEquals(7, builder.oldestAllowedVersion());
}
@Test
public void testHandleSuccessfulResponse() {
ApiResult<TopicPartition, ListOffsetsResultInfo> result =
handleResponse(createResponse(emptyMap()));
assertResult(result, offsetTimestampsByPartition.keySet(), emptyMap(), emptyList(), emptySet());
}
@Test
public void testHandleRetriablePartitionTimeoutResponse() {
TopicPartition errorPartition = t0p0;
Map<TopicPartition, Short> errorsByPartition = new HashMap<>();
errorsByPartition.put(errorPartition, Errors.REQUEST_TIMED_OUT.code());
ApiResult<TopicPartition, ListOffsetsResultInfo> result =
handleResponse(createResponse(errorsByPartition));
// Timeouts should be retried within the fulfillment stage as they are a common type of
// retriable error.
Set<TopicPartition> retriable = singleton(errorPartition);
Set<TopicPartition> completed = new HashSet<>(offsetTimestampsByPartition.keySet());
completed.removeAll(retriable);
assertResult(result, completed, emptyMap(), emptyList(), retriable);
}
@Test
public void testHandleLookupRetriablePartitionInvalidMetadataResponse() {
TopicPartition errorPartition = t0p0;
Errors error = Errors.NOT_LEADER_OR_FOLLOWER;
Map<TopicPartition, Short> errorsByPartition = new HashMap<>();
errorsByPartition.put(errorPartition, error.code());
ApiResult<TopicPartition, ListOffsetsResultInfo> result =
handleResponse(createResponse(errorsByPartition));
// Some invalid metadata errors should be retried from the lookup stage as the partition-to-leader
// mappings should be recalculated.
List<TopicPartition> unmapped = new ArrayList<>();
unmapped.add(errorPartition);
Set<TopicPartition> completed = new HashSet<>(offsetTimestampsByPartition.keySet());
completed.removeAll(unmapped);
assertResult(result, completed, emptyMap(), unmapped, emptySet());
}
@Test
public void testHandleUnexpectedPartitionErrorResponse() {
TopicPartition errorPartition = t0p0;
Errors error = Errors.UNKNOWN_SERVER_ERROR;
Map<TopicPartition, Short> errorsByPartition = new HashMap<>();
errorsByPartition.put(errorPartition, error.code());
ApiResult<TopicPartition, ListOffsetsResultInfo> result =
handleResponse(createResponse(errorsByPartition));
Map<TopicPartition, Throwable> failed = new HashMap<>();
failed.put(errorPartition, error.exception());
Set<TopicPartition> completed = new HashSet<>(offsetTimestampsByPartition.keySet());
completed.removeAll(failed.keySet());
assertResult(result, completed, failed, emptyList(), emptySet());
}
@Test
public void testHandleResponseSanityCheck() {
TopicPartition errorPartition = t0p0;
Map<TopicPartition, Long> specsByPartition = new HashMap<>(offsetTimestampsByPartition);
specsByPartition.remove(errorPartition);
ApiResult<TopicPartition, ListOffsetsResultInfo> result =
handleResponse(createResponse(emptyMap(), specsByPartition));
assertEquals(offsetTimestampsByPartition.size() - 1, result.completedKeys.size());
assertEquals(1, result.failedKeys.size());
assertEquals(errorPartition, result.failedKeys.keySet().iterator().next());
String sanityCheckMessage = result.failedKeys.get(errorPartition).getMessage();
assertTrue(sanityCheckMessage.contains("did not contain a result for topic partition"));
assertTrue(result.unmappedKeys.isEmpty());
}
@Test
public void testHandleResponseUnsupportedVersion() {
int brokerId = 1;
UnsupportedVersionException uve = new UnsupportedVersionException("");
Map<TopicPartition, OffsetSpec> maxTimestampPartitions = new HashMap<>();
maxTimestampPartitions.put(t1p1, OffsetSpec.maxTimestamp());
ListOffsetsHandler handler =
new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext);
final Map<TopicPartition, Long> nonMaxTimestampPartitions = new HashMap<>(offsetTimestampsByPartition);
maxTimestampPartitions.forEach((k, v) -> nonMaxTimestampPartitions.remove(k));
// Unsupported version exceptions currently cannot be handled if there's no partition with a
// MAX_TIMESTAMP spec...
Set<TopicPartition> keysToTest = nonMaxTimestampPartitions.keySet();
Set<TopicPartition> expectedFailures = keysToTest;
assertEquals(
mapToError(expectedFailures, uve),
handler.handleUnsupportedVersionException(brokerId, uve, keysToTest));
// ...or if there are only partitions with MAX_TIMESTAMP specs.
keysToTest = maxTimestampPartitions.keySet();
expectedFailures = keysToTest;
assertEquals(
mapToError(expectedFailures, uve),
handler.handleUnsupportedVersionException(brokerId, uve, keysToTest));
// What can be handled is a request with a mix of partitions with MAX_TIMESTAMP specs
// and partitions with non-MAX_TIMESTAMP specs.
keysToTest = offsetTimestampsByPartition.keySet();
expectedFailures = maxTimestampPartitions.keySet();
assertEquals(
mapToError(expectedFailures, uve),
handler.handleUnsupportedVersionException(brokerId, uve, keysToTest));
}
private static Map<TopicPartition, Throwable> mapToError(Set<TopicPartition> keys, Throwable t) {
return keys.stream().collect(Collectors.toMap(k -> k, k -> t));
}
private void assertExpectedTimestamp(TopicPartition topicPartition, long actualTimestamp) {
Long expectedTimestamp = offsetTimestampsByPartition.get(topicPartition);
assertEquals(expectedTimestamp, actualTimestamp);
}
private ListOffsetsResponse createResponse(Map<TopicPartition, Short> errorsByPartition) {
return createResponse(errorsByPartition, offsetTimestampsByPartition);
}
private static ListOffsetsResponse createResponse(
Map<TopicPartition, Short> errorsByPartition,
Map<TopicPartition, Long> specsByPartition
) {
Map<String, ListOffsetsTopicResponse> responsesByTopic = new HashMap<>();
for (Map.Entry<TopicPartition, Long> offsetSpecEntry : specsByPartition.entrySet()) {
TopicPartition topicPartition = offsetSpecEntry.getKey();
ListOffsetsTopicResponse topicResponse = responsesByTopic.computeIfAbsent(
topicPartition.topic(), t -> new ListOffsetsTopicResponse());
topicResponse.setName(topicPartition.topic());
ListOffsetsPartitionResponse partitionResponse = new ListOffsetsPartitionResponse();
partitionResponse.setPartitionIndex(topicPartition.partition());
partitionResponse.setOffset(getOffset(topicPartition, offsetSpecEntry.getValue()));
partitionResponse.setErrorCode(errorsByPartition.getOrDefault(topicPartition, (short) 0));
topicResponse.partitions().add(partitionResponse);
}
ListOffsetsResponseData responseData = new ListOffsetsResponseData();
responseData.setTopics(new ArrayList<>(responsesByTopic.values()));
return new ListOffsetsResponse(responseData);
}
private ApiResult<TopicPartition, ListOffsetsResultInfo> handleResponse(ListOffsetsResponse response) {
ListOffsetsHandler handler =
new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext);
return handler.handleResponse(node, offsetTimestampsByPartition.keySet(), response);
}
private void assertResult(
ApiResult<TopicPartition, ListOffsetsResultInfo> result,
Set<TopicPartition> expectedCompleted,
Map<TopicPartition, Throwable> expectedFailed,
List<TopicPartition> expectedUnmapped,
Set<TopicPartition> expectedRetriable
) {
assertEquals(expectedCompleted, result.completedKeys.keySet());
assertEquals(expectedFailed, result.failedKeys);
assertEquals(expectedUnmapped, result.unmappedKeys);
Set<TopicPartition> actualRetriable = new HashSet<>(offsetTimestampsByPartition.keySet());
actualRetriable.removeAll(result.completedKeys.keySet());
actualRetriable.removeAll(result.failedKeys.keySet());
actualRetriable.removeAll(new HashSet<>(result.unmappedKeys));
assertEquals(expectedRetriable, actualRetriable);
}
private static long getOffset(TopicPartition topicPartition, Long offsetQuery) {
long base = 1 << 10;
if (offsetQuery == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
return topicPartition.hashCode() & (base - 1);
} else if (offsetQuery >= 0L) {
return base;
} else if (offsetQuery == ListOffsetsRequest.LATEST_TIMESTAMP) {
return base + 1 + (topicPartition.hashCode() & (base - 1));
}
return 2 * base + 1;
}
}


@ -579,7 +579,9 @@ public class TopicAdminTest {
Set<TopicPartition> tps = Collections.singleton(tp1);
Long offset = null; // response should use error
Cluster cluster = createCluster(1, topicName, 1);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(
new MockTime(), cluster, AdminClientConfig.RETRIES_CONFIG, "0"
)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
env.kafkaClient().prepareResponse(listOffsetsResultWithTimeout(tp1, offset));


@ -1,142 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.admin;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.AdminClientUnitTestEnv;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.internals.MetadataOperationContext;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
import org.apache.kafka.common.requests.MetadataResponse;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class GetListOffsetsCallsBenchmark {
@Param({"1", "10"})
private int topicCount;
@Param({"100", "1000", "10000"})
private int partitionCount;
private KafkaAdminClient admin;
private MetadataOperationContext<ListOffsetsResult.ListOffsetsResultInfo, ListOffsetsOptions> context;
private final Map<TopicPartition, OffsetSpec> topicPartitionOffsets = new HashMap<>();
private final Map<TopicPartition, KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo>> futures = new HashMap<>();
private final int numNodes = 3;
@Setup(Level.Trial)
public void setup() {
MetadataResponseData data = new MetadataResponseData();
List<MetadataResponseTopic> mrTopicList = new ArrayList<>();
Set<String> topics = new HashSet<>();
for (int topicIndex = 0; topicIndex < topicCount; topicIndex++) {
Uuid topicId = Uuid.randomUuid();
String topicName = "topic-" + topicIndex;
MetadataResponseTopic mrTopic = new MetadataResponseTopic()
.setTopicId(topicId)
.setName(topicName)
.setErrorCode((short) 0)
.setIsInternal(false);
List<MetadataResponsePartition> mrPartitionList = new ArrayList<>();
for (int partition = 0; partition < partitionCount; partition++) {
TopicPartition tp = new TopicPartition(topicName, partition);
topics.add(tp.topic());
futures.put(tp, new KafkaFutureImpl<>());
topicPartitionOffsets.put(tp, OffsetSpec.latest());
MetadataResponsePartition mrPartition = new MetadataResponsePartition()
.setLeaderId(partition % numNodes)
.setPartitionIndex(partition)
.setIsrNodes(Arrays.asList(0, 1, 2))
.setReplicaNodes(Arrays.asList(0, 1, 2))
.setOfflineReplicas(Collections.emptyList())
.setErrorCode((short) 0);
mrPartitionList.add(mrPartition);
}
mrTopic.setPartitions(mrPartitionList);
mrTopicList.add(mrTopic);
}
data.setTopics(new MetadataResponseData.MetadataResponseTopicCollection(mrTopicList.listIterator()));
long deadline = 0L;
short version = 0;
context = new MetadataOperationContext<>(topics, new ListOffsetsOptions(), deadline, futures);
context.setResponse(Optional.of(new MetadataResponse(data, version)));
AdminClientUnitTestEnv adminEnv = new AdminClientUnitTestEnv(mockCluster());
admin = (KafkaAdminClient) adminEnv.adminClient();
}
@Benchmark
public Object testGetListOffsetsCalls() {
return AdminClientTestUtils.getListOffsetsCalls(admin, context, topicPartitionOffsets, futures);
}
private Cluster mockCluster() {
final int controllerIndex = 0;
HashMap<Integer, Node> nodes = new HashMap<>();
for (int i = 0; i < numNodes; i++)
nodes.put(i, new Node(i, "localhost", 8121 + i));
return new Cluster("mockClusterId", nodes.values(),
Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), nodes.get(controllerIndex));
}
}