KAFKA-9627: Replace ListOffset request/response with automated protocol (#8295)

Reviewers: Boyang Chen <reluctanthero104@gmail.com>, David Jacot <djacot@confluent.io>

Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
This commit is contained in:
Mickael Maison 2020-09-24 15:53:59 +02:00 committed by GitHub
parent b36e3c22ab
commit 785de1e3d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 1123 additions and 912 deletions

View File

@ -127,6 +127,10 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
@ -210,7 +214,6 @@ import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData;
import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest;
import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
@ -250,7 +253,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@ -2048,7 +2050,7 @@ public class KafkaAdminClient extends AdminClient {
return new DescribeConfigsResult(new HashMap<>(brokerFutures.entrySet().stream()
.flatMap(x -> x.getValue().entrySet().stream())
.collect(Collectors.toMap(Entry::getKey, Entry::getValue))));
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
}
private Config describeConfigResult(DescribeConfigsResponseData.DescribeConfigsResult describeConfigsResult) {
@ -2314,7 +2316,7 @@ public class KafkaAdminClient extends AdminClient {
completeAllExceptionally(
futures.entrySet().stream()
.filter(entry -> entry.getKey().brokerId() == brokerId)
.map(Entry::getValue),
.map(Map.Entry::getValue),
throwable);
}
}, now);
@ -2479,7 +2481,7 @@ public class KafkaAdminClient extends AdminClient {
final CreatePartitionsOptions options) {
final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(newPartitions.size());
final CreatePartitionsTopicCollection topics = new CreatePartitionsTopicCollection(newPartitions.size());
for (Entry<String, NewPartitions> entry : newPartitions.entrySet()) {
for (Map.Entry<String, NewPartitions> entry : newPartitions.entrySet()) {
final String topic = entry.getKey();
final NewPartitions newPartition = entry.getValue();
List<List<Integer>> newAssignments = newPartition.assignments();
@ -3941,7 +3943,7 @@ public class KafkaAdminClient extends AdminClient {
MetadataResponse mr = context.response().orElseThrow(() -> new IllegalStateException("No Metadata response"));
List<Call> calls = new ArrayList<>();
// grouping topic partitions per leader
Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> leaders = new HashMap<>();
Map<Node, Map<String, ListOffsetTopic>> leaders = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetSpec> entry: topicPartitionOffsets.entrySet()) {
@ -3957,8 +3959,9 @@ public class KafkaAdminClient extends AdminClient {
if (!mr.errors().containsKey(tp.topic())) {
Node node = mr.cluster().leaderFor(tp);
if (node != null) {
Map<TopicPartition, ListOffsetRequest.PartitionData> leadersOnNode = leaders.computeIfAbsent(node, k -> new HashMap<>());
leadersOnNode.put(tp, new ListOffsetRequest.PartitionData(offsetQuery, Optional.empty()));
Map<String, ListOffsetTopic> leadersOnNode = leaders.computeIfAbsent(node, k -> new HashMap<String, ListOffsetTopic>());
ListOffsetTopic topic = leadersOnNode.computeIfAbsent(tp.topic(), k -> new ListOffsetTopic().setName(tp.topic()));
topic.partitions().add(new ListOffsetPartition().setPartitionIndex(tp.partition()).setTimestamp(offsetQuery));
} else {
future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception());
}
@ -3967,12 +3970,13 @@ public class KafkaAdminClient extends AdminClient {
}
}
for (final Map.Entry<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> entry: leaders.entrySet()) {
for (final Map.Entry<Node, Map<String, ListOffsetTopic>> entry : leaders.entrySet()) {
final int brokerId = entry.getKey().id();
final Map<TopicPartition, ListOffsetRequest.PartitionData> partitionsToQuery = entry.getValue();
calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
final List<ListOffsetTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
@Override
ListOffsetRequest.Builder createRequest(int timeoutMs) {
return ListOffsetRequest.Builder
@ -3985,25 +3989,38 @@ public class KafkaAdminClient extends AdminClient {
ListOffsetResponse response = (ListOffsetResponse) abstractResponse;
Map<TopicPartition, OffsetSpec> retryTopicPartitionOffsets = new HashMap<>();
for (Entry<TopicPartition, PartitionData> result : response.responseData().entrySet()) {
TopicPartition tp = result.getKey();
PartitionData partitionData = result.getValue();
KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
Errors error = partitionData.error;
OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp);
if (offsetRequestSpec == null) {
future.completeExceptionally(new KafkaException("Unexpected topic partition " + tp + " in broker response!"));
} else if (MetadataOperationContext.shouldRefreshMetadata(error)) {
retryTopicPartitionOffsets.put(tp, offsetRequestSpec);
} else if (error == Errors.NONE) {
future.complete(new ListOffsetsResultInfo(partitionData.offset, partitionData.timestamp, partitionData.leaderEpoch));
} else {
future.completeExceptionally(error.exception());
for (ListOffsetTopicResponse topic : response.topics()) {
for (ListOffsetPartitionResponse partition : topic.partitions()) {
TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
Errors error = Errors.forCode(partition.errorCode());
OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp);
if (offsetRequestSpec == null) {
log.warn("Server response mentioned unknown topic partition {}", tp);
} else if (MetadataOperationContext.shouldRefreshMetadata(error)) {
retryTopicPartitionOffsets.put(tp, offsetRequestSpec);
} else if (error == Errors.NONE) {
Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetResponse.UNKNOWN_EPOCH)
? Optional.empty()
: Optional.of(partition.leaderEpoch());
future.complete(new ListOffsetsResultInfo(partition.offset(), partition.timestamp(), leaderEpoch));
} else {
future.completeExceptionally(error.exception());
}
}
}
if (!retryTopicPartitionOffsets.isEmpty()) {
if (retryTopicPartitionOffsets.isEmpty()) {
// The server should send back a response for every topic partition. But do a sanity check anyway.
for (ListOffsetTopic topic : partitionsToQuery) {
for (ListOffsetPartition partition : topic.partitions()) {
TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
ApiException error = new ApiException("The response from broker " + brokerId +
" did not contain a result for topic partition " + tp);
futures.get(tp).completeExceptionally(error);
}
}
} else {
Set<String> retryTopics = retryTopicPartitionOffsets.keySet().stream().map(
TopicPartition::topic).collect(Collectors.toSet());
MetadataOperationContext<ListOffsetsResultInfo, ListOffsetsOptions> retryContext =
@ -4014,9 +4031,12 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleFailure(Throwable throwable) {
for (TopicPartition tp : entry.getValue().keySet()) {
KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
future.completeExceptionally(throwable);
for (ListOffsetTopic topic : entry.getValue().values()) {
for (ListOffsetPartition partition : topic.partitions()) {
TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
future.completeExceptionally(throwable);
}
}
}
});

View File

@ -48,6 +48,9 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
@ -615,7 +618,7 @@ public class Fetcher<K, V> implements Closeable {
// The first condition ensures that the completedFetches is not stuck with the same completedFetch
// in cases such as the TopicAuthorizationException, and the second condition ensures that no
// potential data loss due to an exception in a following record.
FetchResponse.PartitionData partition = records.partitionData;
FetchResponse.PartitionData<Records> partition = records.partitionData;
if (fetched.isEmpty() && (partition.records() == null || partition.records().sizeInBytes() == 0)) {
completedFetches.poll();
}
@ -730,11 +733,11 @@ public class Fetcher<K, V> implements Closeable {
}
private void resetOffsetsAsync(Map<TopicPartition, Long> partitionResetTimestamps) {
Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> timestampsToSearchByNode =
Map<Node, Map<TopicPartition, ListOffsetPartition>> timestampsToSearchByNode =
groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());
for (Map.Entry<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> entry : timestampsToSearchByNode.entrySet()) {
for (Map.Entry<Node, Map<TopicPartition, ListOffsetPartition>> entry : timestampsToSearchByNode.entrySet()) {
Node node = entry.getKey();
final Map<TopicPartition, ListOffsetRequest.PartitionData> resetTimestamps = entry.getValue();
final Map<TopicPartition, ListOffsetPartition> resetTimestamps = entry.getValue();
subscriptions.setNextAllowedRetry(resetTimestamps.keySet(), time.milliseconds() + requestTimeoutMs);
RequestFuture<ListOffsetResult> future = sendListOffsetRequest(node, resetTimestamps, false);
@ -749,8 +752,8 @@ public class Fetcher<K, V> implements Closeable {
for (Map.Entry<TopicPartition, ListOffsetData> fetchedOffset : result.fetchedOffsets.entrySet()) {
TopicPartition partition = fetchedOffset.getKey();
ListOffsetData offsetData = fetchedOffset.getValue();
ListOffsetRequest.PartitionData requestedReset = resetTimestamps.get(partition);
resetOffsetIfNeeded(partition, timestampToOffsetResetStrategy(requestedReset.timestamp), offsetData);
ListOffsetPartition requestedReset = resetTimestamps.get(partition);
resetOffsetIfNeeded(partition, timestampToOffsetResetStrategy(requestedReset.timestamp()), offsetData);
}
}
@ -881,7 +884,7 @@ public class Fetcher<K, V> implements Closeable {
private RequestFuture<ListOffsetResult> sendListOffsetsRequests(final Map<TopicPartition, Long> timestampsToSearch,
final boolean requireTimestamps) {
final Set<TopicPartition> partitionsToRetry = new HashSet<>();
Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> timestampsToSearchByNode =
Map<Node, Map<TopicPartition, ListOffsetPartition>> timestampsToSearchByNode =
groupListOffsetRequests(timestampsToSearch, partitionsToRetry);
if (timestampsToSearchByNode.isEmpty())
return RequestFuture.failure(new StaleMetadataException());
@ -890,7 +893,7 @@ public class Fetcher<K, V> implements Closeable {
final Map<TopicPartition, ListOffsetData> fetchedTimestampOffsets = new HashMap<>();
final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());
for (Map.Entry<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> entry : timestampsToSearchByNode.entrySet()) {
for (Map.Entry<Node, Map<TopicPartition, ListOffsetPartition>> entry : timestampsToSearchByNode.entrySet()) {
RequestFuture<ListOffsetResult> future =
sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps);
future.addListener(new RequestFutureListener<ListOffsetResult>() {
@ -927,10 +930,10 @@ public class Fetcher<K, V> implements Closeable {
* @param partitionsToRetry A set of topic partitions that will be extended with partitions
* that need metadata update or re-connect to the leader.
*/
private Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> groupListOffsetRequests(
private Map<Node, Map<TopicPartition, ListOffsetPartition>> groupListOffsetRequests(
Map<TopicPartition, Long> timestampsToSearch,
Set<TopicPartition> partitionsToRetry) {
final Map<TopicPartition, ListOffsetRequest.PartitionData> partitionDataMap = new HashMap<>();
final Map<TopicPartition, ListOffsetPartition> partitionDataMap = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
TopicPartition tp = entry.getKey();
Long offset = entry.getValue();
@ -952,7 +955,11 @@ public class Fetcher<K, V> implements Closeable {
leader, tp);
partitionsToRetry.add(tp);
} else {
partitionDataMap.put(tp, new ListOffsetRequest.PartitionData(offset, leaderAndEpoch.epoch));
int currentLeaderEpoch = leaderAndEpoch.epoch.orElse(ListOffsetResponse.UNKNOWN_EPOCH);
partitionDataMap.put(tp, new ListOffsetPartition()
.setPartitionIndex(tp.partition())
.setTimestamp(offset)
.setCurrentLeaderEpoch(currentLeaderEpoch));
}
}
}
@ -968,11 +975,11 @@ public class Fetcher<K, V> implements Closeable {
* @return A response which can be polled to obtain the corresponding timestamps and offsets.
*/
private RequestFuture<ListOffsetResult> sendListOffsetRequest(final Node node,
final Map<TopicPartition, ListOffsetRequest.PartitionData> timestampsToSearch,
final Map<TopicPartition, ListOffsetPartition> timestampsToSearch,
boolean requireTimestamp) {
ListOffsetRequest.Builder builder = ListOffsetRequest.Builder
.forConsumer(requireTimestamp, isolationLevel)
.setTargetTimes(timestampsToSearch);
.setTargetTimes(ListOffsetRequest.toListOffsetTopics(timestampsToSearch));
log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);
return client.send(node, builder)
@ -981,14 +988,13 @@ public class Fetcher<K, V> implements Closeable {
public void onSuccess(ClientResponse response, RequestFuture<ListOffsetResult> future) {
ListOffsetResponse lor = (ListOffsetResponse) response.responseBody();
log.trace("Received ListOffsetResponse {} from broker {}", lor, node);
handleListOffsetResponse(timestampsToSearch, lor, future);
handleListOffsetResponse(lor, future);
}
});
}
/**
* Callback for the response of the list offset call above.
* @param timestampsToSearch The mapping from partitions to target timestamps
* @param listOffsetResponse The response from the server.
* @param future The future to be completed when the response returns. Note that any partition-level errors will
* generally fail the entire future result. The one exception is UNSUPPORTED_FOR_MESSAGE_FORMAT,
@ -997,77 +1003,78 @@ public class Fetcher<K, V> implements Closeable {
* value of each partition may be null only for v0. In v1 and later the ListOffset API would not
* return a null timestamp (-1 is returned instead when necessary).
*/
private void handleListOffsetResponse(Map<TopicPartition, ListOffsetRequest.PartitionData> timestampsToSearch,
ListOffsetResponse listOffsetResponse,
private void handleListOffsetResponse(ListOffsetResponse listOffsetResponse,
RequestFuture<ListOffsetResult> future) {
Map<TopicPartition, ListOffsetData> fetchedOffsets = new HashMap<>();
Set<TopicPartition> partitionsToRetry = new HashSet<>();
Set<String> unauthorizedTopics = new HashSet<>();
for (Map.Entry<TopicPartition, ListOffsetRequest.PartitionData> entry : timestampsToSearch.entrySet()) {
TopicPartition topicPartition = entry.getKey();
ListOffsetResponse.PartitionData partitionData = listOffsetResponse.responseData().get(topicPartition);
Errors error = partitionData.error;
switch (error) {
case NONE:
if (partitionData.offsets != null) {
// Handle v0 response
long offset;
if (partitionData.offsets.size() > 1) {
future.raise(new IllegalStateException("Unexpected partitionData response of length " +
partitionData.offsets.size()));
return;
} else if (partitionData.offsets.isEmpty()) {
offset = ListOffsetResponse.UNKNOWN_OFFSET;
for (ListOffsetTopicResponse topic : listOffsetResponse.topics()) {
for (ListOffsetPartitionResponse partition : topic.partitions()) {
TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
Errors error = Errors.forCode(partition.errorCode());
switch (error) {
case NONE:
if (!partition.oldStyleOffsets().isEmpty()) {
// Handle v0 response with offsets
long offset;
if (partition.oldStyleOffsets().size() > 1) {
future.raise(new IllegalStateException("Unexpected partitionData response of length " +
partition.oldStyleOffsets().size()));
return;
} else {
offset = partition.oldStyleOffsets().get(0);
}
log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}",
topicPartition, offset);
if (offset != ListOffsetResponse.UNKNOWN_OFFSET) {
ListOffsetData offsetData = new ListOffsetData(offset, null, Optional.empty());
fetchedOffsets.put(topicPartition, offsetData);
}
} else {
offset = partitionData.offsets.get(0);
// Handle v1 and later response or v0 without offsets
log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
topicPartition, partition.offset(), partition.timestamp());
if (partition.offset() != ListOffsetResponse.UNKNOWN_OFFSET) {
Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetResponse.UNKNOWN_EPOCH)
? Optional.empty()
: Optional.of(partition.leaderEpoch());
ListOffsetData offsetData = new ListOffsetData(partition.offset(), partition.timestamp(),
leaderEpoch);
fetchedOffsets.put(topicPartition, offsetData);
}
}
log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}",
topicPartition, offset);
if (offset != ListOffsetResponse.UNKNOWN_OFFSET) {
ListOffsetData offsetData = new ListOffsetData(offset, null, Optional.empty());
fetchedOffsets.put(topicPartition, offsetData);
}
} else {
// Handle v1 and later response
log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
topicPartition, partitionData.offset, partitionData.timestamp);
if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
ListOffsetData offsetData = new ListOffsetData(partitionData.offset, partitionData.timestamp,
partitionData.leaderEpoch);
fetchedOffsets.put(topicPartition, offsetData);
}
}
break;
case UNSUPPORTED_FOR_MESSAGE_FORMAT:
// The message format on the broker side is before 0.10.0, which means it does not
// support timestamps. We treat this case the same as if we weren't able to find an
// offset corresponding to the requested timestamp and leave it out of the result.
log.debug("Cannot search by timestamp for partition {} because the message format version " +
"is before 0.10.0", topicPartition);
break;
case NOT_LEADER_OR_FOLLOWER:
case REPLICA_NOT_AVAILABLE:
case KAFKA_STORAGE_ERROR:
case OFFSET_NOT_AVAILABLE:
case LEADER_NOT_AVAILABLE:
case FENCED_LEADER_EPOCH:
case UNKNOWN_LEADER_EPOCH:
log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
topicPartition, error);
partitionsToRetry.add(topicPartition);
break;
case UNKNOWN_TOPIC_OR_PARTITION:
log.warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition);
partitionsToRetry.add(topicPartition);
break;
case TOPIC_AUTHORIZATION_FAILED:
unauthorizedTopics.add(topicPartition.topic());
break;
default:
log.warn("Attempt to fetch offsets for partition {} failed due to unexpected exception: {}, retrying.",
topicPartition, error.message());
partitionsToRetry.add(topicPartition);
break;
case UNSUPPORTED_FOR_MESSAGE_FORMAT:
// The message format on the broker side is before 0.10.0, which means it does not
// support timestamps. We treat this case the same as if we weren't able to find an
// offset corresponding to the requested timestamp and leave it out of the result.
log.debug("Cannot search by timestamp for partition {} because the message format version " +
"is before 0.10.0", topicPartition);
break;
case NOT_LEADER_OR_FOLLOWER:
case REPLICA_NOT_AVAILABLE:
case KAFKA_STORAGE_ERROR:
case OFFSET_NOT_AVAILABLE:
case LEADER_NOT_AVAILABLE:
case FENCED_LEADER_EPOCH:
case UNKNOWN_LEADER_EPOCH:
log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
topicPartition, error);
partitionsToRetry.add(topicPartition);
break;
case UNKNOWN_TOPIC_OR_PARTITION:
log.warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition);
partitionsToRetry.add(topicPartition);
break;
case TOPIC_AUTHORIZATION_FAILED:
unauthorizedTopics.add(topicPartition.topic());
break;
default:
log.warn("Attempt to fetch offsets for partition {} failed due to unexpected exception: {}, retrying.",
topicPartition, error.message());
partitionsToRetry.add(topicPartition);
}
}
}

View File

@ -95,6 +95,8 @@ import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListOffsetRequestData;
import org.apache.kafka.common.message.ListOffsetResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.MetadataRequestData;
@ -128,8 +130,6 @@ import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.requests.ProduceRequest;
@ -153,7 +153,7 @@ import static org.apache.kafka.common.protocol.types.Type.RECORDS;
public enum ApiKeys {
PRODUCE(0, "Produce", ProduceRequest.schemaVersions(), ProduceResponse.schemaVersions()),
FETCH(1, "Fetch", FetchRequestData.SCHEMAS, FetchResponseData.SCHEMAS),
LIST_OFFSETS(2, "ListOffsets", ListOffsetRequest.schemaVersions(), ListOffsetResponse.schemaVersions()),
LIST_OFFSETS(2, "ListOffsets", ListOffsetRequestData.SCHEMAS, ListOffsetResponseData.SCHEMAS),
METADATA(3, "Metadata", MetadataRequestData.SCHEMAS, MetadataResponseData.SCHEMAS),
LEADER_AND_ISR(4, "LeaderAndIsr", true, LeaderAndIsrRequestData.SCHEMAS, LeaderAndIsrResponseData.SCHEMAS),
STOP_REPLICA(5, "StopReplica", true, StopReplicaRequestData.SCHEMAS, StopReplicaResponseData.SCHEMAS),

View File

@ -92,7 +92,7 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
case FETCH:
return new FetchResponse<>(new FetchResponseData(struct, version));
case LIST_OFFSETS:
return new ListOffsetResponse(struct);
return new ListOffsetResponse(struct, version);
case METADATA:
return new MetadataResponse(struct, version);
case OFFSET_COMMIT:

View File

@ -16,15 +16,6 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@ -32,13 +23,19 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import static org.apache.kafka.common.protocol.CommonFields.CURRENT_LEADER_EPOCH;
import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ListOffsetRequestData;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic;
import org.apache.kafka.common.message.ListOffsetResponseData;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
public class ListOffsetRequest extends AbstractRequest {
public static final long EARLIEST_TIMESTAMP = -2L;
@ -47,96 +44,11 @@ public class ListOffsetRequest extends AbstractRequest {
public static final int CONSUMER_REPLICA_ID = -1;
public static final int DEBUGGING_REPLICA_ID = -2;
// top level fields
private static final Field.Int32 REPLICA_ID = new Field.Int32("replica_id",
"Broker id of the follower. For normal consumers, use -1.");
private static final Field.Int8 ISOLATION_LEVEL = new Field.Int8("isolation_level",
"This setting controls the visibility of transactional records. " +
"Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED " +
"(isolation_level = 1), non-transactional and COMMITTED transactional records are visible. " +
"To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current " +
"LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the " +
"result, which allows consumers to discard ABORTED transactional records");
private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics",
"Topics to list offsets.");
// topic level fields
private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions",
"Partitions to list offsets.");
// partition level fields
private static final Field.Int64 TIMESTAMP = new Field.Int64("timestamp",
"The target timestamp for the partition.");
private static final Field.Int32 MAX_NUM_OFFSETS = new Field.Int32("max_num_offsets",
"Maximum offsets to return.");
private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
PARTITION_ID,
TIMESTAMP,
MAX_NUM_OFFSETS);
private static final Field TOPICS_V0 = TOPICS.withFields(
TOPIC_NAME,
PARTITIONS_V0);
private static final Schema LIST_OFFSET_REQUEST_V0 = new Schema(
REPLICA_ID,
TOPICS_V0);
// V1 removes max_num_offsets
private static final Field PARTITIONS_V1 = PARTITIONS.withFields(
PARTITION_ID,
TIMESTAMP);
private static final Field TOPICS_V1 = TOPICS.withFields(
TOPIC_NAME,
PARTITIONS_V1);
private static final Schema LIST_OFFSET_REQUEST_V1 = new Schema(
REPLICA_ID,
TOPICS_V1);
// V2 adds a field for the isolation level
private static final Schema LIST_OFFSET_REQUEST_V2 = new Schema(
REPLICA_ID,
ISOLATION_LEVEL,
TOPICS_V1);
// V3 bump used to indicate that on quota violation brokers send out responses before throttling.
private static final Schema LIST_OFFSET_REQUEST_V3 = LIST_OFFSET_REQUEST_V2;
// V4 introduces the current leader epoch, which is used for fencing
private static final Field PARTITIONS_V4 = PARTITIONS.withFields(
PARTITION_ID,
CURRENT_LEADER_EPOCH,
TIMESTAMP);
private static final Field TOPICS_V4 = TOPICS.withFields(
TOPIC_NAME,
PARTITIONS_V4);
private static final Schema LIST_OFFSET_REQUEST_V4 = new Schema(
REPLICA_ID,
ISOLATION_LEVEL,
TOPICS_V4);
// V5 bump to include new possible error code (OFFSET_NOT_AVAILABLE)
private static final Schema LIST_OFFSET_REQUEST_V5 = LIST_OFFSET_REQUEST_V4;
public static Schema[] schemaVersions() {
return new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2,
LIST_OFFSET_REQUEST_V3, LIST_OFFSET_REQUEST_V4, LIST_OFFSET_REQUEST_V5};
}
private final int replicaId;
private final IsolationLevel isolationLevel;
private final Map<TopicPartition, PartitionData> partitionTimestamps;
private final ListOffsetRequestData data;
private final Set<TopicPartition> duplicatePartitions;
public static class Builder extends AbstractRequest.Builder<ListOffsetRequest> {
private final int replicaId;
private final IsolationLevel isolationLevel;
private Map<TopicPartition, PartitionData> partitionTimestamps = new HashMap<>();
private final ListOffsetRequestData data;
public static Builder forReplica(short allowedVersion, int replicaId) {
return new Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
@ -156,145 +68,95 @@ public class ListOffsetRequest extends AbstractRequest {
int replicaId,
IsolationLevel isolationLevel) {
super(ApiKeys.LIST_OFFSETS, oldestAllowedVersion, latestAllowedVersion);
this.replicaId = replicaId;
this.isolationLevel = isolationLevel;
data = new ListOffsetRequestData()
.setIsolationLevel(isolationLevel.id())
.setReplicaId(replicaId);
}
public Builder setTargetTimes(Map<TopicPartition, PartitionData> partitionTimestamps) {
this.partitionTimestamps = partitionTimestamps;
public Builder setTargetTimes(List<ListOffsetTopic> topics) {
data.setTopics(topics);
return this;
}
@Override
public ListOffsetRequest build(short version) {
return new ListOffsetRequest(replicaId, partitionTimestamps, isolationLevel, version);
return new ListOffsetRequest(version, data);
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(type=ListOffsetRequest")
.append(", replicaId=").append(replicaId);
if (partitionTimestamps != null) {
bld.append(", partitionTimestamps=").append(partitionTimestamps);
}
bld.append(", isolationLevel=").append(isolationLevel);
bld.append(")");
return bld.toString();
}
}
public static final class PartitionData {
public final long timestamp;
public final int maxNumOffsets; // only supported in v0
public final Optional<Integer> currentLeaderEpoch;
private PartitionData(long timestamp, int maxNumOffsets, Optional<Integer> currentLeaderEpoch) {
this.timestamp = timestamp;
this.maxNumOffsets = maxNumOffsets;
this.currentLeaderEpoch = currentLeaderEpoch;
}
// For V0
public PartitionData(long timestamp, int maxNumOffsets) {
this(timestamp, maxNumOffsets, Optional.empty());
}
public PartitionData(long timestamp, Optional<Integer> currentLeaderEpoch) {
this(timestamp, 1, currentLeaderEpoch);
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof PartitionData)) return false;
PartitionData other = (PartitionData) obj;
return this.timestamp == other.timestamp &&
this.currentLeaderEpoch.equals(other.currentLeaderEpoch);
}
@Override
public int hashCode() {
return Objects.hash(timestamp, currentLeaderEpoch);
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("{timestamp: ").append(timestamp).
append(", maxNumOffsets: ").append(maxNumOffsets).
append(", currentLeaderEpoch: ").append(currentLeaderEpoch).
append("}");
return bld.toString();
return data.toString();
}
}
/**
* Private constructor with a specified version.
*/
private ListOffsetRequest(int replicaId,
Map<TopicPartition, PartitionData> targetTimes,
IsolationLevel isolationLevel,
short version) {
private ListOffsetRequest(short version, ListOffsetRequestData data) {
super(ApiKeys.LIST_OFFSETS, version);
this.replicaId = replicaId;
this.isolationLevel = isolationLevel;
this.partitionTimestamps = targetTimes;
this.data = data;
this.duplicatePartitions = Collections.emptySet();
}
public ListOffsetRequest(Struct struct, short version) {
super(ApiKeys.LIST_OFFSETS, version);
Set<TopicPartition> duplicatePartitions = new HashSet<>();
replicaId = struct.get(REPLICA_ID);
isolationLevel = struct.hasField(ISOLATION_LEVEL) ?
IsolationLevel.forId(struct.get(ISOLATION_LEVEL)) :
IsolationLevel.READ_UNCOMMITTED;
partitionTimestamps = new HashMap<>();
for (Object topicResponseObj : struct.get(TOPICS)) {
Struct topicResponse = (Struct) topicResponseObj;
String topic = topicResponse.get(TOPIC_NAME);
for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) {
Struct partitionResponse = (Struct) partitionResponseObj;
int partition = partitionResponse.get(PARTITION_ID);
long timestamp = partitionResponse.get(TIMESTAMP);
TopicPartition tp = new TopicPartition(topic, partition);
int maxNumOffsets = partitionResponse.getOrElse(MAX_NUM_OFFSETS, 1);
Optional<Integer> currentLeaderEpoch = RequestUtils.getLeaderEpoch(partitionResponse, CURRENT_LEADER_EPOCH);
PartitionData partitionData = new PartitionData(timestamp, maxNumOffsets, currentLeaderEpoch);
if (partitionTimestamps.put(tp, partitionData) != null)
data = new ListOffsetRequestData(struct, version);
duplicatePartitions = new HashSet<>();
Set<TopicPartition> partitions = new HashSet<>();
for (ListOffsetTopic topic : data.topics()) {
for (ListOffsetPartition partition : topic.partitions()) {
TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
if (!partitions.add(tp)) {
duplicatePartitions.add(tp);
}
}
}
this.duplicatePartitions = duplicatePartitions;
}
@Override
@SuppressWarnings("deprecation")
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
short versionId = version();
short errorCode = Errors.forException(e).code();
ListOffsetResponse.PartitionData partitionError = versionId == 0 ?
new ListOffsetResponse.PartitionData(Errors.forException(e), Collections.emptyList()) :
new ListOffsetResponse.PartitionData(Errors.forException(e), -1L, -1L, Optional.empty());
for (TopicPartition partition : partitionTimestamps.keySet()) {
responseData.put(partition, partitionError);
List<ListOffsetTopicResponse> responses = new ArrayList<>();
for (ListOffsetTopic topic : data.topics()) {
ListOffsetTopicResponse topicResponse = new ListOffsetTopicResponse().setName(topic.name());
List<ListOffsetPartitionResponse> partitions = new ArrayList<>();
for (ListOffsetPartition partition : topic.partitions()) {
ListOffsetPartitionResponse partitionResponse = new ListOffsetPartitionResponse()
.setErrorCode(errorCode)
.setPartitionIndex(partition.partitionIndex());
if (versionId == 0) {
partitionResponse.setOldStyleOffsets(Collections.emptyList());
} else {
partitionResponse.setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
.setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP);
}
partitions.add(partitionResponse);
}
topicResponse.setPartitions(partitions);
responses.add(topicResponse);
}
ListOffsetResponseData responseData = new ListOffsetResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setTopics(responses);
return new ListOffsetResponse(responseData);
}
return new ListOffsetResponse(throttleTimeMs, responseData);
public ListOffsetRequestData data() {
return data;
}
public int replicaId() {
return replicaId;
return data.replicaId();
}
public IsolationLevel isolationLevel() {
return isolationLevel;
return IsolationLevel.forId(data.isolationLevel());
}
public Map<TopicPartition, PartitionData> partitionTimestamps() {
return partitionTimestamps;
public List<ListOffsetTopic> topics() {
return data.topics();
}
public Set<TopicPartition> duplicatePartitions() {
@ -307,32 +169,25 @@ public class ListOffsetRequest extends AbstractRequest {
@Override
protected Struct toStruct() {
short version = version();
Struct struct = new Struct(ApiKeys.LIST_OFFSETS.requestSchema(version));
Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupPartitionDataByTopic(partitionTimestamps);
return data.toStruct(version());
}
struct.set(REPLICA_ID, replicaId);
struct.setIfExists(ISOLATION_LEVEL, isolationLevel.id());
List<Struct> topicArray = new ArrayList<>();
for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
Struct topicData = struct.instance(TOPICS);
topicData.set(TOPIC_NAME, topicEntry.getKey());
List<Struct> partitionArray = new ArrayList<>();
for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
PartitionData offsetPartitionData = partitionEntry.getValue();
Struct partitionData = topicData.instance(PARTITIONS);
partitionData.set(PARTITION_ID, partitionEntry.getKey());
partitionData.set(TIMESTAMP, offsetPartitionData.timestamp);
partitionData.setIfExists(MAX_NUM_OFFSETS, offsetPartitionData.maxNumOffsets);
RequestUtils.setLeaderEpochIfExists(partitionData, CURRENT_LEADER_EPOCH,
offsetPartitionData.currentLeaderEpoch);
partitionArray.add(partitionData);
}
topicData.set(PARTITIONS, partitionArray.toArray());
topicArray.add(topicData);
public static List<ListOffsetTopic> toListOffsetTopics(Map<TopicPartition, ListOffsetPartition> timestampsToSearch) {
Map<String, ListOffsetTopic> topics = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetPartition> entry : timestampsToSearch.entrySet()) {
TopicPartition tp = entry.getKey();
ListOffsetTopic topic = topics.computeIfAbsent(tp.topic(), k -> new ListOffsetTopic().setName(tp.topic()));
topic.partitions().add(entry.getValue());
}
struct.set(TOPICS, topicArray.toArray());
return struct;
return new ArrayList<>(topics.values());
}
public static ListOffsetTopic singletonRequestData(String topic, int partitionIndex, long timestamp, int maxNumOffsets) {
return new ListOffsetTopic()
.setName(topic)
.setPartitions(Collections.singletonList(new ListOffsetPartition()
.setPartitionIndex(partitionIndex)
.setTimestamp(timestamp)
.setMaxNumOffsets(maxNumOffsets)));
}
}

View File

@ -16,28 +16,20 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.LEADER_EPOCH;
import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
import static org.apache.kafka.common.protocol.types.Type.INT64;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ListOffsetResponseData;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.RecordBatch;
/**
* Possible error codes:
@ -58,243 +50,69 @@ import static org.apache.kafka.common.protocol.types.Type.INT64;
public class ListOffsetResponse extends AbstractResponse {
public static final long UNKNOWN_TIMESTAMP = -1L;
public static final long UNKNOWN_OFFSET = -1L;
public static final int UNKNOWN_EPOCH = RecordBatch.NO_PARTITION_LEADER_EPOCH;
// top level fields
private static final Field.ComplexArray TOPICS = new Field.ComplexArray("responses",
"The listed offsets by topic");
private final ListOffsetResponseData data;
// topic level fields
private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partition_responses",
"The listed offsets by partition");
// partition level fields
// This key is only used by ListOffsetResponse v0
@Deprecated
private static final Field.Array OFFSETS = new Field.Array("offsets", INT64, "A list of offsets.");
private static final Field.Int64 TIMESTAMP = new Field.Int64("timestamp",
"The timestamp associated with the returned offset");
private static final Field.Int64 OFFSET = new Field.Int64("offset",
"The offset found");
private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
PARTITION_ID,
ERROR_CODE,
OFFSETS);
private static final Field TOPICS_V0 = TOPICS.withFields(
TOPIC_NAME,
PARTITIONS_V0);
private static final Schema LIST_OFFSET_RESPONSE_V0 = new Schema(
TOPICS_V0);
// V1 bumped for the removal of the offsets array
private static final Field PARTITIONS_V1 = PARTITIONS.withFields(
PARTITION_ID,
ERROR_CODE,
TIMESTAMP,
OFFSET);
private static final Field TOPICS_V1 = TOPICS.withFields(
TOPIC_NAME,
PARTITIONS_V1);
private static final Schema LIST_OFFSET_RESPONSE_V1 = new Schema(
TOPICS_V1);
// V2 bumped for the addition of the throttle time
private static final Schema LIST_OFFSET_RESPONSE_V2 = new Schema(
THROTTLE_TIME_MS,
TOPICS_V1);
// V3 bumped to indicate that on quota violation brokers send out responses before throttling.
private static final Schema LIST_OFFSET_RESPONSE_V3 = LIST_OFFSET_RESPONSE_V2;
// V4 bumped for the addition of the current leader epoch in the request schema and the
// leader epoch in the response partition data
private static final Field PARTITIONS_V4 = PARTITIONS.withFields(
PARTITION_ID,
ERROR_CODE,
TIMESTAMP,
OFFSET,
LEADER_EPOCH);
private static final Field TOPICS_V4 = TOPICS.withFields(
TOPIC_NAME,
PARTITIONS_V4);
private static final Schema LIST_OFFSET_RESPONSE_V4 = new Schema(
THROTTLE_TIME_MS,
TOPICS_V4);
private static final Schema LIST_OFFSET_RESPONSE_V5 = LIST_OFFSET_RESPONSE_V4;
public static Schema[] schemaVersions() {
return new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2,
LIST_OFFSET_RESPONSE_V3, LIST_OFFSET_RESPONSE_V4, LIST_OFFSET_RESPONSE_V5};
public ListOffsetResponse(ListOffsetResponseData data) {
this.data = data;
}
public static final class PartitionData {
public final Errors error;
// The offsets list is only used in ListOffsetResponse v0.
public final List<Long> offsets;
public final Long timestamp;
public final Long offset;
public final Optional<Integer> leaderEpoch;
/**
* Constructor for ListOffsetResponse v0
*/
public PartitionData(Errors error, List<Long> offsets) {
this.error = error;
this.offsets = offsets;
this.timestamp = null;
this.offset = null;
this.leaderEpoch = Optional.empty();
}
/**
* Constructor for ListOffsetResponse v1
*/
public PartitionData(Errors error, long timestamp, long offset, Optional<Integer> leaderEpoch) {
this.error = error;
this.timestamp = timestamp;
this.offset = offset;
this.offsets = null;
this.leaderEpoch = leaderEpoch;
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("PartitionData(").
append("errorCode: ").append(error.code());
if (offsets == null) {
bld.append(", timestamp: ").append(timestamp).
append(", offset: ").append(offset).
append(", leaderEpoch: ").append(leaderEpoch);
} else {
bld.append(", offsets: ").
append("[").
append(Utils.join(this.offsets, ",")).
append("]");
}
bld.append(")");
return bld.toString();
}
}
private final int throttleTimeMs;
private final Map<TopicPartition, PartitionData> responseData;
/**
* Constructor for all versions without throttle time
*/
public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) {
this(DEFAULT_THROTTLE_TIME, responseData);
}
public ListOffsetResponse(int throttleTimeMs, Map<TopicPartition, PartitionData> responseData) {
this.throttleTimeMs = throttleTimeMs;
this.responseData = responseData;
}
public ListOffsetResponse(Struct struct) {
this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
responseData = new HashMap<>();
for (Object topicResponseObj : struct.get(TOPICS)) {
Struct topicResponse = (Struct) topicResponseObj;
String topic = topicResponse.get(TOPIC_NAME);
for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) {
Struct partitionResponse = (Struct) partitionResponseObj;
int partition = partitionResponse.get(PARTITION_ID);
Errors error = Errors.forCode(partitionResponse.get(ERROR_CODE));
PartitionData partitionData;
if (partitionResponse.hasField(OFFSETS)) {
Object[] offsets = partitionResponse.get(OFFSETS);
List<Long> offsetsList = new ArrayList<>();
for (Object offset : offsets)
offsetsList.add((Long) offset);
partitionData = new PartitionData(error, offsetsList);
} else {
long timestamp = partitionResponse.get(TIMESTAMP);
long offset = partitionResponse.get(OFFSET);
Optional<Integer> leaderEpoch = RequestUtils.getLeaderEpoch(partitionResponse, LEADER_EPOCH);
partitionData = new PartitionData(error, timestamp, offset, leaderEpoch);
}
responseData.put(new TopicPartition(topic, partition), partitionData);
}
}
public ListOffsetResponse(Struct struct, short version) {
data = new ListOffsetResponseData(struct, version);
}
@Override
public int throttleTimeMs() {
return throttleTimeMs;
return data.throttleTimeMs();
}
public Map<TopicPartition, PartitionData> responseData() {
return responseData;
public ListOffsetResponseData data() {
return data;
}
public List<ListOffsetTopicResponse> topics() {
return data.topics();
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errorCounts = new HashMap<>();
responseData.values().forEach(response ->
updateErrorCounts(errorCounts, response.error)
topics().forEach(topic ->
topic.partitions().forEach(partition ->
updateErrorCounts(errorCounts, Errors.forCode(partition.errorCode()))
)
);
return errorCounts;
}
public static ListOffsetResponse parse(ByteBuffer buffer, short version) {
return new ListOffsetResponse(ApiKeys.LIST_OFFSETS.parseResponse(version, buffer));
return new ListOffsetResponse(ApiKeys.LIST_OFFSETS.parseResponse(version, buffer), version);
}
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.LIST_OFFSETS.responseSchema(version));
struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupPartitionDataByTopic(responseData);
List<Struct> topicArray = new ArrayList<>();
for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
Struct topicData = struct.instance(TOPICS);
topicData.set(TOPIC_NAME, topicEntry.getKey());
List<Struct> partitionArray = new ArrayList<>();
for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
PartitionData offsetPartitionData = partitionEntry.getValue();
Struct partitionData = topicData.instance(PARTITIONS);
partitionData.set(PARTITION_ID, partitionEntry.getKey());
partitionData.set(ERROR_CODE, offsetPartitionData.error.code());
if (version == 0) {
partitionData.set(OFFSETS, offsetPartitionData.offsets.toArray());
} else {
partitionData.set(TIMESTAMP, offsetPartitionData.timestamp);
partitionData.set(OFFSET, offsetPartitionData.offset);
RequestUtils.setLeaderEpochIfExists(partitionData, LEADER_EPOCH, offsetPartitionData.leaderEpoch);
}
partitionArray.add(partitionData);
}
topicData.set(PARTITIONS, partitionArray.toArray());
topicArray.add(topicData);
}
struct.set(TOPICS, topicArray.toArray());
return struct;
return data.toStruct(version);
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(type=ListOffsetResponse")
.append(", throttleTimeMs=").append(throttleTimeMs)
.append(", responseData=").append(responseData)
.append(")");
return bld.toString();
return data.toString();
}
@Override
public boolean shouldClientThrottle(short version) {
return version >= 3;
}
public static ListOffsetTopicResponse singletonListOffsetTopicResponse(TopicPartition tp, Errors error, long timestamp, long offset, int epoch) {
return new ListOffsetTopicResponse()
.setName(tp.topic())
.setPartitions(Collections.singletonList(new ListOffsetPartitionResponse()
.setPartitionIndex(tp.partition())
.setErrorCode(error.code())
.setTimestamp(timestamp)
.setOffset(offset)
.setLeaderEpoch(epoch)));
}
}

View File

@ -42,7 +42,7 @@
"about": "Each partition in the request.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+",
{ "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", "default": "-1",
"about": "The current leader epoch." },
{ "name": "Timestamp", "type": "int64", "versions": "0+",
"about": "The current timestamp." },

View File

@ -48,7 +48,7 @@
"about": "The timestamp associated with the returned offset." },
{ "name": "Offset", "type": "int64", "versions": "1+", "default": "-1", "ignorable": false,
"about": "The returned offset." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "4+" }
{ "name": "LeaderEpoch", "type": "int32", "versions": "4+", "default": "-1" }
]}
]}
]

View File

@ -101,6 +101,8 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListOffsetResponseData;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
@ -144,7 +146,6 @@ import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData;
import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
@ -3737,41 +3738,43 @@ public class KafkaAdminClientTest {
Collections.<String>emptySet(),
node0);
final TopicPartition tp1 = new TopicPartition("foo", 0);
final TopicPartition tp2 = new TopicPartition("bar", 0);
final TopicPartition tp3 = new TopicPartition("baz", 0);
final TopicPartition tp0 = new TopicPartition("foo", 0);
final TopicPartition tp1 = new TopicPartition("bar", 0);
final TopicPartition tp2 = new TopicPartition("baz", 0);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
Map<TopicPartition, PartitionData> responseData = new HashMap<>();
responseData.put(tp1, new PartitionData(Errors.NONE, -1L, 123L, Optional.of(321)));
responseData.put(tp2, new PartitionData(Errors.NONE, -1L, 234L, Optional.of(432)));
responseData.put(tp3, new PartitionData(Errors.NONE, 123456789L, 345L, Optional.of(543)));
ListOffsetTopicResponse t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.NONE, -1L, 123L, 321);
ListOffsetTopicResponse t1 = ListOffsetResponse.singletonListOffsetTopicResponse(tp1, Errors.NONE, -1L, 234L, 432);
ListOffsetTopicResponse t2 = ListOffsetResponse.singletonListOffsetTopicResponse(tp2, Errors.NONE, 123456789L, 345L, 543);
ListOffsetResponseData responseData = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(t0, t1, t2));
env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
partitions.put(tp1, OffsetSpec.latest());
partitions.put(tp2, OffsetSpec.earliest());
partitions.put(tp3, OffsetSpec.forTimestamp(System.currentTimeMillis()));
partitions.put(tp0, OffsetSpec.latest());
partitions.put(tp1, OffsetSpec.earliest());
partitions.put(tp2, OffsetSpec.forTimestamp(System.currentTimeMillis()));
ListOffsetsResult result = env.adminClient().listOffsets(partitions);
Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get();
assertFalse(offsets.isEmpty());
assertEquals(123L, offsets.get(tp1).offset());
assertEquals(321, offsets.get(tp1).leaderEpoch().get().intValue());
assertEquals(123L, offsets.get(tp0).offset());
assertEquals(321, offsets.get(tp0).leaderEpoch().get().intValue());
assertEquals(-1L, offsets.get(tp0).timestamp());
assertEquals(234L, offsets.get(tp1).offset());
assertEquals(432, offsets.get(tp1).leaderEpoch().get().intValue());
assertEquals(-1L, offsets.get(tp1).timestamp());
assertEquals(234L, offsets.get(tp2).offset());
assertEquals(432, offsets.get(tp2).leaderEpoch().get().intValue());
assertEquals(-1L, offsets.get(tp2).timestamp());
assertEquals(345L, offsets.get(tp3).offset());
assertEquals(543, offsets.get(tp3).leaderEpoch().get().intValue());
assertEquals(123456789L, offsets.get(tp3).timestamp());
assertEquals(345L, offsets.get(tp2).offset());
assertEquals(543, offsets.get(tp2).leaderEpoch().get().intValue());
assertEquals(123456789L, offsets.get(tp2).timestamp());
assertEquals(offsets.get(tp0), result.partitionResult(tp0).get());
assertEquals(offsets.get(tp1), result.partitionResult(tp1).get());
assertEquals(offsets.get(tp2), result.partitionResult(tp2).get());
assertEquals(offsets.get(tp3), result.partitionResult(tp3).get());
try {
result.partitionResult(new TopicPartition("unknown", 0)).get();
fail("should have thrown IllegalArgumentException");
@ -3798,48 +3801,53 @@ public class KafkaAdminClientTest {
Collections.<String>emptySet(),
node0);
final TopicPartition tp1 = new TopicPartition("foo", 0);
final TopicPartition tp2 = new TopicPartition("foo", 1);
final TopicPartition tp3 = new TopicPartition("bar", 0);
final TopicPartition tp0 = new TopicPartition("foo", 0);
final TopicPartition tp1 = new TopicPartition("foo", 1);
final TopicPartition tp2 = new TopicPartition("bar", 0);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
// listoffsets response from broker 0
Map<TopicPartition, PartitionData> responseData = new HashMap<>();
responseData.put(tp1, new PartitionData(Errors.LEADER_NOT_AVAILABLE, -1L, 123L, Optional.of(321)));
responseData.put(tp3, new PartitionData(Errors.NONE, -1L, 987L, Optional.of(789)));
env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
ListOffsetTopicResponse t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.LEADER_NOT_AVAILABLE, -1L, 123L, 321);
ListOffsetTopicResponse t1 = ListOffsetResponse.singletonListOffsetTopicResponse(tp1, Errors.NONE, -1L, 987L, 789);
ListOffsetResponseData responseData = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(t0, t1));
env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node0);
// listoffsets response from broker 1
responseData = new HashMap<>();
responseData.put(tp2, new PartitionData(Errors.NONE, -1L, 456L, Optional.of(654)));
env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
ListOffsetTopicResponse t2 = ListOffsetResponse.singletonListOffsetTopicResponse(tp2, Errors.NONE, -1L, 456L, 654);
responseData = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(t2));
env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node1);
// metadata refresh because of LEADER_NOT_AVAILABLE
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
// listoffsets response from broker 0
responseData = new HashMap<>();
responseData.put(tp1, new PartitionData(Errors.NONE, -1L, 345L, Optional.of(543)));
env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.NONE, -1L, 345L, 543);
responseData = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(t0));
env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node0);
Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
partitions.put(tp0, OffsetSpec.latest());
partitions.put(tp1, OffsetSpec.latest());
partitions.put(tp2, OffsetSpec.latest());
partitions.put(tp3, OffsetSpec.latest());
ListOffsetsResult result = env.adminClient().listOffsets(partitions);
Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get();
assertFalse(offsets.isEmpty());
assertEquals(345L, offsets.get(tp1).offset());
assertEquals(543, offsets.get(tp1).leaderEpoch().get().intValue());
assertEquals(345L, offsets.get(tp0).offset());
assertEquals(543, offsets.get(tp0).leaderEpoch().get().intValue());
assertEquals(-1L, offsets.get(tp0).timestamp());
assertEquals(987L, offsets.get(tp1).offset());
assertEquals(789, offsets.get(tp1).leaderEpoch().get().intValue());
assertEquals(-1L, offsets.get(tp1).timestamp());
assertEquals(456, offsets.get(tp2).offset());
assertEquals(456L, offsets.get(tp2).offset());
assertEquals(654, offsets.get(tp2).leaderEpoch().get().intValue());
assertEquals(-1L, offsets.get(tp2).timestamp());
assertEquals(987, offsets.get(tp3).offset());
assertEquals(789, offsets.get(tp3).leaderEpoch().get().intValue());
assertEquals(-1L, offsets.get(tp3).timestamp());
}
}
@ -3860,19 +3868,21 @@ public class KafkaAdminClientTest {
Collections.<String>emptySet(),
node0);
final TopicPartition tp1 = new TopicPartition("foo", 0);
final TopicPartition tp0 = new TopicPartition("foo", 0);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
Map<TopicPartition, PartitionData> responseData = new HashMap<>();
responseData.put(tp1, new PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, -1L, -1, Optional.empty()));
ListOffsetTopicResponse t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.TOPIC_AUTHORIZATION_FAILED, -1L, -1L, -1);
ListOffsetResponseData responseData = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(t0));
env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
partitions.put(tp1, OffsetSpec.latest());
partitions.put(tp0, OffsetSpec.latest());
ListOffsetsResult result = env.adminClient().listOffsets(partitions);
TestUtils.assertFutureError(result.all(), TopicAuthorizationException.class);
@ -3908,13 +3918,17 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
// listoffsets response from broker 0
Map<TopicPartition, PartitionData> responseData = new HashMap<>();
responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 345L, Optional.of(543)));
env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
ListOffsetTopicResponse t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.NONE, -1L, 345L, 543);
ListOffsetResponseData responseData = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(t0));
env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node0);
// listoffsets response from broker 1
responseData = new HashMap<>();
responseData.put(tp1, new PartitionData(Errors.NONE, -1L, 789L, Optional.of(987)));
env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
ListOffsetTopicResponse t1 = ListOffsetResponse.singletonListOffsetTopicResponse(tp1, Errors.NONE, -1L, 789L, 987);
responseData = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(t1));
env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node1);
Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
partitions.put(tp0, OffsetSpec.latest());
@ -3955,9 +3969,11 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareMetadataResponse(oldCluster, Errors.NONE));
Map<TopicPartition, PartitionData> responseData = new HashMap<>();
responseData.put(tp0, new PartitionData(Errors.NOT_LEADER_OR_FOLLOWER, -1L, 345L, Optional.of(543)));
responseData.put(tp1, new PartitionData(Errors.LEADER_NOT_AVAILABLE, -2L, 123L, Optional.of(456)));
ListOffsetTopicResponse t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1L, 345L, 543);
ListOffsetTopicResponse t1 = ListOffsetResponse.singletonListOffsetTopicResponse(tp1, Errors.LEADER_NOT_AVAILABLE, -2L, 123L, 456);
ListOffsetResponseData responseData = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(t0, t1));
env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node0);
final PartitionInfo newPInfo1 = new PartitionInfo("foo", 0, node1,
@ -3971,12 +3987,16 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareMetadataResponse(newCluster, Errors.NONE));
responseData = new HashMap<>();
responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 345L, Optional.of(543)));
t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.NONE, -1L, 345L, 543);
responseData = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(t0));
env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node1);
responseData = new HashMap<>();
responseData.put(tp1, new PartitionData(Errors.NONE, -2L, 123L, Optional.of(456)));
t1 = ListOffsetResponse.singletonListOffsetTopicResponse(tp1, Errors.NONE, -2L, 123L, 456);
responseData = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(t1));
env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node2);
Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
@ -4013,8 +4033,10 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareMetadataResponse(oldCluster, Errors.NONE));
Map<TopicPartition, PartitionData> responseData = new HashMap<>();
responseData.put(tp0, new PartitionData(Errors.NOT_LEADER_OR_FOLLOWER, -1L, 345L, Optional.of(543)));
ListOffsetTopicResponse t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1L, 345L, 543);
ListOffsetResponseData responseData = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(t0));
env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node0);
// updating leader from node0 to node1 and metadata refresh because of NOT_LEADER_OR_FOLLOWER
@ -4025,8 +4047,10 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareMetadataResponse(newCluster, Errors.NONE));
responseData = new HashMap<>();
responseData.put(tp0, new PartitionData(Errors.NONE, -2L, 123L, Optional.of(456)));
t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.NONE, -2L, 123L, 456);
responseData = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(t0));
env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node1);
Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
@ -4073,6 +4097,47 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testListOffsetsPartialResponse() throws Exception {
Node node0 = new Node(0, "localhost", 8120);
Node node1 = new Node(1, "localhost", 8121);
List<Node> nodes = Arrays.asList(node0, node1);
List<PartitionInfo> pInfos = new ArrayList<>();
pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0, node1}, new Node[]{node0, node1}));
pInfos.add(new PartitionInfo("foo", 1, node0, new Node[]{node0, node1}, new Node[]{node0, node1}));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes,
pInfos,
Collections.<String>emptySet(),
Collections.<String>emptySet(),
node0);
final TopicPartition tp0 = new TopicPartition("foo", 0);
final TopicPartition tp1 = new TopicPartition("foo", 1);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
ListOffsetTopicResponse t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.NONE, -2L, 123L, 456);
ListOffsetResponseData data = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(t0));
env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(data), node0);
Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
partitions.put(tp0, OffsetSpec.latest());
partitions.put(tp1, OffsetSpec.latest());
ListOffsetsResult result = env.adminClient().listOffsets(partitions);
assertNotNull(result.partitionResult(tp0).get());
TestUtils.assertFutureThrows(result.partitionResult(tp1), ApiException.class);
TestUtils.assertFutureThrows(result.all(), ApiException.class);
}
}
@Test
public void testGetSubLevelError() {
List<MemberIdentity> memberIdentities = Arrays.asList(

View File

@ -48,8 +48,12 @@ import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListOffsetResponseData;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.Selectable;
@ -118,6 +122,8 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
@ -588,14 +594,22 @@ public class KafkaConsumerTest {
consumer.seekToEnd(singleton(tp0));
consumer.seekToBeginning(singleton(tp1));
client.prepareResponse(
body -> {
ListOffsetRequest request = (ListOffsetRequest) body;
Map<TopicPartition, ListOffsetRequest.PartitionData> timestamps = request.partitionTimestamps();
return timestamps.get(tp0).timestamp == ListOffsetRequest.LATEST_TIMESTAMP &&
timestamps.get(tp1).timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP;
}, listOffsetsResponse(Collections.singletonMap(tp0, 50L),
Collections.singletonMap(tp1, Errors.NOT_LEADER_OR_FOLLOWER)));
client.prepareResponse(body -> {
ListOffsetRequest request = (ListOffsetRequest) body;
List<ListOffsetPartition> partitions = request.topics().stream().flatMap(t -> {
if (t.name().equals(topic))
return Stream.of(t.partitions());
else
return Stream.empty();
}).flatMap(List::stream).collect(Collectors.toList());
ListOffsetPartition expectedTp0 = new ListOffsetPartition()
.setPartitionIndex(tp0.partition())
.setTimestamp(ListOffsetRequest.LATEST_TIMESTAMP);
ListOffsetPartition expectedTp1 = new ListOffsetPartition()
.setPartitionIndex(tp1.partition())
.setTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP);
return partitions.contains(expectedTp0) && partitions.contains(expectedTp1);
}, listOffsetsResponse(Collections.singletonMap(tp0, 50L), Collections.singletonMap(tp1, Errors.NOT_LEADER_OR_FOLLOWER)));
client.prepareResponse(
body -> {
FetchRequest request = (FetchRequest) body;
@ -1568,12 +1582,12 @@ public class KafkaConsumerTest {
public void testMetricConfigRecordingLevel() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
try (KafkaConsumer consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
assertEquals(Sensor.RecordingLevel.INFO, consumer.metrics.config().recordLevel());
}
props.put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
try (KafkaConsumer consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
assertEquals(Sensor.RecordingLevel.DEBUG, consumer.metrics.config().recordLevel());
}
}
@ -2150,20 +2164,29 @@ public class KafkaConsumerTest {
private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> partitionOffsets,
Map<TopicPartition, Errors> partitionErrors) {
Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
Map<String, ListOffsetTopicResponse> responses = new HashMap<>();
for (Map.Entry<TopicPartition, Long> partitionOffset : partitionOffsets.entrySet()) {
partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(Errors.NONE,
ListOffsetResponse.UNKNOWN_TIMESTAMP, partitionOffset.getValue(),
Optional.empty()));
TopicPartition tp = partitionOffset.getKey();
ListOffsetTopicResponse topic = responses.computeIfAbsent(tp.topic(), k -> new ListOffsetTopicResponse().setName(tp.topic()));
topic.partitions().add(new ListOffsetPartitionResponse()
.setPartitionIndex(tp.partition())
.setErrorCode(Errors.NONE.code())
.setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
.setOffset(partitionOffset.getValue()));
}
for (Map.Entry<TopicPartition, Errors> partitionError : partitionErrors.entrySet()) {
partitionData.put(partitionError.getKey(), new ListOffsetResponse.PartitionData(
partitionError.getValue(), ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty()));
TopicPartition tp = partitionError.getKey();
ListOffsetTopicResponse topic = responses.computeIfAbsent(tp.topic(), k -> new ListOffsetTopicResponse().setName(tp.topic()));
topic.partitions().add(new ListOffsetPartitionResponse()
.setPartitionIndex(tp.partition())
.setErrorCode(partitionError.getValue().code())
.setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
.setOffset(ListOffsetResponse.UNKNOWN_OFFSET));
}
return new ListOffsetResponse(partitionData);
ListOffsetResponseData data = new ListOffsetResponseData()
.setTopics(new ArrayList<>(responses.values()));
return new ListOffsetResponse(data);
}
private FetchResponse<MemoryRecords> fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
@ -2189,7 +2212,7 @@ public class KafkaConsumerTest {
return new FetchResponse<>(Errors.NONE, tpResponses, 0, INVALID_SESSION_ID);
}
private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, int count) {
private FetchResponse<MemoryRecords> fetchResponse(TopicPartition partition, long fetchOffset, int count) {
FetchInfo fetchInfo = new FetchInfo(fetchOffset, count);
return fetchResponse(Collections.singletonMap(partition, fetchInfo));
}
@ -2450,7 +2473,7 @@ public class KafkaConsumerTest {
assertEquals((1.0d + 0.0d + 0.5d) / 3, consumer.metrics().get(pollIdleRatio).metricValue());
}
private static boolean consumerMetricPresent(KafkaConsumer consumer, String name) {
private static boolean consumerMetricPresent(KafkaConsumer<String, String> consumer, String name) {
MetricName metricName = new MetricName(name, "consumer-metrics", "", Collections.emptyMap());
return consumer.metrics.metrics().containsKey(metricName);
}

View File

@ -48,6 +48,11 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition;
import org.apache.kafka.common.message.ListOffsetResponseData;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@ -1435,7 +1440,7 @@ public class FetcherTest {
subscriptions.requestOffsetReset(tp0);
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP,
Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NONE, 1L, 5L));
validLeaderEpoch), listOffsetResponse(Errors.NONE, 1L, 5L));
fetcher.resetOffsetsIfNeeded();
consumerClient.pollNoWakeup();
assertFalse(subscriptions.isOffsetResetNeeded(tp0));
@ -1471,7 +1476,7 @@ public class FetcherTest {
// Fail with OFFSET_NOT_AVAILABLE
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP,
Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.OFFSET_NOT_AVAILABLE, 1L, 5L), false);
validLeaderEpoch), listOffsetResponse(Errors.OFFSET_NOT_AVAILABLE, 1L, 5L), false);
fetcher.resetOffsetsIfNeeded();
consumerClient.pollNoWakeup();
assertFalse(subscriptions.hasValidPosition(tp0));
@ -1481,7 +1486,7 @@ public class FetcherTest {
// Fail with LEADER_NOT_AVAILABLE
time.sleep(retryBackoffMs);
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP,
Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.LEADER_NOT_AVAILABLE, 1L, 5L), false);
validLeaderEpoch), listOffsetResponse(Errors.LEADER_NOT_AVAILABLE, 1L, 5L), false);
fetcher.resetOffsetsIfNeeded();
consumerClient.pollNoWakeup();
assertFalse(subscriptions.hasValidPosition(tp0));
@ -1563,7 +1568,7 @@ public class FetcherTest {
subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP,
Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NONE, 1L, 5L));
validLeaderEpoch), listOffsetResponse(Errors.NONE, 1L, 5L));
fetcher.resetOffsetsIfNeeded();
consumerClient.pollNoWakeup();
@ -1580,7 +1585,7 @@ public class FetcherTest {
// First fetch fails with stale metadata
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP,
Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NOT_LEADER_OR_FOLLOWER, 1L, 5L), false);
validLeaderEpoch), listOffsetResponse(Errors.NOT_LEADER_OR_FOLLOWER, 1L, 5L), false);
fetcher.resetOffsetsIfNeeded();
consumerClient.pollNoWakeup();
assertFalse(subscriptions.hasValidPosition(tp0));
@ -1657,7 +1662,7 @@ public class FetcherTest {
// First request gets a disconnect
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP,
Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NONE, 1L, 5L), true);
validLeaderEpoch), listOffsetResponse(Errors.NONE, 1L, 5L), true);
fetcher.resetOffsetsIfNeeded();
consumerClient.pollNoWakeup();
assertFalse(subscriptions.hasValidPosition(tp0));
@ -1835,7 +1840,7 @@ public class FetcherTest {
// First request gets a disconnect
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP,
Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.TOPIC_AUTHORIZATION_FAILED, -1, -1), false);
validLeaderEpoch), listOffsetResponse(Errors.TOPIC_AUTHORIZATION_FAILED, -1, -1), false);
fetcher.resetOffsetsIfNeeded();
consumerClient.pollNoWakeup();
assertFalse(subscriptions.hasValidPosition(tp0));
@ -1873,7 +1878,7 @@ public class FetcherTest {
subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP,
Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NONE, 1L, 10L));
validLeaderEpoch), listOffsetResponse(Errors.NONE, 1L, 10L));
fetcher.resetOffsetsIfNeeded();
consumerClient.pollNoWakeup();
@ -2496,36 +2501,65 @@ public class FetcherTest {
client.updateMetadata(initialUpdateResponse);
final long fetchTimestamp = 10L;
Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
allPartitionData.put(tp0, new ListOffsetResponse.PartitionData(
Errors.NONE, fetchTimestamp, 4L, Optional.empty()));
allPartitionData.put(tp1, new ListOffsetResponse.PartitionData(
retriableError, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty()));
ListOffsetPartitionResponse tp0NoError = new ListOffsetPartitionResponse()
.setPartitionIndex(tp0.partition())
.setErrorCode(Errors.NONE.code())
.setTimestamp(fetchTimestamp)
.setOffset(4L);
List<ListOffsetTopicResponse> topics = Collections.singletonList(
new ListOffsetTopicResponse()
.setName(tp0.topic())
.setPartitions(Arrays.asList(
tp0NoError,
new ListOffsetPartitionResponse()
.setPartitionIndex(tp1.partition())
.setErrorCode(retriableError.code())
.setTimestamp(ListOffsetRequest.LATEST_TIMESTAMP)
.setOffset(-1L))));
ListOffsetResponseData data = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(topics);
client.prepareResponseFrom(body -> {
boolean isListOffsetRequest = body instanceof ListOffsetRequest;
if (isListOffsetRequest) {
ListOffsetRequest request = (ListOffsetRequest) body;
Map<TopicPartition, ListOffsetRequest.PartitionData> expectedTopicPartitions = new HashMap<>();
expectedTopicPartitions.put(tp0, new ListOffsetRequest.PartitionData(
fetchTimestamp, Optional.empty()));
expectedTopicPartitions.put(tp1, new ListOffsetRequest.PartitionData(
fetchTimestamp, Optional.empty()));
return request.partitionTimestamps().equals(expectedTopicPartitions);
List<ListOffsetTopic> expectedTopics = Collections.singletonList(
new ListOffsetTopic()
.setName(tp0.topic())
.setPartitions(Arrays.asList(
new ListOffsetPartition()
.setPartitionIndex(tp1.partition())
.setTimestamp(fetchTimestamp)
.setCurrentLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH),
new ListOffsetPartition()
.setPartitionIndex(tp0.partition())
.setTimestamp(fetchTimestamp)
.setCurrentLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH))));
return request.topics().equals(expectedTopics);
} else {
return false;
}
}, new ListOffsetResponse(allPartitionData), originalLeader);
}, new ListOffsetResponse(data), originalLeader);
client.prepareMetadataUpdate(updatedMetadata);
// If the metadata wasn't updated before retrying, the fetcher would consult the original leader and hit a NOT_LEADER exception.
// We will count the answered future response in the end to verify if this is the case.
Map<TopicPartition, ListOffsetResponse.PartitionData> paritionDataWithFatalError = new HashMap<>(allPartitionData);
paritionDataWithFatalError.put(tp1, new ListOffsetResponse.PartitionData(
Errors.NOT_LEADER_OR_FOLLOWER, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty()));
client.prepareResponseFrom(new ListOffsetResponse(paritionDataWithFatalError), originalLeader);
List<ListOffsetTopicResponse> topicsWithFatalError = Collections.singletonList(
new ListOffsetTopicResponse()
.setName(tp0.topic())
.setPartitions(Arrays.asList(
tp0NoError,
new ListOffsetPartitionResponse()
.setPartitionIndex(tp1.partition())
.setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
.setTimestamp(ListOffsetRequest.LATEST_TIMESTAMP)
.setOffset(-1L))));
ListOffsetResponseData dataWithFatalError = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(topicsWithFatalError);
client.prepareResponseFrom(new ListOffsetResponse(dataWithFatalError), originalLeader);
// The request to new leader must only contain one partition tp1 with error.
client.prepareResponseFrom(body -> {
@ -2533,9 +2567,12 @@ public class FetcherTest {
if (isListOffsetRequest) {
ListOffsetRequest request = (ListOffsetRequest) body;
return request.partitionTimestamps().equals(
Collections.singletonMap(tp1, new ListOffsetRequest.PartitionData(
fetchTimestamp, Optional.of(newLeaderEpoch))));
ListOffsetTopic requestTopic = request.topics().get(0);
ListOffsetPartition expectedPartition = new ListOffsetPartition()
.setPartitionIndex(tp1.partition())
.setTimestamp(fetchTimestamp)
.setCurrentLeaderEpoch(newLeaderEpoch);
return expectedPartition.equals(requestTopic.partitions().get(0));
} else {
return false;
}
@ -2594,9 +2631,9 @@ public class FetcherTest {
MockClient.RequestMatcher matcher = body -> {
if (body instanceof ListOffsetRequest) {
ListOffsetRequest offsetRequest = (ListOffsetRequest) body;
Optional<Integer> epoch = offsetRequest.partitionTimestamps().get(tp0).currentLeaderEpoch;
assertTrue("Expected Fetcher to set leader epoch in request", epoch.isPresent());
assertEquals("Expected leader epoch to match epoch from metadata update", epoch.get().longValue(), 99);
int epoch = offsetRequest.topics().get(0).partitions().get(0).currentLeaderEpoch();
assertTrue("Expected Fetcher to set leader epoch in request", epoch != ListOffsetResponse.UNKNOWN_EPOCH);
assertEquals("Expected leader epoch to match epoch from metadata update", epoch, 99);
return true;
} else {
fail("Should have seen ListOffsetRequest");
@ -2692,14 +2729,22 @@ public class FetcherTest {
public void testBatchedListOffsetsMetadataErrors() {
buildFetcher();
Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NOT_LEADER_OR_FOLLOWER,
ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET,
Optional.empty()));
partitionData.put(tp1, new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET,
Optional.empty()));
client.prepareResponse(new ListOffsetResponse(0, partitionData));
ListOffsetResponseData data = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(Collections.singletonList(new ListOffsetTopicResponse()
.setName(tp0.topic())
.setPartitions(Arrays.asList(
new ListOffsetPartitionResponse()
.setPartitionIndex(tp0.partition())
.setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
.setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
.setOffset(ListOffsetResponse.UNKNOWN_OFFSET),
new ListOffsetPartitionResponse()
.setPartitionIndex(tp1.partition())
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
.setOffset(ListOffsetResponse.UNKNOWN_OFFSET)))));
client.prepareResponse(new ListOffsetResponse(data));
Map<TopicPartition, Long> offsetsToSearch = new HashMap<>();
offsetsToSearch.put(tp0, ListOffsetRequest.EARLIEST_TIMESTAMP);
@ -3544,16 +3589,58 @@ public class FetcherTest {
private void testGetOffsetsForTimesWithUnknownOffset() {
client.reset();
// Ensure metadata has both partition.
// Ensure metadata has both partitions.
MetadataResponse initialMetadataUpdate = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1));
client.updateMetadata(initialMetadataUpdate);
Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NONE,
ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET,
Optional.empty()));
ListOffsetResponseData data = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(Collections.singletonList(new ListOffsetTopicResponse()
.setName(tp0.topic())
.setPartitions(Collections.singletonList(new ListOffsetPartitionResponse()
.setPartitionIndex(tp0.partition())
.setErrorCode(Errors.NONE.code())
.setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
.setOffset(ListOffsetResponse.UNKNOWN_OFFSET)))));
client.prepareResponseFrom(new ListOffsetResponse(0, partitionData),
client.prepareResponseFrom(new ListOffsetResponse(data),
metadata.fetch().leaderFor(tp0));
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
timestampToSearch.put(tp0, 0L);
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
fetcher.offsetsForTimes(timestampToSearch, time.timer(Long.MAX_VALUE));
assertTrue(offsetAndTimestampMap.containsKey(tp0));
assertNull(offsetAndTimestampMap.get(tp0));
}
@Test
public void testGetOffsetsForTimesWithUnknownOffsetV0() {
buildFetcher();
// Empty map
assertTrue(fetcher.offsetsForTimes(new HashMap<>(), time.timer(100L)).isEmpty());
// Unknown Offset
client.reset();
// Ensure metadata has both partition.
MetadataResponse initialMetadataUpdate = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1));
client.updateMetadata(initialMetadataUpdate);
// Force LIST_OFFSETS version 0
Node node = metadata.fetch().nodes().get(0);
apiVersions.update(node.idString(), NodeApiVersions.create(
ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 0));
ListOffsetResponseData data = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(Collections.singletonList(new ListOffsetTopicResponse()
.setName(tp0.topic())
.setPartitions(Collections.singletonList(new ListOffsetPartitionResponse()
.setPartitionIndex(tp0.partition())
.setErrorCode(Errors.NONE.code())
.setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
.setOldStyleOffsets(Collections.emptyList())))));
client.prepareResponseFrom(new ListOffsetResponse(data),
metadata.fetch().leaderFor(tp0));
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
@ -4230,7 +4317,7 @@ public class FetcherTest {
expectedOffsets.put(tp1, 4L);
expectedOffsets.put(tp2, 6L);
assignFromUser(expectedOffsets.keySet());
client.prepareResponse(listOffsetResponse(Errors.NONE, ListOffsetRequest.EARLIEST_TIMESTAMP, expectedOffsets));
client.prepareResponse(listOffsetResponse(expectedOffsets, Errors.NONE, ListOffsetRequest.EARLIEST_TIMESTAMP, ListOffsetResponse.UNKNOWN_EPOCH));
assertEquals(expectedOffsets, fetcher.beginningOffsets(asList(tp0, tp1, tp2), time.timer(5000L)));
}
@ -4264,7 +4351,7 @@ public class FetcherTest {
expectedOffsets.put(tp1, 7L);
expectedOffsets.put(tp2, 9L);
assignFromUser(expectedOffsets.keySet());
client.prepareResponse(listOffsetResponse(Errors.NONE, ListOffsetRequest.LATEST_TIMESTAMP, expectedOffsets));
client.prepareResponse(listOffsetResponse(expectedOffsets, Errors.NONE, ListOffsetRequest.LATEST_TIMESTAMP, ListOffsetResponse.UNKNOWN_EPOCH));
assertEquals(expectedOffsets, fetcher.endOffsets(asList(tp0, tp1, tp2), time.timer(5000L)));
}
@ -4275,24 +4362,19 @@ public class FetcherTest {
}
private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
return listOffsetRequestMatcher(timestamp, Optional.empty());
}
private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp, Optional<Integer> leaderEpoch) {
// matches any list offset request with the provided timestamp
return body -> {
ListOffsetRequest req = (ListOffsetRequest) body;
return req.partitionTimestamps().equals(Collections.singletonMap(
tp0, new ListOffsetRequest.PartitionData(timestamp, leaderEpoch)));
};
return listOffsetRequestMatcher(timestamp, ListOffsetResponse.UNKNOWN_EPOCH);
}
private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp, final int leaderEpoch) {
// matches any list offset request with the provided timestamp
return body -> {
ListOffsetRequest req = (ListOffsetRequest) body;
return req.partitionTimestamps().equals(Collections.singletonMap(
tp0, new ListOffsetRequest.PartitionData(timestamp, Optional.of(leaderEpoch))));
ListOffsetTopic topic = req.topics().get(0);
ListOffsetPartition partition = topic.partitions().get(0);
return tp0.topic().equals(topic.name())
&& tp0.partition() == partition.partitionIndex()
&& timestamp == partition.timestamp()
&& leaderEpoch == partition.currentLeaderEpoch();
};
}
@ -4301,26 +4383,35 @@ public class FetcherTest {
}
private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
return listOffsetResponse(tp, error, timestamp, offset, null);
return listOffsetResponse(tp, error, timestamp, offset, ListOffsetResponse.UNKNOWN_EPOCH);
}
private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset,
Integer leaderEpoch) {
ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(
error, timestamp, offset, Optional.ofNullable(leaderEpoch));
Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
allPartitionData.put(tp, partitionData);
return new ListOffsetResponse(allPartitionData);
private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset, int leaderEpoch) {
Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(tp, offset);
return listOffsetResponse(offsets, error, timestamp, leaderEpoch);
}
private ListOffsetResponse listOffsetResponse(Errors error, long timestamp, Map<TopicPartition, Long> offsets) {
Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
private ListOffsetResponse listOffsetResponse(Map<TopicPartition, Long> offsets, Errors error, long timestamp, int leaderEpoch) {
Map<String, List<ListOffsetPartitionResponse>> responses = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error, timestamp,
entry.getValue(), Optional.empty());
allPartitionData.put(entry.getKey(), partitionData);
TopicPartition tp = entry.getKey();
responses.putIfAbsent(tp.topic(), new ArrayList<>());
responses.get(tp.topic()).add(new ListOffsetPartitionResponse()
.setPartitionIndex(tp.partition())
.setErrorCode(error.code())
.setOffset(entry.getValue())
.setTimestamp(timestamp)
.setLeaderEpoch(leaderEpoch));
}
return new ListOffsetResponse(allPartitionData);
List<ListOffsetTopicResponse> topics = new ArrayList<>();
for (Map.Entry<String, List<ListOffsetPartitionResponse>> response : responses.entrySet()) {
topics.add(new ListOffsetTopicResponse()
.setName(response.getKey())
.setPartitions(response.getValue()));
}
ListOffsetResponseData data = new ListOffsetResponseData().setTopics(topics);
return new ListOffsetResponse(data);
}
private FetchResponse<MemoryRecords> fullFetchResponseWithAbortedTransactions(MemoryRecords records,

View File

@ -18,6 +18,8 @@
package org.apache.kafka.common.message;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
@ -25,6 +27,10 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
@ -162,6 +168,49 @@ public final class MessageTest {
testAllMessageRoundTripsFromVersion((short) 5, newRequest.get().setGroupInstanceId("instanceId"));
}
@Test
public void testListOffsetsRequestVersions() throws Exception {
List<ListOffsetTopic> v = Collections.singletonList(new ListOffsetTopic()
.setName("topic")
.setPartitions(Collections.singletonList(new ListOffsetPartition()
.setPartitionIndex(0)
.setTimestamp(123L))));
Supplier<ListOffsetRequestData> newRequest = () -> new ListOffsetRequestData()
.setTopics(v)
.setReplicaId(0);
testAllMessageRoundTrips(newRequest.get());
testAllMessageRoundTripsFromVersion((short) 2, newRequest.get().setIsolationLevel(IsolationLevel.READ_COMMITTED.id()));
}
@Test
public void testListOffsetsResponseVersions() throws Exception {
ListOffsetPartitionResponse partition = new ListOffsetPartitionResponse()
.setErrorCode(Errors.NONE.code())
.setPartitionIndex(0)
.setOldStyleOffsets(Collections.singletonList(321L));
List<ListOffsetTopicResponse> topics = Collections.singletonList(new ListOffsetTopicResponse()
.setName("topic")
.setPartitions(Collections.singletonList(partition)));
Supplier<ListOffsetResponseData> response = () -> new ListOffsetResponseData()
.setTopics(topics);
for (short version = 0; version <= ApiKeys.LIST_OFFSETS.latestVersion(); version++) {
ListOffsetResponseData responseData = response.get();
if (version > 0) {
responseData.topics().get(0).partitions().get(0)
.setOldStyleOffsets(Collections.emptyList())
.setOffset(456L)
.setTimestamp(123L);
}
if (version > 1) {
responseData.setThrottleTimeMs(1000);
}
if (version > 3) {
partition.setLeaderEpoch(1);
}
testEquivalentMessageRoundTrip(version, responseData);
}
}
@Test
public void testJoinGroupResponseVersions() throws Exception {
Supplier<JoinGroupResponseData> newResponse = () -> new JoinGroupResponseData()

View File

@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ListOffsetRequestData;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic;
import org.apache.kafka.common.message.ListOffsetResponseData;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.junit.Test;
public class ListOffsetRequestTest {
@Test
public void testDuplicatePartitions() {
List<ListOffsetTopic> topics = Collections.singletonList(
new ListOffsetTopic()
.setName("topic")
.setPartitions(Arrays.asList(
new ListOffsetPartition()
.setPartitionIndex(0),
new ListOffsetPartition()
.setPartitionIndex(0))));
ListOffsetRequestData data = new ListOffsetRequestData()
.setTopics(topics)
.setReplicaId(-1);
ListOffsetRequest request = new ListOffsetRequest(data.toStruct((short) 0), (short) 0);
assertEquals(Collections.singleton(new TopicPartition("topic", 0)), request.duplicatePartitions());
}
@Test
public void testGetErrorResponse() {
for (short version = 1; version <= ApiKeys.LIST_OFFSETS.latestVersion(); version++) {
List<ListOffsetTopic> topics = Arrays.asList(
new ListOffsetTopic()
.setName("topic")
.setPartitions(Collections.singletonList(
new ListOffsetPartition()
.setPartitionIndex(0))));
ListOffsetRequest request = ListOffsetRequest.Builder
.forConsumer(true, IsolationLevel.READ_COMMITTED)
.setTargetTimes(topics)
.build(version);
ListOffsetResponse response = (ListOffsetResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());
List<ListOffsetTopicResponse> v = Collections.singletonList(
new ListOffsetTopicResponse()
.setName("topic")
.setPartitions(Collections.singletonList(
new ListOffsetPartitionResponse()
.setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
.setLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH)
.setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
.setPartitionIndex(0)
.setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP))));
ListOffsetResponseData data = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(v);
ListOffsetResponse expectedResponse = new ListOffsetResponse(data);
assertEquals(expectedResponse.data().topics(), response.data().topics());
assertEquals(expectedResponse.throttleTimeMs(), response.throttleTimeMs());
}
}
@Test
public void testGetErrorResponseV0() {
List<ListOffsetTopic> topics = Arrays.asList(
new ListOffsetTopic()
.setName("topic")
.setPartitions(Collections.singletonList(
new ListOffsetPartition()
.setPartitionIndex(0))));
ListOffsetRequest request = ListOffsetRequest.Builder
.forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(topics)
.build((short) 0);
ListOffsetResponse response = (ListOffsetResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());
List<ListOffsetTopicResponse> v = Collections.singletonList(
new ListOffsetTopicResponse()
.setName("topic")
.setPartitions(Collections.singletonList(
new ListOffsetPartitionResponse()
.setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
.setOldStyleOffsets(Collections.emptyList())
.setPartitionIndex(0))));
ListOffsetResponseData data = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(v);
ListOffsetResponse expectedResponse = new ListOffsetResponse(data);
assertEquals(expectedResponse.data().topics(), response.data().topics());
assertEquals(expectedResponse.throttleTimeMs(), response.throttleTimeMs());
}
@Test
public void testToListOffsetTopics() {
ListOffsetPartition lop0 = new ListOffsetPartition()
.setPartitionIndex(0)
.setCurrentLeaderEpoch(1)
.setMaxNumOffsets(2)
.setTimestamp(123L);
ListOffsetPartition lop1 = new ListOffsetPartition()
.setPartitionIndex(1)
.setCurrentLeaderEpoch(3)
.setMaxNumOffsets(4)
.setTimestamp(567L);
Map<TopicPartition, ListOffsetPartition> timestampsToSearch = new HashMap<>();
timestampsToSearch.put(new TopicPartition("topic", 0), lop0);
timestampsToSearch.put(new TopicPartition("topic", 1), lop1);
List<ListOffsetTopic> listOffsetTopics = ListOffsetRequest.toListOffsetTopics(timestampsToSearch);
assertEquals(1, listOffsetTopics.size());
ListOffsetTopic topic = listOffsetTopics.get(0);
assertEquals("topic", topic.name());
assertEquals(2, topic.partitions().size());
assertTrue(topic.partitions().contains(lop0));
assertTrue(topic.partitions().contains(lop1));
}
}

View File

@ -113,6 +113,11 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic;
import org.apache.kafka.common.message.ListOffsetResponseData;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
@ -1221,28 +1226,39 @@ public class RequestResponseTest {
private ListOffsetRequest createListOffsetRequest(int version) {
if (version == 0) {
Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = Collections.singletonMap(
new TopicPartition("test", 0),
new ListOffsetRequest.PartitionData(1000000L, 10));
ListOffsetTopic topic = new ListOffsetTopic()
.setName("test")
.setPartitions(Arrays.asList(new ListOffsetPartition()
.setPartitionIndex(0)
.setTimestamp(1000000L)
.setMaxNumOffsets(10)));
return ListOffsetRequest.Builder
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(offsetData)
.setTargetTimes(Collections.singletonList(topic))
.build((short) version);
} else if (version == 1) {
Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = Collections.singletonMap(
new TopicPartition("test", 0),
new ListOffsetRequest.PartitionData(1000000L, Optional.empty()));
ListOffsetTopic topic = new ListOffsetTopic()
.setName("test")
.setPartitions(Arrays.asList(new ListOffsetPartition()
.setPartitionIndex(0)
.setTimestamp(1000000L)));
return ListOffsetRequest.Builder
.forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(offsetData)
.setTargetTimes(Collections.singletonList(topic))
.build((short) version);
} else if (version >= 2 && version <= 5) {
Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = Collections.singletonMap(
new TopicPartition("test", 0),
new ListOffsetRequest.PartitionData(1000000L, Optional.of(5)));
ListOffsetPartition partition = new ListOffsetPartition()
.setPartitionIndex(0)
.setTimestamp(1000000L);
if (version >= 4) {
partition.setCurrentLeaderEpoch(5);
}
ListOffsetTopic topic = new ListOffsetTopic()
.setName("test")
.setPartitions(Arrays.asList(partition));
return ListOffsetRequest.Builder
.forConsumer(true, IsolationLevel.READ_COMMITTED)
.setTargetTimes(offsetData)
.setTargetTimes(Collections.singletonList(topic))
.build((short) version);
} else {
throw new IllegalArgumentException("Illegal ListOffsetRequest version " + version);
@ -1251,15 +1267,28 @@ public class RequestResponseTest {
private ListOffsetResponse createListOffsetResponse(int version) {
if (version == 0) {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0),
new ListOffsetResponse.PartitionData(Errors.NONE, asList(100L)));
return new ListOffsetResponse(responseData);
ListOffsetResponseData data = new ListOffsetResponseData()
.setTopics(Collections.singletonList(new ListOffsetTopicResponse()
.setName("test")
.setPartitions(Collections.singletonList(new ListOffsetPartitionResponse()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code())
.setOldStyleOffsets(asList(100L))))));
return new ListOffsetResponse(data);
} else if (version >= 1 && version <= 5) {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0),
new ListOffsetResponse.PartitionData(Errors.NONE, 10000L, 100L, Optional.of(27)));
return new ListOffsetResponse(responseData);
ListOffsetPartitionResponse partition = new ListOffsetPartitionResponse()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code())
.setTimestamp(10000L)
.setOffset(100L);
if (version >= 4) {
partition.setLeaderEpoch(27);
}
ListOffsetResponseData data = new ListOffsetResponseData()
.setTopics(Collections.singletonList(new ListOffsetTopicResponse()
.setName("test")
.setPartitions(Collections.singletonList(partition))));
return new ListOffsetResponse(data);
} else {
throw new IllegalArgumentException("Illegal ListOffsetResponse version " + version);
}

View File

@ -29,7 +29,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Base64.Encoder;
import java.util.Set;
@ -53,7 +52,6 @@ import javax.security.sasl.SaslException;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.config.types.Password;
@ -62,6 +60,9 @@ import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection;
import org.apache.kafka.common.message.ListOffsetResponseData;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
@ -1581,8 +1582,17 @@ public class SaslAuthenticatorTest {
@Test
public void testConvertListOffsetResponseToSaslHandshakeResponse() {
ListOffsetResponse response = new ListOffsetResponse(0, Collections.singletonMap(new TopicPartition("topic", 0),
new ListOffsetResponse.PartitionData(Errors.NONE, 0, 0, Optional.empty())));
ListOffsetResponseData data = new ListOffsetResponseData()
.setThrottleTimeMs(0)
.setTopics(Collections.singletonList(new ListOffsetTopicResponse()
.setName("topic")
.setPartitions(Collections.singletonList(new ListOffsetPartitionResponse()
.setErrorCode(Errors.NONE.code())
.setLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH)
.setPartitionIndex(0)
.setOffset(0)
.setTimestamp(0)))));
ListOffsetResponse response = new ListOffsetResponse(data);
ByteBuffer buffer = response.serialize(ApiKeys.LIST_OFFSETS, LIST_OFFSETS.latestVersion(), 0);
final RequestHeader header0 = new RequestHeader(LIST_OFFSETS, LIST_OFFSETS.latestVersion(), "id", SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID);
Assert.assertThrows(SchemaException.class, () -> NetworkClient.parseResponse(buffer.duplicate(), header0));

View File

@ -378,7 +378,7 @@ class Partition(val topicPartition: TopicPartition,
getLocalLog(currentLeaderEpoch, requireLeader) match {
case Left(localLog) => localLog
case Right(error) =>
throw error.exception(s"Failed to find ${if (requireLeader) "leader " else ""} log for " +
throw error.exception(s"Failed to find ${if (requireLeader) "leader" else ""} log for " +
s"partition $topicPartition with leader epoch $currentLeaderEpoch. The current leader " +
s"is $leaderReplicaIdOpt and the current epoch $leaderEpoch")
}
@ -1071,7 +1071,7 @@ class Partition(val topicPartition: TopicPartition,
case None => localLog.logEndOffset
}
val epochLogString = if(currentLeaderEpoch.isPresent) {
val epochLogString = if (currentLeaderEpoch.isPresent) {
s"epoch ${currentLeaderEpoch.get}"
} else {
"unknown epoch"

View File

@ -62,6 +62,9 @@ import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsP
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition
import org.apache.kafka.common.message.ListOffsetResponseData
import org.apache.kafka.common.message.ListOffsetResponseData.{ListOffsetPartitionResponse, ListOffsetTopicResponse}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@ -934,136 +937,161 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
val version = request.header.apiVersion
val mergedResponseMap = if (version == 0)
val topics = if (version == 0)
handleListOffsetRequestV0(request)
else
handleListOffsetRequestV1AndAbove(request)
sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setTopics(topics.asJava)))
}
private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = {
private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = {
val correlationId = request.header.correlationId
val clientId = request.header.clientId
val offsetRequest = request.body[ListOffsetRequest]
val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context,
DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context,
DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name)
val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) =>
k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava)
}
val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
new ListOffsetTopicResponse()
.setName(topic.name)
.setPartitions(topic.partitions.asScala.map(partition =>
new ListOffsetPartitionResponse()
.setPartitionIndex(partition.partitionIndex)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)).asJava)
)
val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) =>
try {
val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
topicPartition = topicPartition,
timestamp = partitionData.timestamp,
maxNumOffsets = partitionData.maxNumOffsets,
isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID,
fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
(topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava))
} catch {
// NOTE: UnknownTopicOrPartitionException and NotLeaderOrFollowerException are special cased since these error messages
// are typically transient and there is no value in logging the entire stack trace for the same
case e @ (_ : UnknownTopicOrPartitionException |
_ : NotLeaderOrFollowerException |
_ : KafkaStorageException) =>
debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
correlationId, clientId, topicPartition, e.getMessage))
(topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
case e: Throwable =>
error("Error while responding to offset request", e)
(topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
}
}
responseMap ++ unauthorizedResponseStatus
}
private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = {
val correlationId = request.header.correlationId
val clientId = request.header.clientId
val offsetRequest = request.body[ListOffsetRequest]
val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context,
DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) =>
k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET,
Optional.empty())
}
val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) =>
if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
s"failed because the partition is duplicated in the request.")
(topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET,
Optional.empty()))
} else {
def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = {
(topicPartition, new ListOffsetResponse.PartitionData(
e,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET,
Optional.empty()))
}
val responseTopics = authorizedRequestInfo.map { topic =>
val responsePartitions = topic.partitions.asScala.map { partition =>
val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
try {
val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID
val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID
val isolationLevelOpt = if (isClientRequest)
Some(offsetRequest.isolationLevel)
else
None
val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
partitionData.timestamp,
isolationLevelOpt,
partitionData.currentLeaderEpoch,
fetchOnlyFromLeader)
val response = foundOpt match {
case Some(found) =>
new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset, found.leaderEpoch)
case None =>
new ListOffsetResponse.PartitionData(Errors.NONE, ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())
}
(topicPartition, response)
val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
topicPartition = topicPartition,
timestamp = partition.timestamp,
maxNumOffsets = partition.maxNumOffsets,
isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID,
fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
new ListOffsetPartitionResponse()
.setPartitionIndex(partition.partitionIndex)
.setErrorCode(Errors.NONE.code)
.setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
} catch {
// NOTE: These exceptions are special cased since these error messages are typically transient or the client
// would have received a clear exception and there is no value in logging the entire stack trace for the same
// NOTE: UnknownTopicOrPartitionException and NotLeaderOrFollowerException are special cases since these error messages
// are typically transient and there is no value in logging the entire stack trace for the same
case e @ (_ : UnknownTopicOrPartitionException |
_ : NotLeaderOrFollowerException |
_ : UnknownLeaderEpochException |
_ : FencedLeaderEpochException |
_ : KafkaStorageException |
_ : UnsupportedForMessageFormatException) =>
debug(s"Offset request with correlation id $correlationId from client $clientId on " +
s"partition $topicPartition failed due to ${e.getMessage}")
buildErrorResponse(Errors.forException(e))
// Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
case e: OffsetNotAvailableException =>
if(request.header.apiVersion >= 5) {
buildErrorResponse(Errors.forException(e))
} else {
buildErrorResponse(Errors.LEADER_NOT_AVAILABLE)
}
_ : KafkaStorageException) =>
debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
correlationId, clientId, topicPartition, e.getMessage))
new ListOffsetPartitionResponse()
.setPartitionIndex(partition.partitionIndex)
.setErrorCode(Errors.forException(e).code)
case e: Throwable =>
error("Error while responding to offset request", e)
buildErrorResponse(Errors.forException(e))
new ListOffsetPartitionResponse()
.setPartitionIndex(partition.partitionIndex)
.setErrorCode(Errors.forException(e).code)
}
}
new ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava)
}
responseMap ++ unauthorizedResponseStatus
(responseTopics ++ unauthorizedResponseStatus).toList
}
private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): List[ListOffsetTopicResponse] = {
val correlationId = request.header.correlationId
val clientId = request.header.clientId
val offsetRequest = request.body[ListOffsetRequest]
def buildErrorResponse(e: Errors, partition: ListOffsetPartition): ListOffsetPartitionResponse = {
new ListOffsetPartitionResponse()
.setPartitionIndex(partition.partitionIndex)
.setErrorCode(e.code)
.setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
.setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
}
val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context,
DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name)
val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
new ListOffsetTopicResponse()
.setName(topic.name)
.setPartitions(topic.partitions.asScala.map(partition =>
buildErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED, partition)).asJava)
)
val responseTopics = authorizedRequestInfo.map { topic =>
val responsePartitions = topic.partitions.asScala.map { partition =>
val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
s"failed because the partition is duplicated in the request.")
buildErrorResponse(Errors.INVALID_REQUEST, partition)
} else {
try {
val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID
val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID
val isolationLevelOpt = if (isClientRequest)
Some(offsetRequest.isolationLevel)
else
None
val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
partition.timestamp,
isolationLevelOpt,
if (partition.currentLeaderEpoch == ListOffsetResponse.UNKNOWN_EPOCH) Optional.empty() else Optional.of(partition.currentLeaderEpoch),
fetchOnlyFromLeader)
val response = foundOpt match {
case Some(found) =>
val partitionResponse = new ListOffsetPartitionResponse()
.setPartitionIndex(partition.partitionIndex)
.setErrorCode(Errors.NONE.code)
.setTimestamp(found.timestamp)
.setOffset(found.offset)
if (found.leaderEpoch.isPresent)
partitionResponse.setLeaderEpoch(found.leaderEpoch.get)
partitionResponse
case None =>
buildErrorResponse(Errors.NONE, partition)
}
response
} catch {
// NOTE: These exceptions are special cases since these error messages are typically transient or the client
// would have received a clear exception and there is no value in logging the entire stack trace for the same
case e @ (_ : UnknownTopicOrPartitionException |
_ : NotLeaderForPartitionException |
_ : UnknownLeaderEpochException |
_ : FencedLeaderEpochException |
_ : KafkaStorageException |
_ : UnsupportedForMessageFormatException) =>
e.printStackTrace()
debug(s"Offset request with correlation id $correlationId from client $clientId on " +
s"partition $topicPartition failed due to ${e.getMessage}")
buildErrorResponse(Errors.forException(e), partition)
// Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
case e: OffsetNotAvailableException =>
if (request.header.apiVersion >= 5) {
buildErrorResponse(Errors.forException(e), partition)
} else {
buildErrorResponse(Errors.LEADER_NOT_AVAILABLE, partition)
}
case e: Throwable =>
error("Error while responding to offset request", e)
buildErrorResponse(Errors.forException(e), partition)
}
}
}
new ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava)
}
(responseTopics ++ unauthorizedResponseStatus).toList
}
private def createTopic(topic: String,

View File

@ -17,6 +17,7 @@
package kafka.server
import java.util.Collections
import java.util.Optional
import kafka.api._
@ -28,6 +29,7 @@ import kafka.utils.Implicits._
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.message.ListOffsetRequestData.{ListOffsetPartition, ListOffsetTopic}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, Records}
@ -229,22 +231,27 @@ class ReplicaFetcherThread(name: String,
}
private def fetchOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int, earliestOrLatest: Long): Long = {
val requestPartitionData = new ListOffsetRequest.PartitionData(earliestOrLatest,
Optional.of[Integer](currentLeaderEpoch))
val requestPartitions = Map(topicPartition -> requestPartitionData)
val topic = new ListOffsetTopic()
.setName(topicPartition.topic)
.setPartitions(Collections.singletonList(
new ListOffsetPartition()
.setPartitionIndex(topicPartition.partition)
.setCurrentLeaderEpoch(currentLeaderEpoch)
.setTimestamp(earliestOrLatest)))
val requestBuilder = ListOffsetRequest.Builder.forReplica(listOffsetRequestVersion, replicaId)
.setTargetTimes(requestPartitions.asJava)
.setTargetTimes(Collections.singletonList(topic))
val clientResponse = leaderEndpoint.sendRequest(requestBuilder)
val response = clientResponse.responseBody.asInstanceOf[ListOffsetResponse]
val responsePartition = response.topics.asScala.find(_.name == topicPartition.topic).get
.partitions.asScala.find(_.partitionIndex == topicPartition.partition).get
val responsePartitionData = response.responseData.get(topicPartition)
responsePartitionData.error match {
Errors.forCode(responsePartition.errorCode) match {
case Errors.NONE =>
if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2)
responsePartitionData.offset
responsePartition.offset
else
responsePartitionData.offsets.get(0)
responsePartition.oldStyleOffsets.get(0)
case error => throw error.exception
}
}

View File

@ -42,6 +42,7 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{Alter
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListOffsetRequestData.{ListOffsetPartition, ListOffsetTopic}
import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
import org.apache.kafka.common.message.{AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, OffsetCommitRequestData, SyncGroupRequestData}
@ -155,7 +156,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses.asScala.find(_._1 == tp).get._2.error),
ApiKeys.FETCH -> ((resp: requests.FetchResponse[Records]) => resp.responseData.asScala.find(_._1 == tp).get._2.error),
ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData.asScala.find(_._1 == tp).get._2.error),
ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => {
Errors.forCode(
resp.data
.topics.asScala.find(_.name == topic).get
.partitions.asScala.find(_.partitionIndex == part).get
.errorCode
)
}),
ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => Errors.forCode(
resp.data().topics().get(0).partitions().get(0).errorCode())),
ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error),
@ -164,8 +172,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.error),
ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => Errors.forCode(resp.data.errorCode())),
ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => {
val errorCode = resp.data().groups().asScala.find(g => group.equals(g.groupId())).head.errorCode()
Errors.forCode(errorCode)
Errors.forCode(resp.data.groups.asScala.find(g => group == g.groupId).head.errorCode)
}),
ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.error),
ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error),
@ -212,9 +219,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.OFFSET_DELETE -> ((resp: OffsetDeleteResponse) => {
Errors.forCode(
resp.data
.topics().asScala.find(_.name() == topic).get
.partitions().asScala.find(_.partitionIndex() == part).get
.errorCode()
.topics.asScala.find(_.name == topic).get
.partitions.asScala.find(_.partitionIndex == part).get
.errorCode
)
})
)
@ -304,7 +311,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def createListOffsetsRequest = {
requests.ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED).setTargetTimes(
Map(tp -> new ListOffsetRequest.PartitionData(0L, Optional.of[Integer](27))).asJava).
List(new ListOffsetTopic()
.setName(tp.topic)
.setPartitions(List(new ListOffsetPartition()
.setPartitionIndex(tp.partition)
.setTimestamp(0L)
.setCurrentLeaderEpoch(27)).asJava)).asJava
).
build()
}

View File

@ -64,7 +64,9 @@ class DelayedFetchTest extends EasyMockSupport {
EasyMock.expect(replicaManager.getPartitionOrException(topicPartition))
.andReturn(partition)
EasyMock.expect(partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = true))
EasyMock.expect(partition.fetchOffsetSnapshot(
currentLeaderEpoch,
fetchOnlyFromLeader = true))
.andThrow(new FencedLeaderEpochException("Requested epoch has been fenced"))
EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false)

View File

@ -176,7 +176,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(Some(4), partition.leaderLogIfLocal.map(_.logEndOffset))
assertEquals(None, log.latestEpoch)
val epochEndOffset = partition.lastOffsetForLeaderEpoch(currentLeaderEpoch = Optional.of[Integer](leaderEpoch),
val epochEndOffset = partition.lastOffsetForLeaderEpoch(currentLeaderEpoch = Optional.of(leaderEpoch),
leaderEpoch = leaderEpoch, fetchOnlyFromLeader = true)
assertEquals(EpochEndOffset.UNDEFINED_EPOCH_OFFSET, epochEndOffset.endOffset)
assertEquals(EpochEndOffset.UNDEFINED_EPOCH, epochEndOffset.leaderEpoch)
@ -534,7 +534,7 @@ class PartitionTest extends AbstractPartitionTest {
assertTrue(timestampAndOffsetOpt.isDefined)
val timestampAndOffset = timestampAndOffsetOpt.get
assertEquals(Optional.of(leaderEpoch), timestampAndOffset.leaderEpoch)
assertEquals(leaderEpoch, timestampAndOffset.leaderEpoch.get)
}
/**

View File

@ -50,6 +50,7 @@ import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListOffsetRequestData.{ListOffsetPartition, ListOffsetTopic}
import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
@ -1210,19 +1211,25 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling()
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
val targetTimes = Map(tp -> new ListOffsetRequest.PartitionData(ListOffsetRequest.EARLIEST_TIMESTAMP,
currentLeaderEpoch))
val targetTimes = List(new ListOffsetTopic()
.setName(tp.topic)
.setPartitions(List(new ListOffsetPartition()
.setPartitionIndex(tp.partition)
.setTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP)
.setCurrentLeaderEpoch(currentLeaderEpoch.get)).asJava)).asJava
val listOffsetRequest = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
.setTargetTimes(targetTimes.asJava).build()
.setTargetTimes(targetTimes).build()
val request = buildRequest(listOffsetRequest)
createKafkaApis().handleListOffsetRequest(request)
val response = readResponse(ApiKeys.LIST_OFFSETS, listOffsetRequest, capturedResponse)
.asInstanceOf[ListOffsetResponse]
assertTrue(response.responseData.containsKey(tp))
val partitionDataOptional = response.topics.asScala.find(_.name == tp.topic).get
.partitions.asScala.find(_.partitionIndex == tp.partition)
assertTrue(partitionDataOptional.isDefined)
val partitionData = response.responseData.get(tp)
assertEquals(error, partitionData.error)
val partitionData = partitionDataOptional.get
assertEquals(error.code, partitionData.errorCode)
assertEquals(ListOffsetResponse.UNKNOWN_OFFSET, partitionData.offset)
assertEquals(ListOffsetResponse.UNKNOWN_TIMESTAMP, partitionData.timestamp)
}
@ -2214,18 +2221,23 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling()
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
val targetTimes = Map(tp -> new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP,
currentLeaderEpoch))
val targetTimes = List(new ListOffsetTopic()
.setName(tp.topic)
.setPartitions(List(new ListOffsetPartition()
.setPartitionIndex(tp.partition)
.setTimestamp(ListOffsetRequest.LATEST_TIMESTAMP)).asJava)).asJava
val listOffsetRequest = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
.setTargetTimes(targetTimes.asJava).build()
.setTargetTimes(targetTimes).build()
val request = buildRequest(listOffsetRequest)
createKafkaApis().handleListOffsetRequest(request)
val response = readResponse(ApiKeys.LIST_OFFSETS, listOffsetRequest, capturedResponse).asInstanceOf[ListOffsetResponse]
assertTrue(response.responseData.containsKey(tp))
val partitionDataOptional = response.topics.asScala.find(_.name == tp.topic).get
.partitions.asScala.find(_.partitionIndex == tp.partition)
assertTrue(partitionDataOptional.isDefined)
val partitionData = response.responseData.get(tp)
assertEquals(Errors.NONE, partitionData.error)
val partitionData = partitionDataOptional.get
assertEquals(Errors.NONE.code, partitionData.errorCode)
assertEquals(latestOffset, partitionData.offset)
assertEquals(ListOffsetResponse.UNKNOWN_TIMESTAMP, partitionData.timestamp)
}

View File

@ -19,6 +19,7 @@ package kafka.server
import java.util.Optional
import kafka.utils.TestUtils
import org.apache.kafka.common.message.ListOffsetRequestData.{ListOffsetPartition, ListOffsetTopic}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse}
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
@ -33,8 +34,12 @@ class ListOffsetsRequestTest extends BaseRequestTest {
def testListOffsetsErrorCodes(): Unit = {
val topic = "topic"
val partition = new TopicPartition(topic, 0)
val targetTimes = Map(partition -> new ListOffsetRequest.PartitionData(
ListOffsetRequest.EARLIEST_TIMESTAMP, Optional.of[Integer](0))).asJava
val targetTimes = List(new ListOffsetTopic()
.setName(topic)
.setPartitions(List(new ListOffsetPartition()
.setPartitionIndex(partition.partition)
.setTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP)
.setCurrentLeaderEpoch(0)).asJava)).asJava
val consumerRequest = ListOffsetRequest.Builder
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
@ -82,8 +87,14 @@ class ListOffsetsRequestTest extends BaseRequestTest {
val firstLeaderId = partitionToLeader(topicPartition.partition)
def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = {
val targetTimes = Map(topicPartition -> new ListOffsetRequest.PartitionData(
ListOffsetRequest.EARLIEST_TIMESTAMP, currentLeaderEpoch)).asJava
val partition = new ListOffsetPartition()
.setPartitionIndex(topicPartition.partition)
.setTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP)
if (currentLeaderEpoch.isPresent)
partition.setCurrentLeaderEpoch(currentLeaderEpoch.get)
val targetTimes = List(new ListOffsetTopic()
.setName(topic)
.setPartitions(List(partition).asJava)).asJava
val request = ListOffsetRequest.Builder
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(targetTimes)
@ -122,8 +133,11 @@ class ListOffsetsRequestTest extends BaseRequestTest {
def fetchOffsetAndEpoch(serverId: Int,
timestamp: Long): (Long, Int) = {
val targetTimes = Map(topicPartition -> new ListOffsetRequest.PartitionData(
timestamp, Optional.empty[Integer]())).asJava
val targetTimes = List(new ListOffsetTopic()
.setName(topic)
.setPartitions(List(new ListOffsetPartition()
.setPartitionIndex(topicPartition.partition)
.setTimestamp(timestamp)).asJava)).asJava
val request = ListOffsetRequest.Builder
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
@ -131,11 +145,10 @@ class ListOffsetsRequestTest extends BaseRequestTest {
.build()
val response = sendRequest(serverId, request)
val partitionData = response.responseData.get(topicPartition)
val epochOpt = partitionData.leaderEpoch
assertTrue(epochOpt.isPresent)
val partitionData = response.topics.asScala.find(_.name == topic).get
.partitions.asScala.find(_.partitionIndex == topicPartition.partition).get
(partitionData.offset, epochOpt.get)
(partitionData.offset, partitionData.leaderEpoch)
}
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L))
@ -157,9 +170,11 @@ class ListOffsetsRequestTest extends BaseRequestTest {
private def assertResponseError(error: Errors, brokerId: Int, request: ListOffsetRequest): Unit = {
val response = sendRequest(brokerId, request)
assertEquals(request.partitionTimestamps.size, response.responseData.size)
response.responseData.asScala.values.foreach { partitionData =>
assertEquals(error, partitionData.error)
assertEquals(request.topics.size, response.topics.size)
response.topics.asScala.foreach { topic =>
topic.partitions.asScala.foreach { partition =>
assertEquals(error.code, partition.errorCode)
}
}
}

View File

@ -23,6 +23,10 @@ import java.util.{Optional, Properties, Random}
import kafka.log.{ClientRecordDeletion, Log, LogSegment}
import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse}
@ -32,6 +36,7 @@ import org.junit.Assert._
import org.junit.Test
import scala.jdk.CollectionConverters._
import scala.collection.mutable.Buffer
class LogOffsetTest extends BaseRequestTest {
@ -54,10 +59,9 @@ class LogOffsetTest extends BaseRequestTest {
def testGetOffsetsForUnknownTopic(): Unit = {
val topicPartition = new TopicPartition("foo", 0)
val request = ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(Map(topicPartition ->
new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 10)).asJava).build(0)
.setTargetTimes(buildTargetTimes(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP, 10).asJava).build(0)
val response = sendListOffsetsRequest(request)
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.responseData.get(topicPartition).error)
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, findPartition(response.topics.asScala, topicPartition).errorCode)
}
@deprecated("ListOffsetsRequest V0", since = "")
@ -87,9 +91,8 @@ class LogOffsetTest extends BaseRequestTest {
TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
"Leader should be elected")
val request = ListOffsetRequest.Builder.forReplica(0, 0)
.setTargetTimes(Map(topicPartition ->
new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 15)).asJava).build()
val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala
.setTargetTimes(buildTargetTimes(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP, 15).asJava).build()
val consumerOffsets = findPartition(sendListOffsetsRequest(request).topics.asScala, topicPartition).oldStyleOffsets.asScala
assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets)
}
@ -115,9 +118,8 @@ class LogOffsetTest extends BaseRequestTest {
TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
"Leader should be elected")
val request = ListOffsetRequest.Builder.forReplica(0, 0)
.setTargetTimes(Map(topicPartition ->
new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 15)).asJava).build()
val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala
.setTargetTimes(buildTargetTimes(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP, 15).asJava).build()
val consumerOffsets = findPartition(sendListOffsetsRequest(request).topics.asScala, topicPartition).oldStyleOffsets.asScala
assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), consumerOffsets)
// try to fetch using latest offset
@ -143,9 +145,8 @@ class LogOffsetTest extends BaseRequestTest {
for (_ <- 1 to 14) {
val topicPartition = new TopicPartition(topic, 0)
val request = ListOffsetRequest.Builder.forReplica(0, 0)
.setTargetTimes(Map(topicPartition ->
new ListOffsetRequest.PartitionData(ListOffsetRequest.EARLIEST_TIMESTAMP, 1)).asJava).build()
val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala
.setTargetTimes(buildTargetTimes(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP, 1).asJava).build()
val consumerOffsets = findPartition(sendListOffsetsRequest(request).topics.asScala, topicPartition).oldStyleOffsets.asScala
if (consumerOffsets.head == 1)
offsetChanged = true
}
@ -176,9 +177,8 @@ class LogOffsetTest extends BaseRequestTest {
TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
"Leader should be elected")
val request = ListOffsetRequest.Builder.forReplica(0, 0)
.setTargetTimes(Map(topicPartition ->
new ListOffsetRequest.PartitionData(now, 15)).asJava).build()
val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala
.setTargetTimes(buildTargetTimes(topicPartition, now, 15).asJava).build()
val consumerOffsets = findPartition(sendListOffsetsRequest(request).topics.asScala, topicPartition).oldStyleOffsets.asScala
assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), consumerOffsets)
}
@ -204,9 +204,8 @@ class LogOffsetTest extends BaseRequestTest {
TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
"Leader should be elected")
val request = ListOffsetRequest.Builder.forReplica(0, 0)
.setTargetTimes(Map(topicPartition ->
new ListOffsetRequest.PartitionData(ListOffsetRequest.EARLIEST_TIMESTAMP, 10)).asJava).build()
val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala
.setTargetTimes(buildTargetTimes(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP, 10).asJava).build()
val consumerOffsets = findPartition(sendListOffsetsRequest(request).topics.asScala, topicPartition).oldStyleOffsets.asScala
assertEquals(Seq(0L), consumerOffsets)
}
@ -256,4 +255,19 @@ class LogOffsetTest extends BaseRequestTest {
connectAndReceive[FetchResponse[MemoryRecords]](request)
}
private def buildTargetTimes(tp: TopicPartition, timestamp: Long, maxNumOffsets: Int): List[ListOffsetTopic] = {
List(new ListOffsetTopic()
.setName(tp.topic)
.setPartitions(List(new ListOffsetPartition()
.setPartitionIndex(tp.partition)
.setTimestamp(timestamp)
.setMaxNumOffsets(maxNumOffsets)).asJava)
)
}
private def findPartition(topics: Buffer[ListOffsetTopicResponse], tp: TopicPartition): ListOffsetPartitionResponse = {
topics.find(_.name == tp.topic).get
.partitions.asScala.find(_.partitionIndex == tp.partition).get
}
}

View File

@ -31,6 +31,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic,
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListOffsetRequestData.{ListOffsetPartition, ListOffsetTopic}
import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
import org.apache.kafka.common.message._
@ -221,9 +222,14 @@ class RequestQuotaTest extends BaseRequestTest {
new MetadataRequest.Builder(List(topic).asJava, true)
case ApiKeys.LIST_OFFSETS =>
val topic = new ListOffsetTopic()
.setName(tp.topic)
.setPartitions(List(new ListOffsetPartition()
.setPartitionIndex(tp.partition)
.setTimestamp(0L)
.setCurrentLeaderEpoch(15)).asJava)
ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(Map(tp -> new ListOffsetRequest.PartitionData(
0L, Optional.of[Integer](15))).asJava)
.setTargetTimes(List(topic).asJava)
case ApiKeys.LEADER_AND_ISR =>
new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, brokerId, Int.MaxValue, Long.MaxValue,