diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index c446be31e96..9acd341250e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -127,6 +127,10 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse; import org.apache.kafka.common.message.ListGroupsRequestData; import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.MetadataRequestData; import org.apache.kafka.common.message.OffsetCommitRequestData; @@ -210,7 +214,6 @@ import org.apache.kafka.common.requests.ListGroupsRequest; import org.apache.kafka.common.requests.ListGroupsResponse; import org.apache.kafka.common.requests.ListOffsetRequest; import org.apache.kafka.common.requests.ListOffsetResponse; -import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData; import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest; import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse; import org.apache.kafka.common.requests.MetadataRequest; @@ -250,7 +253,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -2048,7 +2050,7 @@ public class KafkaAdminClient extends AdminClient { return new DescribeConfigsResult(new HashMap<>(brokerFutures.entrySet().stream() .flatMap(x -> x.getValue().entrySet().stream()) - .collect(Collectors.toMap(Entry::getKey, Entry::getValue)))); + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))); } private Config describeConfigResult(DescribeConfigsResponseData.DescribeConfigsResult describeConfigsResult) { @@ -2314,7 +2316,7 @@ public class KafkaAdminClient extends AdminClient { completeAllExceptionally( futures.entrySet().stream() .filter(entry -> entry.getKey().brokerId() == brokerId) - .map(Entry::getValue), + .map(Map.Entry::getValue), throwable); } }, now); @@ -2479,7 +2481,7 @@ public class KafkaAdminClient extends AdminClient { final CreatePartitionsOptions options) { final Map> futures = new HashMap<>(newPartitions.size()); final CreatePartitionsTopicCollection topics = new CreatePartitionsTopicCollection(newPartitions.size()); - for (Entry entry : newPartitions.entrySet()) { + for (Map.Entry entry : newPartitions.entrySet()) { final String topic = entry.getKey(); final NewPartitions newPartition = entry.getValue(); List> newAssignments = newPartition.assignments(); @@ -3941,7 +3943,7 @@ public class KafkaAdminClient extends AdminClient { MetadataResponse mr = context.response().orElseThrow(() -> new IllegalStateException("No Metadata response")); List calls = new ArrayList<>(); // grouping topic partitions per leader - Map> leaders = new HashMap<>(); + Map> leaders = new HashMap<>(); for (Map.Entry entry: topicPartitionOffsets.entrySet()) { @@ -3957,8 +3959,9 @@ public class KafkaAdminClient extends AdminClient { if (!mr.errors().containsKey(tp.topic())) { Node node = mr.cluster().leaderFor(tp); if (node != null) { - Map leadersOnNode = leaders.computeIfAbsent(node, k -> new HashMap<>()); - leadersOnNode.put(tp, new ListOffsetRequest.PartitionData(offsetQuery, Optional.empty())); + Map leadersOnNode = leaders.computeIfAbsent(node, k -> new HashMap()); + ListOffsetTopic topic = leadersOnNode.computeIfAbsent(tp.topic(), k -> new ListOffsetTopic().setName(tp.topic())); + topic.partitions().add(new ListOffsetPartition().setPartitionIndex(tp.partition()).setTimestamp(offsetQuery)); } else { future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception()); } @@ -3967,12 +3970,13 @@ public class KafkaAdminClient extends AdminClient { } } - for (final Map.Entry> entry: leaders.entrySet()) { + for (final Map.Entry> entry : leaders.entrySet()) { final int brokerId = entry.getKey().id(); - final Map partitionsToQuery = entry.getValue(); calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) { + final List partitionsToQuery = new ArrayList<>(entry.getValue().values()); + @Override ListOffsetRequest.Builder createRequest(int timeoutMs) { return ListOffsetRequest.Builder @@ -3985,25 +3989,38 @@ public class KafkaAdminClient extends AdminClient { ListOffsetResponse response = (ListOffsetResponse) abstractResponse; Map retryTopicPartitionOffsets = new HashMap<>(); - for (Entry result : response.responseData().entrySet()) { - TopicPartition tp = result.getKey(); - PartitionData partitionData = result.getValue(); - - KafkaFutureImpl future = futures.get(tp); - Errors error = partitionData.error; - OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp); - if (offsetRequestSpec == null) { - future.completeExceptionally(new KafkaException("Unexpected topic partition " + tp + " in broker response!")); - } else if (MetadataOperationContext.shouldRefreshMetadata(error)) { - retryTopicPartitionOffsets.put(tp, offsetRequestSpec); - } else if (error == Errors.NONE) { - future.complete(new ListOffsetsResultInfo(partitionData.offset, partitionData.timestamp, partitionData.leaderEpoch)); - } else { - future.completeExceptionally(error.exception()); + for (ListOffsetTopicResponse topic : response.topics()) { + for (ListOffsetPartitionResponse partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + KafkaFutureImpl future = futures.get(tp); + Errors error = Errors.forCode(partition.errorCode()); + OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp); + if (offsetRequestSpec == null) { + log.warn("Server response mentioned unknown topic partition {}", tp); + } else if (MetadataOperationContext.shouldRefreshMetadata(error)) { + retryTopicPartitionOffsets.put(tp, offsetRequestSpec); + } else if (error == Errors.NONE) { + Optional leaderEpoch = (partition.leaderEpoch() == ListOffsetResponse.UNKNOWN_EPOCH) + ? Optional.empty() + : Optional.of(partition.leaderEpoch()); + future.complete(new ListOffsetsResultInfo(partition.offset(), partition.timestamp(), leaderEpoch)); + } else { + future.completeExceptionally(error.exception()); + } } } - if (!retryTopicPartitionOffsets.isEmpty()) { + if (retryTopicPartitionOffsets.isEmpty()) { + // The server should send back a response for every topic partition. But do a sanity check anyway. + for (ListOffsetTopic topic : partitionsToQuery) { + for (ListOffsetPartition partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + ApiException error = new ApiException("The response from broker " + brokerId + + " did not contain a result for topic partition " + tp); + futures.get(tp).completeExceptionally(error); + } + } + } else { Set retryTopics = retryTopicPartitionOffsets.keySet().stream().map( TopicPartition::topic).collect(Collectors.toSet()); MetadataOperationContext retryContext = @@ -4014,9 +4031,12 @@ public class KafkaAdminClient extends AdminClient { @Override void handleFailure(Throwable throwable) { - for (TopicPartition tp : entry.getValue().keySet()) { - KafkaFutureImpl future = futures.get(tp); - future.completeExceptionally(throwable); + for (ListOffsetTopic topic : entry.getValue().values()) { + for (ListOffsetPartition partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + KafkaFutureImpl future = futures.get(tp); + future.completeExceptionally(throwable); + } } } }); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index b5a491e24c2..5581f25b5f4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -48,6 +48,9 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse; import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; @@ -615,7 +618,7 @@ public class Fetcher implements Closeable { // The first condition ensures that the completedFetches is not stuck with the same completedFetch // in cases such as the TopicAuthorizationException, and the second condition ensures that no // potential data loss due to an exception in a following record. - FetchResponse.PartitionData partition = records.partitionData; + FetchResponse.PartitionData partition = records.partitionData; if (fetched.isEmpty() && (partition.records() == null || partition.records().sizeInBytes() == 0)) { completedFetches.poll(); } @@ -730,11 +733,11 @@ public class Fetcher implements Closeable { } private void resetOffsetsAsync(Map partitionResetTimestamps) { - Map> timestampsToSearchByNode = + Map> timestampsToSearchByNode = groupListOffsetRequests(partitionResetTimestamps, new HashSet<>()); - for (Map.Entry> entry : timestampsToSearchByNode.entrySet()) { + for (Map.Entry> entry : timestampsToSearchByNode.entrySet()) { Node node = entry.getKey(); - final Map resetTimestamps = entry.getValue(); + final Map resetTimestamps = entry.getValue(); subscriptions.setNextAllowedRetry(resetTimestamps.keySet(), time.milliseconds() + requestTimeoutMs); RequestFuture future = sendListOffsetRequest(node, resetTimestamps, false); @@ -749,8 +752,8 @@ public class Fetcher implements Closeable { for (Map.Entry fetchedOffset : result.fetchedOffsets.entrySet()) { TopicPartition partition = fetchedOffset.getKey(); ListOffsetData offsetData = fetchedOffset.getValue(); - ListOffsetRequest.PartitionData requestedReset = resetTimestamps.get(partition); - resetOffsetIfNeeded(partition, timestampToOffsetResetStrategy(requestedReset.timestamp), offsetData); + ListOffsetPartition requestedReset = resetTimestamps.get(partition); + resetOffsetIfNeeded(partition, timestampToOffsetResetStrategy(requestedReset.timestamp()), offsetData); } } @@ -881,7 +884,7 @@ public class Fetcher implements Closeable { private RequestFuture sendListOffsetsRequests(final Map timestampsToSearch, final boolean requireTimestamps) { final Set partitionsToRetry = new HashSet<>(); - Map> timestampsToSearchByNode = + Map> timestampsToSearchByNode = groupListOffsetRequests(timestampsToSearch, partitionsToRetry); if (timestampsToSearchByNode.isEmpty()) return RequestFuture.failure(new StaleMetadataException()); @@ -890,7 +893,7 @@ public class Fetcher implements Closeable { final Map fetchedTimestampOffsets = new HashMap<>(); final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size()); - for (Map.Entry> entry : timestampsToSearchByNode.entrySet()) { + for (Map.Entry> entry : timestampsToSearchByNode.entrySet()) { RequestFuture future = sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps); future.addListener(new RequestFutureListener() { @@ -927,10 +930,10 @@ public class Fetcher implements Closeable { * @param partitionsToRetry A set of topic partitions that will be extended with partitions * that need metadata update or re-connect to the leader. */ - private Map> groupListOffsetRequests( + private Map> groupListOffsetRequests( Map timestampsToSearch, Set partitionsToRetry) { - final Map partitionDataMap = new HashMap<>(); + final Map partitionDataMap = new HashMap<>(); for (Map.Entry entry: timestampsToSearch.entrySet()) { TopicPartition tp = entry.getKey(); Long offset = entry.getValue(); @@ -952,7 +955,11 @@ public class Fetcher implements Closeable { leader, tp); partitionsToRetry.add(tp); } else { - partitionDataMap.put(tp, new ListOffsetRequest.PartitionData(offset, leaderAndEpoch.epoch)); + int currentLeaderEpoch = leaderAndEpoch.epoch.orElse(ListOffsetResponse.UNKNOWN_EPOCH); + partitionDataMap.put(tp, new ListOffsetPartition() + .setPartitionIndex(tp.partition()) + .setTimestamp(offset) + .setCurrentLeaderEpoch(currentLeaderEpoch)); } } } @@ -968,11 +975,11 @@ public class Fetcher implements Closeable { * @return A response which can be polled to obtain the corresponding timestamps and offsets. */ private RequestFuture sendListOffsetRequest(final Node node, - final Map timestampsToSearch, + final Map timestampsToSearch, boolean requireTimestamp) { ListOffsetRequest.Builder builder = ListOffsetRequest.Builder .forConsumer(requireTimestamp, isolationLevel) - .setTargetTimes(timestampsToSearch); + .setTargetTimes(ListOffsetRequest.toListOffsetTopics(timestampsToSearch)); log.debug("Sending ListOffsetRequest {} to broker {}", builder, node); return client.send(node, builder) @@ -981,14 +988,13 @@ public class Fetcher implements Closeable { public void onSuccess(ClientResponse response, RequestFuture future) { ListOffsetResponse lor = (ListOffsetResponse) response.responseBody(); log.trace("Received ListOffsetResponse {} from broker {}", lor, node); - handleListOffsetResponse(timestampsToSearch, lor, future); + handleListOffsetResponse(lor, future); } }); } /** * Callback for the response of the list offset call above. - * @param timestampsToSearch The mapping from partitions to target timestamps * @param listOffsetResponse The response from the server. * @param future The future to be completed when the response returns. Note that any partition-level errors will * generally fail the entire future result. The one exception is UNSUPPORTED_FOR_MESSAGE_FORMAT, @@ -997,77 +1003,78 @@ public class Fetcher implements Closeable { * value of each partition may be null only for v0. In v1 and later the ListOffset API would not * return a null timestamp (-1 is returned instead when necessary). */ - private void handleListOffsetResponse(Map timestampsToSearch, - ListOffsetResponse listOffsetResponse, + private void handleListOffsetResponse(ListOffsetResponse listOffsetResponse, RequestFuture future) { Map fetchedOffsets = new HashMap<>(); Set partitionsToRetry = new HashSet<>(); Set unauthorizedTopics = new HashSet<>(); - for (Map.Entry entry : timestampsToSearch.entrySet()) { - TopicPartition topicPartition = entry.getKey(); - ListOffsetResponse.PartitionData partitionData = listOffsetResponse.responseData().get(topicPartition); - Errors error = partitionData.error; - switch (error) { - case NONE: - if (partitionData.offsets != null) { - // Handle v0 response - long offset; - if (partitionData.offsets.size() > 1) { - future.raise(new IllegalStateException("Unexpected partitionData response of length " + - partitionData.offsets.size())); - return; - } else if (partitionData.offsets.isEmpty()) { - offset = ListOffsetResponse.UNKNOWN_OFFSET; + for (ListOffsetTopicResponse topic : listOffsetResponse.topics()) { + for (ListOffsetPartitionResponse partition : topic.partitions()) { + TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex()); + Errors error = Errors.forCode(partition.errorCode()); + switch (error) { + case NONE: + if (!partition.oldStyleOffsets().isEmpty()) { + // Handle v0 response with offsets + long offset; + if (partition.oldStyleOffsets().size() > 1) { + future.raise(new IllegalStateException("Unexpected partitionData response of length " + + partition.oldStyleOffsets().size())); + return; + } else { + offset = partition.oldStyleOffsets().get(0); + } + log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}", + topicPartition, offset); + if (offset != ListOffsetResponse.UNKNOWN_OFFSET) { + ListOffsetData offsetData = new ListOffsetData(offset, null, Optional.empty()); + fetchedOffsets.put(topicPartition, offsetData); + } } else { - offset = partitionData.offsets.get(0); + // Handle v1 and later response or v0 without offsets + log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}", + topicPartition, partition.offset(), partition.timestamp()); + if (partition.offset() != ListOffsetResponse.UNKNOWN_OFFSET) { + Optional leaderEpoch = (partition.leaderEpoch() == ListOffsetResponse.UNKNOWN_EPOCH) + ? Optional.empty() + : Optional.of(partition.leaderEpoch()); + ListOffsetData offsetData = new ListOffsetData(partition.offset(), partition.timestamp(), + leaderEpoch); + fetchedOffsets.put(topicPartition, offsetData); + } } - log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}", - topicPartition, offset); - if (offset != ListOffsetResponse.UNKNOWN_OFFSET) { - ListOffsetData offsetData = new ListOffsetData(offset, null, Optional.empty()); - fetchedOffsets.put(topicPartition, offsetData); - } - } else { - // Handle v1 and later response - log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}", - topicPartition, partitionData.offset, partitionData.timestamp); - if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) { - ListOffsetData offsetData = new ListOffsetData(partitionData.offset, partitionData.timestamp, - partitionData.leaderEpoch); - fetchedOffsets.put(topicPartition, offsetData); - } - } - break; - case UNSUPPORTED_FOR_MESSAGE_FORMAT: - // The message format on the broker side is before 0.10.0, which means it does not - // support timestamps. We treat this case the same as if we weren't able to find an - // offset corresponding to the requested timestamp and leave it out of the result. - log.debug("Cannot search by timestamp for partition {} because the message format version " + - "is before 0.10.0", topicPartition); - break; - case NOT_LEADER_OR_FOLLOWER: - case REPLICA_NOT_AVAILABLE: - case KAFKA_STORAGE_ERROR: - case OFFSET_NOT_AVAILABLE: - case LEADER_NOT_AVAILABLE: - case FENCED_LEADER_EPOCH: - case UNKNOWN_LEADER_EPOCH: - log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.", - topicPartition, error); - partitionsToRetry.add(topicPartition); - break; - case UNKNOWN_TOPIC_OR_PARTITION: - log.warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition); - partitionsToRetry.add(topicPartition); - break; - case TOPIC_AUTHORIZATION_FAILED: - unauthorizedTopics.add(topicPartition.topic()); - break; - default: - log.warn("Attempt to fetch offsets for partition {} failed due to unexpected exception: {}, retrying.", - topicPartition, error.message()); - partitionsToRetry.add(topicPartition); + break; + case UNSUPPORTED_FOR_MESSAGE_FORMAT: + // The message format on the broker side is before 0.10.0, which means it does not + // support timestamps. We treat this case the same as if we weren't able to find an + // offset corresponding to the requested timestamp and leave it out of the result. + log.debug("Cannot search by timestamp for partition {} because the message format version " + + "is before 0.10.0", topicPartition); + break; + case NOT_LEADER_OR_FOLLOWER: + case REPLICA_NOT_AVAILABLE: + case KAFKA_STORAGE_ERROR: + case OFFSET_NOT_AVAILABLE: + case LEADER_NOT_AVAILABLE: + case FENCED_LEADER_EPOCH: + case UNKNOWN_LEADER_EPOCH: + log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.", + topicPartition, error); + partitionsToRetry.add(topicPartition); + break; + case UNKNOWN_TOPIC_OR_PARTITION: + log.warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition); + partitionsToRetry.add(topicPartition); + break; + case TOPIC_AUTHORIZATION_FAILED: + unauthorizedTopics.add(topicPartition.topic()); + break; + default: + log.warn("Attempt to fetch offsets for partition {} failed due to unexpected exception: {}, retrying.", + topicPartition, error.message()); + partitionsToRetry.add(topicPartition); + } } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index fb026b04a7c..804e843b0cf 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -95,6 +95,8 @@ import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.message.ListGroupsRequestData; import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.ListOffsetRequestData; +import org.apache.kafka.common.message.ListOffsetResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.MetadataRequestData; @@ -128,8 +130,6 @@ import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Type; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.requests.ListOffsetRequest; -import org.apache.kafka.common.requests.ListOffsetResponse; import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest; import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; import org.apache.kafka.common.requests.ProduceRequest; @@ -153,7 +153,7 @@ import static org.apache.kafka.common.protocol.types.Type.RECORDS; public enum ApiKeys { PRODUCE(0, "Produce", ProduceRequest.schemaVersions(), ProduceResponse.schemaVersions()), FETCH(1, "Fetch", FetchRequestData.SCHEMAS, FetchResponseData.SCHEMAS), - LIST_OFFSETS(2, "ListOffsets", ListOffsetRequest.schemaVersions(), ListOffsetResponse.schemaVersions()), + LIST_OFFSETS(2, "ListOffsets", ListOffsetRequestData.SCHEMAS, ListOffsetResponseData.SCHEMAS), METADATA(3, "Metadata", MetadataRequestData.SCHEMAS, MetadataResponseData.SCHEMAS), LEADER_AND_ISR(4, "LeaderAndIsr", true, LeaderAndIsrRequestData.SCHEMAS, LeaderAndIsrResponseData.SCHEMAS), STOP_REPLICA(5, "StopReplica", true, StopReplicaRequestData.SCHEMAS, StopReplicaResponseData.SCHEMAS), diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 51fc771f889..2da1857843d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -92,7 +92,7 @@ public abstract class AbstractResponse implements AbstractRequestResponse { case FETCH: return new FetchResponse<>(new FetchResponseData(struct, version)); case LIST_OFFSETS: - return new ListOffsetResponse(struct); + return new ListOffsetResponse(struct, version); case METADATA: return new MetadataResponse(struct, version); case OFFSET_COMMIT: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java index 94213efb2b0..07f2998a4b4 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -16,15 +16,6 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.IsolationLevel; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.utils.CollectionUtils; - import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -32,13 +23,19 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Optional; import java.util.Set; -import static org.apache.kafka.common.protocol.CommonFields.CURRENT_LEADER_EPOCH; -import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ListOffsetRequestData; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic; +import org.apache.kafka.common.message.ListOffsetResponseData; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; public class ListOffsetRequest extends AbstractRequest { public static final long EARLIEST_TIMESTAMP = -2L; @@ -47,96 +44,11 @@ public class ListOffsetRequest extends AbstractRequest { public static final int CONSUMER_REPLICA_ID = -1; public static final int DEBUGGING_REPLICA_ID = -2; - // top level fields - private static final Field.Int32 REPLICA_ID = new Field.Int32("replica_id", - "Broker id of the follower. For normal consumers, use -1."); - private static final Field.Int8 ISOLATION_LEVEL = new Field.Int8("isolation_level", - "This setting controls the visibility of transactional records. " + - "Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED " + - "(isolation_level = 1), non-transactional and COMMITTED transactional records are visible. " + - "To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current " + - "LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the " + - "result, which allows consumers to discard ABORTED transactional records"); - private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics", - "Topics to list offsets."); - - // topic level fields - private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions", - "Partitions to list offsets."); - - // partition level fields - private static final Field.Int64 TIMESTAMP = new Field.Int64("timestamp", - "The target timestamp for the partition."); - private static final Field.Int32 MAX_NUM_OFFSETS = new Field.Int32("max_num_offsets", - "Maximum offsets to return."); - - private static final Field PARTITIONS_V0 = PARTITIONS.withFields( - PARTITION_ID, - TIMESTAMP, - MAX_NUM_OFFSETS); - - private static final Field TOPICS_V0 = TOPICS.withFields( - TOPIC_NAME, - PARTITIONS_V0); - - private static final Schema LIST_OFFSET_REQUEST_V0 = new Schema( - REPLICA_ID, - TOPICS_V0); - - // V1 removes max_num_offsets - private static final Field PARTITIONS_V1 = PARTITIONS.withFields( - PARTITION_ID, - TIMESTAMP); - - private static final Field TOPICS_V1 = TOPICS.withFields( - TOPIC_NAME, - PARTITIONS_V1); - - private static final Schema LIST_OFFSET_REQUEST_V1 = new Schema( - REPLICA_ID, - TOPICS_V1); - - // V2 adds a field for the isolation level - private static final Schema LIST_OFFSET_REQUEST_V2 = new Schema( - REPLICA_ID, - ISOLATION_LEVEL, - TOPICS_V1); - - // V3 bump used to indicate that on quota violation brokers send out responses before throttling. - private static final Schema LIST_OFFSET_REQUEST_V3 = LIST_OFFSET_REQUEST_V2; - - // V4 introduces the current leader epoch, which is used for fencing - private static final Field PARTITIONS_V4 = PARTITIONS.withFields( - PARTITION_ID, - CURRENT_LEADER_EPOCH, - TIMESTAMP); - - private static final Field TOPICS_V4 = TOPICS.withFields( - TOPIC_NAME, - PARTITIONS_V4); - - private static final Schema LIST_OFFSET_REQUEST_V4 = new Schema( - REPLICA_ID, - ISOLATION_LEVEL, - TOPICS_V4); - - // V5 bump to include new possible error code (OFFSET_NOT_AVAILABLE) - private static final Schema LIST_OFFSET_REQUEST_V5 = LIST_OFFSET_REQUEST_V4; - - public static Schema[] schemaVersions() { - return new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2, - LIST_OFFSET_REQUEST_V3, LIST_OFFSET_REQUEST_V4, LIST_OFFSET_REQUEST_V5}; - } - - private final int replicaId; - private final IsolationLevel isolationLevel; - private final Map partitionTimestamps; + private final ListOffsetRequestData data; private final Set duplicatePartitions; public static class Builder extends AbstractRequest.Builder { - private final int replicaId; - private final IsolationLevel isolationLevel; - private Map partitionTimestamps = new HashMap<>(); + private final ListOffsetRequestData data; public static Builder forReplica(short allowedVersion, int replicaId) { return new Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED); @@ -156,145 +68,95 @@ public class ListOffsetRequest extends AbstractRequest { int replicaId, IsolationLevel isolationLevel) { super(ApiKeys.LIST_OFFSETS, oldestAllowedVersion, latestAllowedVersion); - this.replicaId = replicaId; - this.isolationLevel = isolationLevel; + data = new ListOffsetRequestData() + .setIsolationLevel(isolationLevel.id()) + .setReplicaId(replicaId); } - public Builder setTargetTimes(Map partitionTimestamps) { - this.partitionTimestamps = partitionTimestamps; + public Builder setTargetTimes(List topics) { + data.setTopics(topics); return this; } @Override public ListOffsetRequest build(short version) { - return new ListOffsetRequest(replicaId, partitionTimestamps, isolationLevel, version); + return new ListOffsetRequest(version, data); } @Override public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("(type=ListOffsetRequest") - .append(", replicaId=").append(replicaId); - if (partitionTimestamps != null) { - bld.append(", partitionTimestamps=").append(partitionTimestamps); - } - bld.append(", isolationLevel=").append(isolationLevel); - bld.append(")"); - return bld.toString(); - } - } - - public static final class PartitionData { - public final long timestamp; - public final int maxNumOffsets; // only supported in v0 - public final Optional currentLeaderEpoch; - - private PartitionData(long timestamp, int maxNumOffsets, Optional currentLeaderEpoch) { - this.timestamp = timestamp; - this.maxNumOffsets = maxNumOffsets; - this.currentLeaderEpoch = currentLeaderEpoch; - } - - // For V0 - public PartitionData(long timestamp, int maxNumOffsets) { - this(timestamp, maxNumOffsets, Optional.empty()); - } - - public PartitionData(long timestamp, Optional currentLeaderEpoch) { - this(timestamp, 1, currentLeaderEpoch); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof PartitionData)) return false; - PartitionData other = (PartitionData) obj; - return this.timestamp == other.timestamp && - this.currentLeaderEpoch.equals(other.currentLeaderEpoch); - } - - @Override - public int hashCode() { - return Objects.hash(timestamp, currentLeaderEpoch); - } - - @Override - public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("{timestamp: ").append(timestamp). - append(", maxNumOffsets: ").append(maxNumOffsets). - append(", currentLeaderEpoch: ").append(currentLeaderEpoch). - append("}"); - return bld.toString(); + return data.toString(); } } /** * Private constructor with a specified version. */ - private ListOffsetRequest(int replicaId, - Map targetTimes, - IsolationLevel isolationLevel, - short version) { + private ListOffsetRequest(short version, ListOffsetRequestData data) { super(ApiKeys.LIST_OFFSETS, version); - this.replicaId = replicaId; - this.isolationLevel = isolationLevel; - this.partitionTimestamps = targetTimes; + this.data = data; this.duplicatePartitions = Collections.emptySet(); } public ListOffsetRequest(Struct struct, short version) { super(ApiKeys.LIST_OFFSETS, version); - Set duplicatePartitions = new HashSet<>(); - replicaId = struct.get(REPLICA_ID); - isolationLevel = struct.hasField(ISOLATION_LEVEL) ? - IsolationLevel.forId(struct.get(ISOLATION_LEVEL)) : - IsolationLevel.READ_UNCOMMITTED; - partitionTimestamps = new HashMap<>(); - for (Object topicResponseObj : struct.get(TOPICS)) { - Struct topicResponse = (Struct) topicResponseObj; - String topic = topicResponse.get(TOPIC_NAME); - for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) { - Struct partitionResponse = (Struct) partitionResponseObj; - int partition = partitionResponse.get(PARTITION_ID); - long timestamp = partitionResponse.get(TIMESTAMP); - TopicPartition tp = new TopicPartition(topic, partition); - - int maxNumOffsets = partitionResponse.getOrElse(MAX_NUM_OFFSETS, 1); - Optional currentLeaderEpoch = RequestUtils.getLeaderEpoch(partitionResponse, CURRENT_LEADER_EPOCH); - PartitionData partitionData = new PartitionData(timestamp, maxNumOffsets, currentLeaderEpoch); - if (partitionTimestamps.put(tp, partitionData) != null) + data = new ListOffsetRequestData(struct, version); + duplicatePartitions = new HashSet<>(); + Set partitions = new HashSet<>(); + for (ListOffsetTopic topic : data.topics()) { + for (ListOffsetPartition partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + if (!partitions.add(tp)) { duplicatePartitions.add(tp); + } } } - this.duplicatePartitions = duplicatePartitions; } @Override - @SuppressWarnings("deprecation") public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - Map responseData = new HashMap<>(); short versionId = version(); + short errorCode = Errors.forException(e).code(); - ListOffsetResponse.PartitionData partitionError = versionId == 0 ? - new ListOffsetResponse.PartitionData(Errors.forException(e), Collections.emptyList()) : - new ListOffsetResponse.PartitionData(Errors.forException(e), -1L, -1L, Optional.empty()); - for (TopicPartition partition : partitionTimestamps.keySet()) { - responseData.put(partition, partitionError); + List responses = new ArrayList<>(); + for (ListOffsetTopic topic : data.topics()) { + ListOffsetTopicResponse topicResponse = new ListOffsetTopicResponse().setName(topic.name()); + List partitions = new ArrayList<>(); + for (ListOffsetPartition partition : topic.partitions()) { + ListOffsetPartitionResponse partitionResponse = new ListOffsetPartitionResponse() + .setErrorCode(errorCode) + .setPartitionIndex(partition.partitionIndex()); + if (versionId == 0) { + partitionResponse.setOldStyleOffsets(Collections.emptyList()); + } else { + partitionResponse.setOffset(ListOffsetResponse.UNKNOWN_OFFSET) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP); + } + partitions.add(partitionResponse); + } + topicResponse.setPartitions(partitions); + responses.add(topicResponse); } + ListOffsetResponseData responseData = new ListOffsetResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setTopics(responses); + return new ListOffsetResponse(responseData); + } - return new ListOffsetResponse(throttleTimeMs, responseData); + public ListOffsetRequestData data() { + return data; } public int replicaId() { - return replicaId; + return data.replicaId(); } public IsolationLevel isolationLevel() { - return isolationLevel; + return IsolationLevel.forId(data.isolationLevel()); } - public Map partitionTimestamps() { - return partitionTimestamps; + public List topics() { + return data.topics(); } public Set duplicatePartitions() { @@ -307,32 +169,25 @@ public class ListOffsetRequest extends AbstractRequest { @Override protected Struct toStruct() { - short version = version(); - Struct struct = new Struct(ApiKeys.LIST_OFFSETS.requestSchema(version)); - Map> topicsData = CollectionUtils.groupPartitionDataByTopic(partitionTimestamps); + return data.toStruct(version()); + } - struct.set(REPLICA_ID, replicaId); - struct.setIfExists(ISOLATION_LEVEL, isolationLevel.id()); - - List topicArray = new ArrayList<>(); - for (Map.Entry> topicEntry: topicsData.entrySet()) { - Struct topicData = struct.instance(TOPICS); - topicData.set(TOPIC_NAME, topicEntry.getKey()); - List partitionArray = new ArrayList<>(); - for (Map.Entry partitionEntry : topicEntry.getValue().entrySet()) { - PartitionData offsetPartitionData = partitionEntry.getValue(); - Struct partitionData = topicData.instance(PARTITIONS); - partitionData.set(PARTITION_ID, partitionEntry.getKey()); - partitionData.set(TIMESTAMP, offsetPartitionData.timestamp); - partitionData.setIfExists(MAX_NUM_OFFSETS, offsetPartitionData.maxNumOffsets); - RequestUtils.setLeaderEpochIfExists(partitionData, CURRENT_LEADER_EPOCH, - offsetPartitionData.currentLeaderEpoch); - partitionArray.add(partitionData); - } - topicData.set(PARTITIONS, partitionArray.toArray()); - topicArray.add(topicData); + public static List toListOffsetTopics(Map timestampsToSearch) { + Map topics = new HashMap<>(); + for (Map.Entry entry : timestampsToSearch.entrySet()) { + TopicPartition tp = entry.getKey(); + ListOffsetTopic topic = topics.computeIfAbsent(tp.topic(), k -> new ListOffsetTopic().setName(tp.topic())); + topic.partitions().add(entry.getValue()); } - struct.set(TOPICS, topicArray.toArray()); - return struct; + return new ArrayList<>(topics.values()); + } + + public static ListOffsetTopic singletonRequestData(String topic, int partitionIndex, long timestamp, int maxNumOffsets) { + return new ListOffsetTopic() + .setName(topic) + .setPartitions(Collections.singletonList(new ListOffsetPartition() + .setPartitionIndex(partitionIndex) + .setTimestamp(timestamp) + .setMaxNumOffsets(maxNumOffsets))); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java index 3fe14b0c271..dd941d742ab 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java @@ -16,28 +16,20 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.utils.CollectionUtils; -import org.apache.kafka.common.utils.Utils; - import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.LEADER_EPOCH; -import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; -import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; -import static org.apache.kafka.common.protocol.types.Type.INT64; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ListOffsetResponseData; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.RecordBatch; /** * Possible error codes: @@ -58,243 +50,69 @@ import static org.apache.kafka.common.protocol.types.Type.INT64; public class ListOffsetResponse extends AbstractResponse { public static final long UNKNOWN_TIMESTAMP = -1L; public static final long UNKNOWN_OFFSET = -1L; + public static final int UNKNOWN_EPOCH = RecordBatch.NO_PARTITION_LEADER_EPOCH; - // top level fields - private static final Field.ComplexArray TOPICS = new Field.ComplexArray("responses", - "The listed offsets by topic"); + private final ListOffsetResponseData data; - // topic level fields - private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partition_responses", - "The listed offsets by partition"); - - // partition level fields - // This key is only used by ListOffsetResponse v0 - @Deprecated - private static final Field.Array OFFSETS = new Field.Array("offsets", INT64, "A list of offsets."); - private static final Field.Int64 TIMESTAMP = new Field.Int64("timestamp", - "The timestamp associated with the returned offset"); - private static final Field.Int64 OFFSET = new Field.Int64("offset", - "The offset found"); - - private static final Field PARTITIONS_V0 = PARTITIONS.withFields( - PARTITION_ID, - ERROR_CODE, - OFFSETS); - - private static final Field TOPICS_V0 = TOPICS.withFields( - TOPIC_NAME, - PARTITIONS_V0); - - private static final Schema LIST_OFFSET_RESPONSE_V0 = new Schema( - TOPICS_V0); - - // V1 bumped for the removal of the offsets array - private static final Field PARTITIONS_V1 = PARTITIONS.withFields( - PARTITION_ID, - ERROR_CODE, - TIMESTAMP, - OFFSET); - - private static final Field TOPICS_V1 = TOPICS.withFields( - TOPIC_NAME, - PARTITIONS_V1); - - private static final Schema LIST_OFFSET_RESPONSE_V1 = new Schema( - TOPICS_V1); - - // V2 bumped for the addition of the throttle time - private static final Schema LIST_OFFSET_RESPONSE_V2 = new Schema( - THROTTLE_TIME_MS, - TOPICS_V1); - - // V3 bumped to indicate that on quota violation brokers send out responses before throttling. - private static final Schema LIST_OFFSET_RESPONSE_V3 = LIST_OFFSET_RESPONSE_V2; - - // V4 bumped for the addition of the current leader epoch in the request schema and the - // leader epoch in the response partition data - private static final Field PARTITIONS_V4 = PARTITIONS.withFields( - PARTITION_ID, - ERROR_CODE, - TIMESTAMP, - OFFSET, - LEADER_EPOCH); - - private static final Field TOPICS_V4 = TOPICS.withFields( - TOPIC_NAME, - PARTITIONS_V4); - - private static final Schema LIST_OFFSET_RESPONSE_V4 = new Schema( - THROTTLE_TIME_MS, - TOPICS_V4); - - private static final Schema LIST_OFFSET_RESPONSE_V5 = LIST_OFFSET_RESPONSE_V4; - - public static Schema[] schemaVersions() { - return new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2, - LIST_OFFSET_RESPONSE_V3, LIST_OFFSET_RESPONSE_V4, LIST_OFFSET_RESPONSE_V5}; + public ListOffsetResponse(ListOffsetResponseData data) { + this.data = data; } - public static final class PartitionData { - public final Errors error; - // The offsets list is only used in ListOffsetResponse v0. - public final List offsets; - public final Long timestamp; - public final Long offset; - public final Optional leaderEpoch; - - /** - * Constructor for ListOffsetResponse v0 - */ - public PartitionData(Errors error, List offsets) { - this.error = error; - this.offsets = offsets; - this.timestamp = null; - this.offset = null; - this.leaderEpoch = Optional.empty(); - } - - /** - * Constructor for ListOffsetResponse v1 - */ - public PartitionData(Errors error, long timestamp, long offset, Optional leaderEpoch) { - this.error = error; - this.timestamp = timestamp; - this.offset = offset; - this.offsets = null; - this.leaderEpoch = leaderEpoch; - } - - @Override - public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("PartitionData("). - append("errorCode: ").append(error.code()); - - if (offsets == null) { - bld.append(", timestamp: ").append(timestamp). - append(", offset: ").append(offset). - append(", leaderEpoch: ").append(leaderEpoch); - } else { - bld.append(", offsets: "). - append("["). - append(Utils.join(this.offsets, ",")). - append("]"); - } - bld.append(")"); - return bld.toString(); - } - } - - private final int throttleTimeMs; - private final Map responseData; - - /** - * Constructor for all versions without throttle time - */ - public ListOffsetResponse(Map responseData) { - this(DEFAULT_THROTTLE_TIME, responseData); - } - - public ListOffsetResponse(int throttleTimeMs, Map responseData) { - this.throttleTimeMs = throttleTimeMs; - this.responseData = responseData; - } - - public ListOffsetResponse(Struct struct) { - this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); - responseData = new HashMap<>(); - for (Object topicResponseObj : struct.get(TOPICS)) { - Struct topicResponse = (Struct) topicResponseObj; - String topic = topicResponse.get(TOPIC_NAME); - for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) { - Struct partitionResponse = (Struct) partitionResponseObj; - int partition = partitionResponse.get(PARTITION_ID); - Errors error = Errors.forCode(partitionResponse.get(ERROR_CODE)); - PartitionData partitionData; - if (partitionResponse.hasField(OFFSETS)) { - Object[] offsets = partitionResponse.get(OFFSETS); - List offsetsList = new ArrayList<>(); - for (Object offset : offsets) - offsetsList.add((Long) offset); - partitionData = new PartitionData(error, offsetsList); - } else { - long timestamp = partitionResponse.get(TIMESTAMP); - long offset = partitionResponse.get(OFFSET); - Optional leaderEpoch = RequestUtils.getLeaderEpoch(partitionResponse, LEADER_EPOCH); - partitionData = new PartitionData(error, timestamp, offset, leaderEpoch); - } - responseData.put(new TopicPartition(topic, partition), partitionData); - } - } + public ListOffsetResponse(Struct struct, short version) { + data = new ListOffsetResponseData(struct, version); } @Override public int throttleTimeMs() { - return throttleTimeMs; + return data.throttleTimeMs(); } - public Map responseData() { - return responseData; + public ListOffsetResponseData data() { + return data; + } + + public List topics() { + return data.topics(); } @Override public Map errorCounts() { Map errorCounts = new HashMap<>(); - responseData.values().forEach(response -> - updateErrorCounts(errorCounts, response.error) + topics().forEach(topic -> + topic.partitions().forEach(partition -> + updateErrorCounts(errorCounts, Errors.forCode(partition.errorCode())) + ) ); return errorCounts; } public static ListOffsetResponse parse(ByteBuffer buffer, short version) { - return new ListOffsetResponse(ApiKeys.LIST_OFFSETS.parseResponse(version, buffer)); + return new ListOffsetResponse(ApiKeys.LIST_OFFSETS.parseResponse(version, buffer), version); } @Override protected Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.LIST_OFFSETS.responseSchema(version)); - struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); - Map> topicsData = CollectionUtils.groupPartitionDataByTopic(responseData); - - List topicArray = new ArrayList<>(); - for (Map.Entry> topicEntry: topicsData.entrySet()) { - Struct topicData = struct.instance(TOPICS); - topicData.set(TOPIC_NAME, topicEntry.getKey()); - List partitionArray = new ArrayList<>(); - for (Map.Entry partitionEntry : topicEntry.getValue().entrySet()) { - PartitionData offsetPartitionData = partitionEntry.getValue(); - Struct partitionData = topicData.instance(PARTITIONS); - partitionData.set(PARTITION_ID, partitionEntry.getKey()); - partitionData.set(ERROR_CODE, offsetPartitionData.error.code()); - if (version == 0) { - partitionData.set(OFFSETS, offsetPartitionData.offsets.toArray()); - } else { - partitionData.set(TIMESTAMP, offsetPartitionData.timestamp); - partitionData.set(OFFSET, offsetPartitionData.offset); - RequestUtils.setLeaderEpochIfExists(partitionData, LEADER_EPOCH, offsetPartitionData.leaderEpoch); - } - partitionArray.add(partitionData); - } - topicData.set(PARTITIONS, partitionArray.toArray()); - topicArray.add(topicData); - } - struct.set(TOPICS, topicArray.toArray()); - - return struct; + return data.toStruct(version); } @Override public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("(type=ListOffsetResponse") - .append(", throttleTimeMs=").append(throttleTimeMs) - .append(", responseData=").append(responseData) - .append(")"); - return bld.toString(); + return data.toString(); } @Override public boolean shouldClientThrottle(short version) { return version >= 3; } + + public static ListOffsetTopicResponse singletonListOffsetTopicResponse(TopicPartition tp, Errors error, long timestamp, long offset, int epoch) { + return new ListOffsetTopicResponse() + .setName(tp.topic()) + .setPartitions(Collections.singletonList(new ListOffsetPartitionResponse() + .setPartitionIndex(tp.partition()) + .setErrorCode(error.code()) + .setTimestamp(timestamp) + .setOffset(offset) + .setLeaderEpoch(epoch))); + } } diff --git a/clients/src/main/resources/common/message/ListOffsetRequest.json b/clients/src/main/resources/common/message/ListOffsetRequest.json index 2e12b8b2b7d..259d7bf1da2 100644 --- a/clients/src/main/resources/common/message/ListOffsetRequest.json +++ b/clients/src/main/resources/common/message/ListOffsetRequest.json @@ -42,7 +42,7 @@ "about": "Each partition in the request.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, - { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", + { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", "default": "-1", "about": "The current leader epoch." }, { "name": "Timestamp", "type": "int64", "versions": "0+", "about": "The current timestamp." }, diff --git a/clients/src/main/resources/common/message/ListOffsetResponse.json b/clients/src/main/resources/common/message/ListOffsetResponse.json index 4037e1af80b..e9b06441ac4 100644 --- a/clients/src/main/resources/common/message/ListOffsetResponse.json +++ b/clients/src/main/resources/common/message/ListOffsetResponse.json @@ -48,7 +48,7 @@ "about": "The timestamp associated with the returned offset." }, { "name": "Offset", "type": "int64", "versions": "1+", "default": "-1", "ignorable": false, "about": "The returned offset." }, - { "name": "LeaderEpoch", "type": "int32", "versions": "4+" } + { "name": "LeaderEpoch", "type": "int32", "versions": "4+", "default": "-1" } ]} ]} ] diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index e3c1b5071b5..80d52947cd3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -101,6 +101,8 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse; import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.ListOffsetResponseData; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic; import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition; @@ -144,7 +146,6 @@ import org.apache.kafka.common.requests.LeaveGroupResponse; import org.apache.kafka.common.requests.ListGroupsRequest; import org.apache.kafka.common.requests.ListGroupsResponse; import org.apache.kafka.common.requests.ListOffsetResponse; -import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData; import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; @@ -3737,41 +3738,43 @@ public class KafkaAdminClientTest { Collections.emptySet(), node0); - final TopicPartition tp1 = new TopicPartition("foo", 0); - final TopicPartition tp2 = new TopicPartition("bar", 0); - final TopicPartition tp3 = new TopicPartition("baz", 0); + final TopicPartition tp0 = new TopicPartition("foo", 0); + final TopicPartition tp1 = new TopicPartition("bar", 0); + final TopicPartition tp2 = new TopicPartition("baz", 0); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); - Map responseData = new HashMap<>(); - responseData.put(tp1, new PartitionData(Errors.NONE, -1L, 123L, Optional.of(321))); - responseData.put(tp2, new PartitionData(Errors.NONE, -1L, 234L, Optional.of(432))); - responseData.put(tp3, new PartitionData(Errors.NONE, 123456789L, 345L, Optional.of(543))); + ListOffsetTopicResponse t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.NONE, -1L, 123L, 321); + ListOffsetTopicResponse t1 = ListOffsetResponse.singletonListOffsetTopicResponse(tp1, Errors.NONE, -1L, 234L, 432); + ListOffsetTopicResponse t2 = ListOffsetResponse.singletonListOffsetTopicResponse(tp2, Errors.NONE, 123456789L, 345L, 543); + ListOffsetResponseData responseData = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(t0, t1, t2)); env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData)); Map partitions = new HashMap<>(); - partitions.put(tp1, OffsetSpec.latest()); - partitions.put(tp2, OffsetSpec.earliest()); - partitions.put(tp3, OffsetSpec.forTimestamp(System.currentTimeMillis())); + partitions.put(tp0, OffsetSpec.latest()); + partitions.put(tp1, OffsetSpec.earliest()); + partitions.put(tp2, OffsetSpec.forTimestamp(System.currentTimeMillis())); ListOffsetsResult result = env.adminClient().listOffsets(partitions); Map offsets = result.all().get(); assertFalse(offsets.isEmpty()); - assertEquals(123L, offsets.get(tp1).offset()); - assertEquals(321, offsets.get(tp1).leaderEpoch().get().intValue()); + assertEquals(123L, offsets.get(tp0).offset()); + assertEquals(321, offsets.get(tp0).leaderEpoch().get().intValue()); + assertEquals(-1L, offsets.get(tp0).timestamp()); + assertEquals(234L, offsets.get(tp1).offset()); + assertEquals(432, offsets.get(tp1).leaderEpoch().get().intValue()); assertEquals(-1L, offsets.get(tp1).timestamp()); - assertEquals(234L, offsets.get(tp2).offset()); - assertEquals(432, offsets.get(tp2).leaderEpoch().get().intValue()); - assertEquals(-1L, offsets.get(tp2).timestamp()); - assertEquals(345L, offsets.get(tp3).offset()); - assertEquals(543, offsets.get(tp3).leaderEpoch().get().intValue()); - assertEquals(123456789L, offsets.get(tp3).timestamp()); + assertEquals(345L, offsets.get(tp2).offset()); + assertEquals(543, offsets.get(tp2).leaderEpoch().get().intValue()); + assertEquals(123456789L, offsets.get(tp2).timestamp()); + assertEquals(offsets.get(tp0), result.partitionResult(tp0).get()); assertEquals(offsets.get(tp1), result.partitionResult(tp1).get()); assertEquals(offsets.get(tp2), result.partitionResult(tp2).get()); - assertEquals(offsets.get(tp3), result.partitionResult(tp3).get()); try { result.partitionResult(new TopicPartition("unknown", 0)).get(); fail("should have thrown IllegalArgumentException"); @@ -3798,48 +3801,53 @@ public class KafkaAdminClientTest { Collections.emptySet(), node0); - final TopicPartition tp1 = new TopicPartition("foo", 0); - final TopicPartition tp2 = new TopicPartition("foo", 1); - final TopicPartition tp3 = new TopicPartition("bar", 0); + final TopicPartition tp0 = new TopicPartition("foo", 0); + final TopicPartition tp1 = new TopicPartition("foo", 1); + final TopicPartition tp2 = new TopicPartition("bar", 0); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); // listoffsets response from broker 0 - Map responseData = new HashMap<>(); - responseData.put(tp1, new PartitionData(Errors.LEADER_NOT_AVAILABLE, -1L, 123L, Optional.of(321))); - responseData.put(tp3, new PartitionData(Errors.NONE, -1L, 987L, Optional.of(789))); - env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData)); + ListOffsetTopicResponse t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.LEADER_NOT_AVAILABLE, -1L, 123L, 321); + ListOffsetTopicResponse t1 = ListOffsetResponse.singletonListOffsetTopicResponse(tp1, Errors.NONE, -1L, 987L, 789); + ListOffsetResponseData responseData = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(t0, t1)); + env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node0); // listoffsets response from broker 1 - responseData = new HashMap<>(); - responseData.put(tp2, new PartitionData(Errors.NONE, -1L, 456L, Optional.of(654))); - env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData)); + ListOffsetTopicResponse t2 = ListOffsetResponse.singletonListOffsetTopicResponse(tp2, Errors.NONE, -1L, 456L, 654); + responseData = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(t2)); + env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node1); // metadata refresh because of LEADER_NOT_AVAILABLE env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); // listoffsets response from broker 0 - responseData = new HashMap<>(); - responseData.put(tp1, new PartitionData(Errors.NONE, -1L, 345L, Optional.of(543))); - env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData)); + t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.NONE, -1L, 345L, 543); + responseData = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(t0)); + env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node0); Map partitions = new HashMap<>(); + partitions.put(tp0, OffsetSpec.latest()); partitions.put(tp1, OffsetSpec.latest()); partitions.put(tp2, OffsetSpec.latest()); - partitions.put(tp3, OffsetSpec.latest()); ListOffsetsResult result = env.adminClient().listOffsets(partitions); Map offsets = result.all().get(); assertFalse(offsets.isEmpty()); - assertEquals(345L, offsets.get(tp1).offset()); - assertEquals(543, offsets.get(tp1).leaderEpoch().get().intValue()); + assertEquals(345L, offsets.get(tp0).offset()); + assertEquals(543, offsets.get(tp0).leaderEpoch().get().intValue()); + assertEquals(-1L, offsets.get(tp0).timestamp()); + assertEquals(987L, offsets.get(tp1).offset()); + assertEquals(789, offsets.get(tp1).leaderEpoch().get().intValue()); assertEquals(-1L, offsets.get(tp1).timestamp()); - assertEquals(456, offsets.get(tp2).offset()); + assertEquals(456L, offsets.get(tp2).offset()); assertEquals(654, offsets.get(tp2).leaderEpoch().get().intValue()); assertEquals(-1L, offsets.get(tp2).timestamp()); - assertEquals(987, offsets.get(tp3).offset()); - assertEquals(789, offsets.get(tp3).leaderEpoch().get().intValue()); - assertEquals(-1L, offsets.get(tp3).timestamp()); } } @@ -3860,19 +3868,21 @@ public class KafkaAdminClientTest { Collections.emptySet(), node0); - final TopicPartition tp1 = new TopicPartition("foo", 0); + final TopicPartition tp0 = new TopicPartition("foo", 0); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); - Map responseData = new HashMap<>(); - responseData.put(tp1, new PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, -1L, -1, Optional.empty())); + ListOffsetTopicResponse t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.TOPIC_AUTHORIZATION_FAILED, -1L, -1L, -1); + ListOffsetResponseData responseData = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(t0)); env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData)); Map partitions = new HashMap<>(); - partitions.put(tp1, OffsetSpec.latest()); + partitions.put(tp0, OffsetSpec.latest()); ListOffsetsResult result = env.adminClient().listOffsets(partitions); TestUtils.assertFutureError(result.all(), TopicAuthorizationException.class); @@ -3908,13 +3918,17 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); // listoffsets response from broker 0 - Map responseData = new HashMap<>(); - responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 345L, Optional.of(543))); - env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData)); + ListOffsetTopicResponse t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.NONE, -1L, 345L, 543); + ListOffsetResponseData responseData = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(t0)); + env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node0); // listoffsets response from broker 1 - responseData = new HashMap<>(); - responseData.put(tp1, new PartitionData(Errors.NONE, -1L, 789L, Optional.of(987))); - env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData)); + ListOffsetTopicResponse t1 = ListOffsetResponse.singletonListOffsetTopicResponse(tp1, Errors.NONE, -1L, 789L, 987); + responseData = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(t1)); + env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node1); Map partitions = new HashMap<>(); partitions.put(tp0, OffsetSpec.latest()); @@ -3955,9 +3969,11 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(prepareMetadataResponse(oldCluster, Errors.NONE)); - Map responseData = new HashMap<>(); - responseData.put(tp0, new PartitionData(Errors.NOT_LEADER_OR_FOLLOWER, -1L, 345L, Optional.of(543))); - responseData.put(tp1, new PartitionData(Errors.LEADER_NOT_AVAILABLE, -2L, 123L, Optional.of(456))); + ListOffsetTopicResponse t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1L, 345L, 543); + ListOffsetTopicResponse t1 = ListOffsetResponse.singletonListOffsetTopicResponse(tp1, Errors.LEADER_NOT_AVAILABLE, -2L, 123L, 456); + ListOffsetResponseData responseData = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(t0, t1)); env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node0); final PartitionInfo newPInfo1 = new PartitionInfo("foo", 0, node1, @@ -3971,12 +3987,16 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(prepareMetadataResponse(newCluster, Errors.NONE)); - responseData = new HashMap<>(); - responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 345L, Optional.of(543))); + t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.NONE, -1L, 345L, 543); + responseData = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(t0)); env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node1); - responseData = new HashMap<>(); - responseData.put(tp1, new PartitionData(Errors.NONE, -2L, 123L, Optional.of(456))); + t1 = ListOffsetResponse.singletonListOffsetTopicResponse(tp1, Errors.NONE, -2L, 123L, 456); + responseData = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(t1)); env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node2); Map partitions = new HashMap<>(); @@ -4013,8 +4033,10 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(prepareMetadataResponse(oldCluster, Errors.NONE)); - Map responseData = new HashMap<>(); - responseData.put(tp0, new PartitionData(Errors.NOT_LEADER_OR_FOLLOWER, -1L, 345L, Optional.of(543))); + ListOffsetTopicResponse t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1L, 345L, 543); + ListOffsetResponseData responseData = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(t0)); env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node0); // updating leader from node0 to node1 and metadata refresh because of NOT_LEADER_OR_FOLLOWER @@ -4025,8 +4047,10 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(prepareMetadataResponse(newCluster, Errors.NONE)); - responseData = new HashMap<>(); - responseData.put(tp0, new PartitionData(Errors.NONE, -2L, 123L, Optional.of(456))); + t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.NONE, -2L, 123L, 456); + responseData = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(t0)); env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node1); Map partitions = new HashMap<>(); @@ -4073,6 +4097,47 @@ public class KafkaAdminClientTest { } } + @Test + public void testListOffsetsPartialResponse() throws Exception { + Node node0 = new Node(0, "localhost", 8120); + Node node1 = new Node(1, "localhost", 8121); + List nodes = Arrays.asList(node0, node1); + List pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0, node1}, new Node[]{node0, node1})); + pInfos.add(new PartitionInfo("foo", 1, node0, new Node[]{node0, node1}, new Node[]{node0, node1})); + final Cluster cluster = + new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node0); + + final TopicPartition tp0 = new TopicPartition("foo", 0); + final TopicPartition tp1 = new TopicPartition("foo", 1); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + ListOffsetTopicResponse t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.NONE, -2L, 123L, 456); + ListOffsetResponseData data = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(t0)); + env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(data), node0); + + Map partitions = new HashMap<>(); + partitions.put(tp0, OffsetSpec.latest()); + partitions.put(tp1, OffsetSpec.latest()); + ListOffsetsResult result = env.adminClient().listOffsets(partitions); + assertNotNull(result.partitionResult(tp0).get()); + TestUtils.assertFutureThrows(result.partitionResult(tp1), ApiException.class); + TestUtils.assertFutureThrows(result.all(), ApiException.class); + } + } + @Test public void testGetSubLevelError() { List memberIdentities = Arrays.asList( diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 8cfbfd6e791..401dde27653 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -48,8 +48,12 @@ import org.apache.kafka.common.message.HeartbeatResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.ListOffsetResponseData; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.Selectable; @@ -118,6 +122,8 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; @@ -588,14 +594,22 @@ public class KafkaConsumerTest { consumer.seekToEnd(singleton(tp0)); consumer.seekToBeginning(singleton(tp1)); - client.prepareResponse( - body -> { - ListOffsetRequest request = (ListOffsetRequest) body; - Map timestamps = request.partitionTimestamps(); - return timestamps.get(tp0).timestamp == ListOffsetRequest.LATEST_TIMESTAMP && - timestamps.get(tp1).timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP; - }, listOffsetsResponse(Collections.singletonMap(tp0, 50L), - Collections.singletonMap(tp1, Errors.NOT_LEADER_OR_FOLLOWER))); + client.prepareResponse(body -> { + ListOffsetRequest request = (ListOffsetRequest) body; + List partitions = request.topics().stream().flatMap(t -> { + if (t.name().equals(topic)) + return Stream.of(t.partitions()); + else + return Stream.empty(); + }).flatMap(List::stream).collect(Collectors.toList()); + ListOffsetPartition expectedTp0 = new ListOffsetPartition() + .setPartitionIndex(tp0.partition()) + .setTimestamp(ListOffsetRequest.LATEST_TIMESTAMP); + ListOffsetPartition expectedTp1 = new ListOffsetPartition() + .setPartitionIndex(tp1.partition()) + .setTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP); + return partitions.contains(expectedTp0) && partitions.contains(expectedTp1); + }, listOffsetsResponse(Collections.singletonMap(tp0, 50L), Collections.singletonMap(tp1, Errors.NOT_LEADER_OR_FOLLOWER))); client.prepareResponse( body -> { FetchRequest request = (FetchRequest) body; @@ -1568,12 +1582,12 @@ public class KafkaConsumerTest { public void testMetricConfigRecordingLevel() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); - try (KafkaConsumer consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + try (KafkaConsumer consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { assertEquals(Sensor.RecordingLevel.INFO, consumer.metrics.config().recordLevel()); } props.put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); - try (KafkaConsumer consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + try (KafkaConsumer consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { assertEquals(Sensor.RecordingLevel.DEBUG, consumer.metrics.config().recordLevel()); } } @@ -2150,20 +2164,29 @@ public class KafkaConsumerTest { private ListOffsetResponse listOffsetsResponse(Map partitionOffsets, Map partitionErrors) { - Map partitionData = new HashMap<>(); + Map responses = new HashMap<>(); for (Map.Entry partitionOffset : partitionOffsets.entrySet()) { - partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(Errors.NONE, - ListOffsetResponse.UNKNOWN_TIMESTAMP, partitionOffset.getValue(), - Optional.empty())); + TopicPartition tp = partitionOffset.getKey(); + ListOffsetTopicResponse topic = responses.computeIfAbsent(tp.topic(), k -> new ListOffsetTopicResponse().setName(tp.topic())); + topic.partitions().add(new ListOffsetPartitionResponse() + .setPartitionIndex(tp.partition()) + .setErrorCode(Errors.NONE.code()) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(partitionOffset.getValue())); } for (Map.Entry partitionError : partitionErrors.entrySet()) { - partitionData.put(partitionError.getKey(), new ListOffsetResponse.PartitionData( - partitionError.getValue(), ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())); + TopicPartition tp = partitionError.getKey(); + ListOffsetTopicResponse topic = responses.computeIfAbsent(tp.topic(), k -> new ListOffsetTopicResponse().setName(tp.topic())); + topic.partitions().add(new ListOffsetPartitionResponse() + .setPartitionIndex(tp.partition()) + .setErrorCode(partitionError.getValue().code()) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)); } - - return new ListOffsetResponse(partitionData); + ListOffsetResponseData data = new ListOffsetResponseData() + .setTopics(new ArrayList<>(responses.values())); + return new ListOffsetResponse(data); } private FetchResponse fetchResponse(Map fetches) { @@ -2189,7 +2212,7 @@ public class KafkaConsumerTest { return new FetchResponse<>(Errors.NONE, tpResponses, 0, INVALID_SESSION_ID); } - private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, int count) { + private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, int count) { FetchInfo fetchInfo = new FetchInfo(fetchOffset, count); return fetchResponse(Collections.singletonMap(partition, fetchInfo)); } @@ -2450,7 +2473,7 @@ public class KafkaConsumerTest { assertEquals((1.0d + 0.0d + 0.5d) / 3, consumer.metrics().get(pollIdleRatio).metricValue()); } - private static boolean consumerMetricPresent(KafkaConsumer consumer, String name) { + private static boolean consumerMetricPresent(KafkaConsumer consumer, String name) { MetricName metricName = new MetricName(name, "consumer-metrics", "", Collections.emptyMap()); return consumer.metrics.metrics().containsKey(metricName); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 02481cac996..44bdcf63590 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -48,6 +48,11 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition; +import org.apache.kafka.common.message.ListOffsetResponseData; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -1435,7 +1440,7 @@ public class FetcherTest { subscriptions.requestOffsetReset(tp0); client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP, - Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NONE, 1L, 5L)); + validLeaderEpoch), listOffsetResponse(Errors.NONE, 1L, 5L)); fetcher.resetOffsetsIfNeeded(); consumerClient.pollNoWakeup(); assertFalse(subscriptions.isOffsetResetNeeded(tp0)); @@ -1471,7 +1476,7 @@ public class FetcherTest { // Fail with OFFSET_NOT_AVAILABLE client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP, - Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.OFFSET_NOT_AVAILABLE, 1L, 5L), false); + validLeaderEpoch), listOffsetResponse(Errors.OFFSET_NOT_AVAILABLE, 1L, 5L), false); fetcher.resetOffsetsIfNeeded(); consumerClient.pollNoWakeup(); assertFalse(subscriptions.hasValidPosition(tp0)); @@ -1481,7 +1486,7 @@ public class FetcherTest { // Fail with LEADER_NOT_AVAILABLE time.sleep(retryBackoffMs); client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP, - Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.LEADER_NOT_AVAILABLE, 1L, 5L), false); + validLeaderEpoch), listOffsetResponse(Errors.LEADER_NOT_AVAILABLE, 1L, 5L), false); fetcher.resetOffsetsIfNeeded(); consumerClient.pollNoWakeup(); assertFalse(subscriptions.hasValidPosition(tp0)); @@ -1563,7 +1568,7 @@ public class FetcherTest { subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST); client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP, - Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NONE, 1L, 5L)); + validLeaderEpoch), listOffsetResponse(Errors.NONE, 1L, 5L)); fetcher.resetOffsetsIfNeeded(); consumerClient.pollNoWakeup(); @@ -1580,7 +1585,7 @@ public class FetcherTest { // First fetch fails with stale metadata client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP, - Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NOT_LEADER_OR_FOLLOWER, 1L, 5L), false); + validLeaderEpoch), listOffsetResponse(Errors.NOT_LEADER_OR_FOLLOWER, 1L, 5L), false); fetcher.resetOffsetsIfNeeded(); consumerClient.pollNoWakeup(); assertFalse(subscriptions.hasValidPosition(tp0)); @@ -1657,7 +1662,7 @@ public class FetcherTest { // First request gets a disconnect client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP, - Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NONE, 1L, 5L), true); + validLeaderEpoch), listOffsetResponse(Errors.NONE, 1L, 5L), true); fetcher.resetOffsetsIfNeeded(); consumerClient.pollNoWakeup(); assertFalse(subscriptions.hasValidPosition(tp0)); @@ -1835,7 +1840,7 @@ public class FetcherTest { // First request gets a disconnect client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP, - Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.TOPIC_AUTHORIZATION_FAILED, -1, -1), false); + validLeaderEpoch), listOffsetResponse(Errors.TOPIC_AUTHORIZATION_FAILED, -1, -1), false); fetcher.resetOffsetsIfNeeded(); consumerClient.pollNoWakeup(); assertFalse(subscriptions.hasValidPosition(tp0)); @@ -1873,7 +1878,7 @@ public class FetcherTest { subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP, - Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NONE, 1L, 10L)); + validLeaderEpoch), listOffsetResponse(Errors.NONE, 1L, 10L)); fetcher.resetOffsetsIfNeeded(); consumerClient.pollNoWakeup(); @@ -2496,36 +2501,65 @@ public class FetcherTest { client.updateMetadata(initialUpdateResponse); final long fetchTimestamp = 10L; - Map allPartitionData = new HashMap<>(); - allPartitionData.put(tp0, new ListOffsetResponse.PartitionData( - Errors.NONE, fetchTimestamp, 4L, Optional.empty())); - allPartitionData.put(tp1, new ListOffsetResponse.PartitionData( - retriableError, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty())); + ListOffsetPartitionResponse tp0NoError = new ListOffsetPartitionResponse() + .setPartitionIndex(tp0.partition()) + .setErrorCode(Errors.NONE.code()) + .setTimestamp(fetchTimestamp) + .setOffset(4L); + List topics = Collections.singletonList( + new ListOffsetTopicResponse() + .setName(tp0.topic()) + .setPartitions(Arrays.asList( + tp0NoError, + new ListOffsetPartitionResponse() + .setPartitionIndex(tp1.partition()) + .setErrorCode(retriableError.code()) + .setTimestamp(ListOffsetRequest.LATEST_TIMESTAMP) + .setOffset(-1L)))); + ListOffsetResponseData data = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(topics); client.prepareResponseFrom(body -> { boolean isListOffsetRequest = body instanceof ListOffsetRequest; if (isListOffsetRequest) { ListOffsetRequest request = (ListOffsetRequest) body; - Map expectedTopicPartitions = new HashMap<>(); - expectedTopicPartitions.put(tp0, new ListOffsetRequest.PartitionData( - fetchTimestamp, Optional.empty())); - expectedTopicPartitions.put(tp1, new ListOffsetRequest.PartitionData( - fetchTimestamp, Optional.empty())); - - return request.partitionTimestamps().equals(expectedTopicPartitions); + List expectedTopics = Collections.singletonList( + new ListOffsetTopic() + .setName(tp0.topic()) + .setPartitions(Arrays.asList( + new ListOffsetPartition() + .setPartitionIndex(tp1.partition()) + .setTimestamp(fetchTimestamp) + .setCurrentLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH), + new ListOffsetPartition() + .setPartitionIndex(tp0.partition()) + .setTimestamp(fetchTimestamp) + .setCurrentLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH)))); + return request.topics().equals(expectedTopics); } else { return false; } - }, new ListOffsetResponse(allPartitionData), originalLeader); + }, new ListOffsetResponse(data), originalLeader); client.prepareMetadataUpdate(updatedMetadata); // If the metadata wasn't updated before retrying, the fetcher would consult the original leader and hit a NOT_LEADER exception. // We will count the answered future response in the end to verify if this is the case. - Map paritionDataWithFatalError = new HashMap<>(allPartitionData); - paritionDataWithFatalError.put(tp1, new ListOffsetResponse.PartitionData( - Errors.NOT_LEADER_OR_FOLLOWER, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty())); - client.prepareResponseFrom(new ListOffsetResponse(paritionDataWithFatalError), originalLeader); + List topicsWithFatalError = Collections.singletonList( + new ListOffsetTopicResponse() + .setName(tp0.topic()) + .setPartitions(Arrays.asList( + tp0NoError, + new ListOffsetPartitionResponse() + .setPartitionIndex(tp1.partition()) + .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()) + .setTimestamp(ListOffsetRequest.LATEST_TIMESTAMP) + .setOffset(-1L)))); + ListOffsetResponseData dataWithFatalError = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(topicsWithFatalError); + client.prepareResponseFrom(new ListOffsetResponse(dataWithFatalError), originalLeader); // The request to new leader must only contain one partition tp1 with error. client.prepareResponseFrom(body -> { @@ -2533,9 +2567,12 @@ public class FetcherTest { if (isListOffsetRequest) { ListOffsetRequest request = (ListOffsetRequest) body; - return request.partitionTimestamps().equals( - Collections.singletonMap(tp1, new ListOffsetRequest.PartitionData( - fetchTimestamp, Optional.of(newLeaderEpoch)))); + ListOffsetTopic requestTopic = request.topics().get(0); + ListOffsetPartition expectedPartition = new ListOffsetPartition() + .setPartitionIndex(tp1.partition()) + .setTimestamp(fetchTimestamp) + .setCurrentLeaderEpoch(newLeaderEpoch); + return expectedPartition.equals(requestTopic.partitions().get(0)); } else { return false; } @@ -2594,9 +2631,9 @@ public class FetcherTest { MockClient.RequestMatcher matcher = body -> { if (body instanceof ListOffsetRequest) { ListOffsetRequest offsetRequest = (ListOffsetRequest) body; - Optional epoch = offsetRequest.partitionTimestamps().get(tp0).currentLeaderEpoch; - assertTrue("Expected Fetcher to set leader epoch in request", epoch.isPresent()); - assertEquals("Expected leader epoch to match epoch from metadata update", epoch.get().longValue(), 99); + int epoch = offsetRequest.topics().get(0).partitions().get(0).currentLeaderEpoch(); + assertTrue("Expected Fetcher to set leader epoch in request", epoch != ListOffsetResponse.UNKNOWN_EPOCH); + assertEquals("Expected leader epoch to match epoch from metadata update", epoch, 99); return true; } else { fail("Should have seen ListOffsetRequest"); @@ -2692,14 +2729,22 @@ public class FetcherTest { public void testBatchedListOffsetsMetadataErrors() { buildFetcher(); - Map partitionData = new HashMap<>(); - partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NOT_LEADER_OR_FOLLOWER, - ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())); - partitionData.put(tp1, new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, - ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())); - client.prepareResponse(new ListOffsetResponse(0, partitionData)); + ListOffsetResponseData data = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Collections.singletonList(new ListOffsetTopicResponse() + .setName(tp0.topic()) + .setPartitions(Arrays.asList( + new ListOffsetPartitionResponse() + .setPartitionIndex(tp0.partition()) + .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET), + new ListOffsetPartitionResponse() + .setPartitionIndex(tp1.partition()) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET))))); + client.prepareResponse(new ListOffsetResponse(data)); Map offsetsToSearch = new HashMap<>(); offsetsToSearch.put(tp0, ListOffsetRequest.EARLIEST_TIMESTAMP); @@ -3544,16 +3589,58 @@ public class FetcherTest { private void testGetOffsetsForTimesWithUnknownOffset() { client.reset(); - // Ensure metadata has both partition. + // Ensure metadata has both partitions. MetadataResponse initialMetadataUpdate = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1)); client.updateMetadata(initialMetadataUpdate); - Map partitionData = new HashMap<>(); - partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NONE, - ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())); + ListOffsetResponseData data = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Collections.singletonList(new ListOffsetTopicResponse() + .setName(tp0.topic()) + .setPartitions(Collections.singletonList(new ListOffsetPartitionResponse() + .setPartitionIndex(tp0.partition()) + .setErrorCode(Errors.NONE.code()) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET))))); - client.prepareResponseFrom(new ListOffsetResponse(0, partitionData), + client.prepareResponseFrom(new ListOffsetResponse(data), + metadata.fetch().leaderFor(tp0)); + + Map timestampToSearch = new HashMap<>(); + timestampToSearch.put(tp0, 0L); + Map offsetAndTimestampMap = + fetcher.offsetsForTimes(timestampToSearch, time.timer(Long.MAX_VALUE)); + + assertTrue(offsetAndTimestampMap.containsKey(tp0)); + assertNull(offsetAndTimestampMap.get(tp0)); + } + + @Test + public void testGetOffsetsForTimesWithUnknownOffsetV0() { + buildFetcher(); + // Empty map + assertTrue(fetcher.offsetsForTimes(new HashMap<>(), time.timer(100L)).isEmpty()); + // Unknown Offset + client.reset(); + // Ensure metadata has both partition. + MetadataResponse initialMetadataUpdate = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1)); + client.updateMetadata(initialMetadataUpdate); + // Force LIST_OFFSETS version 0 + Node node = metadata.fetch().nodes().get(0); + apiVersions.update(node.idString(), NodeApiVersions.create( + ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 0)); + + ListOffsetResponseData data = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Collections.singletonList(new ListOffsetTopicResponse() + .setName(tp0.topic()) + .setPartitions(Collections.singletonList(new ListOffsetPartitionResponse() + .setPartitionIndex(tp0.partition()) + .setErrorCode(Errors.NONE.code()) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOldStyleOffsets(Collections.emptyList()))))); + + client.prepareResponseFrom(new ListOffsetResponse(data), metadata.fetch().leaderFor(tp0)); Map timestampToSearch = new HashMap<>(); @@ -4230,7 +4317,7 @@ public class FetcherTest { expectedOffsets.put(tp1, 4L); expectedOffsets.put(tp2, 6L); assignFromUser(expectedOffsets.keySet()); - client.prepareResponse(listOffsetResponse(Errors.NONE, ListOffsetRequest.EARLIEST_TIMESTAMP, expectedOffsets)); + client.prepareResponse(listOffsetResponse(expectedOffsets, Errors.NONE, ListOffsetRequest.EARLIEST_TIMESTAMP, ListOffsetResponse.UNKNOWN_EPOCH)); assertEquals(expectedOffsets, fetcher.beginningOffsets(asList(tp0, tp1, tp2), time.timer(5000L))); } @@ -4264,7 +4351,7 @@ public class FetcherTest { expectedOffsets.put(tp1, 7L); expectedOffsets.put(tp2, 9L); assignFromUser(expectedOffsets.keySet()); - client.prepareResponse(listOffsetResponse(Errors.NONE, ListOffsetRequest.LATEST_TIMESTAMP, expectedOffsets)); + client.prepareResponse(listOffsetResponse(expectedOffsets, Errors.NONE, ListOffsetRequest.LATEST_TIMESTAMP, ListOffsetResponse.UNKNOWN_EPOCH)); assertEquals(expectedOffsets, fetcher.endOffsets(asList(tp0, tp1, tp2), time.timer(5000L))); } @@ -4275,24 +4362,19 @@ public class FetcherTest { } private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) { - return listOffsetRequestMatcher(timestamp, Optional.empty()); - } - - private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp, Optional leaderEpoch) { - // matches any list offset request with the provided timestamp - return body -> { - ListOffsetRequest req = (ListOffsetRequest) body; - return req.partitionTimestamps().equals(Collections.singletonMap( - tp0, new ListOffsetRequest.PartitionData(timestamp, leaderEpoch))); - }; + return listOffsetRequestMatcher(timestamp, ListOffsetResponse.UNKNOWN_EPOCH); } private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp, final int leaderEpoch) { // matches any list offset request with the provided timestamp return body -> { ListOffsetRequest req = (ListOffsetRequest) body; - return req.partitionTimestamps().equals(Collections.singletonMap( - tp0, new ListOffsetRequest.PartitionData(timestamp, Optional.of(leaderEpoch)))); + ListOffsetTopic topic = req.topics().get(0); + ListOffsetPartition partition = topic.partitions().get(0); + return tp0.topic().equals(topic.name()) + && tp0.partition() == partition.partitionIndex() + && timestamp == partition.timestamp() + && leaderEpoch == partition.currentLeaderEpoch(); }; } @@ -4301,26 +4383,35 @@ public class FetcherTest { } private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) { - return listOffsetResponse(tp, error, timestamp, offset, null); + return listOffsetResponse(tp, error, timestamp, offset, ListOffsetResponse.UNKNOWN_EPOCH); } - private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset, - Integer leaderEpoch) { - ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData( - error, timestamp, offset, Optional.ofNullable(leaderEpoch)); - Map allPartitionData = new HashMap<>(); - allPartitionData.put(tp, partitionData); - return new ListOffsetResponse(allPartitionData); + private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset, int leaderEpoch) { + Map offsets = new HashMap<>(); + offsets.put(tp, offset); + return listOffsetResponse(offsets, error, timestamp, leaderEpoch); } - private ListOffsetResponse listOffsetResponse(Errors error, long timestamp, Map offsets) { - Map allPartitionData = new HashMap<>(); + private ListOffsetResponse listOffsetResponse(Map offsets, Errors error, long timestamp, int leaderEpoch) { + Map> responses = new HashMap<>(); for (Map.Entry entry : offsets.entrySet()) { - ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error, timestamp, - entry.getValue(), Optional.empty()); - allPartitionData.put(entry.getKey(), partitionData); + TopicPartition tp = entry.getKey(); + responses.putIfAbsent(tp.topic(), new ArrayList<>()); + responses.get(tp.topic()).add(new ListOffsetPartitionResponse() + .setPartitionIndex(tp.partition()) + .setErrorCode(error.code()) + .setOffset(entry.getValue()) + .setTimestamp(timestamp) + .setLeaderEpoch(leaderEpoch)); } - return new ListOffsetResponse(allPartitionData); + List topics = new ArrayList<>(); + for (Map.Entry> response : responses.entrySet()) { + topics.add(new ListOffsetTopicResponse() + .setName(response.getKey()) + .setPartitions(response.getValue())); + } + ListOffsetResponseData data = new ListOffsetResponseData().setTopics(topics); + return new ListOffsetResponse(data); } private FetchResponse fullFetchResponseWithAbortedTransactions(MemoryRecords records, diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index 30b8590e33d..9672bf45379 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -18,6 +18,8 @@ package org.apache.kafka.common.message; import com.fasterxml.jackson.databind.JsonNode; + +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic; import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection; @@ -25,6 +27,10 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember; import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember; import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse; import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition; import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic; import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition; @@ -162,6 +168,49 @@ public final class MessageTest { testAllMessageRoundTripsFromVersion((short) 5, newRequest.get().setGroupInstanceId("instanceId")); } + @Test + public void testListOffsetsRequestVersions() throws Exception { + List v = Collections.singletonList(new ListOffsetTopic() + .setName("topic") + .setPartitions(Collections.singletonList(new ListOffsetPartition() + .setPartitionIndex(0) + .setTimestamp(123L)))); + Supplier newRequest = () -> new ListOffsetRequestData() + .setTopics(v) + .setReplicaId(0); + testAllMessageRoundTrips(newRequest.get()); + testAllMessageRoundTripsFromVersion((short) 2, newRequest.get().setIsolationLevel(IsolationLevel.READ_COMMITTED.id())); + } + + @Test + public void testListOffsetsResponseVersions() throws Exception { + ListOffsetPartitionResponse partition = new ListOffsetPartitionResponse() + .setErrorCode(Errors.NONE.code()) + .setPartitionIndex(0) + .setOldStyleOffsets(Collections.singletonList(321L)); + List topics = Collections.singletonList(new ListOffsetTopicResponse() + .setName("topic") + .setPartitions(Collections.singletonList(partition))); + Supplier response = () -> new ListOffsetResponseData() + .setTopics(topics); + for (short version = 0; version <= ApiKeys.LIST_OFFSETS.latestVersion(); version++) { + ListOffsetResponseData responseData = response.get(); + if (version > 0) { + responseData.topics().get(0).partitions().get(0) + .setOldStyleOffsets(Collections.emptyList()) + .setOffset(456L) + .setTimestamp(123L); + } + if (version > 1) { + responseData.setThrottleTimeMs(1000); + } + if (version > 3) { + partition.setLeaderEpoch(1); + } + testEquivalentMessageRoundTrip(version, responseData); + } + } + @Test public void testJoinGroupResponseVersions() throws Exception { Supplier newResponse = () -> new JoinGroupResponseData() diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetRequestTest.java new file mode 100644 index 00000000000..9d7c2aa7cfd --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetRequestTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ListOffsetRequestData; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic; +import org.apache.kafka.common.message.ListOffsetResponseData; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.junit.Test; + +public class ListOffsetRequestTest { + + @Test + public void testDuplicatePartitions() { + List topics = Collections.singletonList( + new ListOffsetTopic() + .setName("topic") + .setPartitions(Arrays.asList( + new ListOffsetPartition() + .setPartitionIndex(0), + new ListOffsetPartition() + .setPartitionIndex(0)))); + ListOffsetRequestData data = new ListOffsetRequestData() + .setTopics(topics) + .setReplicaId(-1); + ListOffsetRequest request = new ListOffsetRequest(data.toStruct((short) 0), (short) 0); + assertEquals(Collections.singleton(new TopicPartition("topic", 0)), request.duplicatePartitions()); + } + + @Test + public void testGetErrorResponse() { + for (short version = 1; version <= ApiKeys.LIST_OFFSETS.latestVersion(); version++) { + List topics = Arrays.asList( + new ListOffsetTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new ListOffsetPartition() + .setPartitionIndex(0)))); + ListOffsetRequest request = ListOffsetRequest.Builder + .forConsumer(true, IsolationLevel.READ_COMMITTED) + .setTargetTimes(topics) + .build(version); + ListOffsetResponse response = (ListOffsetResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception()); + + List v = Collections.singletonList( + new ListOffsetTopicResponse() + .setName("topic") + .setPartitions(Collections.singletonList( + new ListOffsetPartitionResponse() + .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()) + .setLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET) + .setPartitionIndex(0) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)))); + ListOffsetResponseData data = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(v); + ListOffsetResponse expectedResponse = new ListOffsetResponse(data); + assertEquals(expectedResponse.data().topics(), response.data().topics()); + assertEquals(expectedResponse.throttleTimeMs(), response.throttleTimeMs()); + } + } + + @Test + public void testGetErrorResponseV0() { + List topics = Arrays.asList( + new ListOffsetTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new ListOffsetPartition() + .setPartitionIndex(0)))); + ListOffsetRequest request = ListOffsetRequest.Builder + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .setTargetTimes(topics) + .build((short) 0); + ListOffsetResponse response = (ListOffsetResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception()); + + List v = Collections.singletonList( + new ListOffsetTopicResponse() + .setName("topic") + .setPartitions(Collections.singletonList( + new ListOffsetPartitionResponse() + .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()) + .setOldStyleOffsets(Collections.emptyList()) + .setPartitionIndex(0)))); + ListOffsetResponseData data = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(v); + ListOffsetResponse expectedResponse = new ListOffsetResponse(data); + assertEquals(expectedResponse.data().topics(), response.data().topics()); + assertEquals(expectedResponse.throttleTimeMs(), response.throttleTimeMs()); + } + + @Test + public void testToListOffsetTopics() { + ListOffsetPartition lop0 = new ListOffsetPartition() + .setPartitionIndex(0) + .setCurrentLeaderEpoch(1) + .setMaxNumOffsets(2) + .setTimestamp(123L); + ListOffsetPartition lop1 = new ListOffsetPartition() + .setPartitionIndex(1) + .setCurrentLeaderEpoch(3) + .setMaxNumOffsets(4) + .setTimestamp(567L); + Map timestampsToSearch = new HashMap<>(); + timestampsToSearch.put(new TopicPartition("topic", 0), lop0); + timestampsToSearch.put(new TopicPartition("topic", 1), lop1); + List listOffsetTopics = ListOffsetRequest.toListOffsetTopics(timestampsToSearch); + assertEquals(1, listOffsetTopics.size()); + ListOffsetTopic topic = listOffsetTopics.get(0); + assertEquals("topic", topic.name()); + assertEquals(2, topic.partitions().size()); + assertTrue(topic.partitions().contains(lop0)); + assertTrue(topic.partitions().contains(lop1)); + } + +} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 4af683d7f64..8576c0c7dcc 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -113,6 +113,11 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.message.ListGroupsRequestData; import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic; +import org.apache.kafka.common.message.ListOffsetResponseData; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.OffsetCommitRequestData; @@ -1221,28 +1226,39 @@ public class RequestResponseTest { private ListOffsetRequest createListOffsetRequest(int version) { if (version == 0) { - Map offsetData = Collections.singletonMap( - new TopicPartition("test", 0), - new ListOffsetRequest.PartitionData(1000000L, 10)); + ListOffsetTopic topic = new ListOffsetTopic() + .setName("test") + .setPartitions(Arrays.asList(new ListOffsetPartition() + .setPartitionIndex(0) + .setTimestamp(1000000L) + .setMaxNumOffsets(10))); return ListOffsetRequest.Builder .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) - .setTargetTimes(offsetData) + .setTargetTimes(Collections.singletonList(topic)) .build((short) version); } else if (version == 1) { - Map offsetData = Collections.singletonMap( - new TopicPartition("test", 0), - new ListOffsetRequest.PartitionData(1000000L, Optional.empty())); + ListOffsetTopic topic = new ListOffsetTopic() + .setName("test") + .setPartitions(Arrays.asList(new ListOffsetPartition() + .setPartitionIndex(0) + .setTimestamp(1000000L))); return ListOffsetRequest.Builder .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) - .setTargetTimes(offsetData) + .setTargetTimes(Collections.singletonList(topic)) .build((short) version); } else if (version >= 2 && version <= 5) { - Map offsetData = Collections.singletonMap( - new TopicPartition("test", 0), - new ListOffsetRequest.PartitionData(1000000L, Optional.of(5))); + ListOffsetPartition partition = new ListOffsetPartition() + .setPartitionIndex(0) + .setTimestamp(1000000L); + if (version >= 4) { + partition.setCurrentLeaderEpoch(5); + } + ListOffsetTopic topic = new ListOffsetTopic() + .setName("test") + .setPartitions(Arrays.asList(partition)); return ListOffsetRequest.Builder .forConsumer(true, IsolationLevel.READ_COMMITTED) - .setTargetTimes(offsetData) + .setTargetTimes(Collections.singletonList(topic)) .build((short) version); } else { throw new IllegalArgumentException("Illegal ListOffsetRequest version " + version); @@ -1251,15 +1267,28 @@ public class RequestResponseTest { private ListOffsetResponse createListOffsetResponse(int version) { if (version == 0) { - Map responseData = new HashMap<>(); - responseData.put(new TopicPartition("test", 0), - new ListOffsetResponse.PartitionData(Errors.NONE, asList(100L))); - return new ListOffsetResponse(responseData); + ListOffsetResponseData data = new ListOffsetResponseData() + .setTopics(Collections.singletonList(new ListOffsetTopicResponse() + .setName("test") + .setPartitions(Collections.singletonList(new ListOffsetPartitionResponse() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code()) + .setOldStyleOffsets(asList(100L)))))); + return new ListOffsetResponse(data); } else if (version >= 1 && version <= 5) { - Map responseData = new HashMap<>(); - responseData.put(new TopicPartition("test", 0), - new ListOffsetResponse.PartitionData(Errors.NONE, 10000L, 100L, Optional.of(27))); - return new ListOffsetResponse(responseData); + ListOffsetPartitionResponse partition = new ListOffsetPartitionResponse() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code()) + .setTimestamp(10000L) + .setOffset(100L); + if (version >= 4) { + partition.setLeaderEpoch(27); + } + ListOffsetResponseData data = new ListOffsetResponseData() + .setTopics(Collections.singletonList(new ListOffsetTopicResponse() + .setName("test") + .setPartitions(Collections.singletonList(partition)))); + return new ListOffsetResponse(data); } else { throw new IllegalArgumentException("Illegal ListOffsetResponse version " + version); } diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index e1d1ef10292..ed922b18f6f 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -29,7 +29,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Random; import java.util.Base64.Encoder; import java.util.Set; @@ -53,7 +52,6 @@ import javax.security.sasl.SaslException; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.config.types.Password; @@ -62,6 +60,9 @@ import org.apache.kafka.common.message.ApiVersionsRequestData; import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection; +import org.apache.kafka.common.message.ListOffsetResponseData; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse; import org.apache.kafka.common.message.RequestHeaderData; import org.apache.kafka.common.message.SaslAuthenticateRequestData; import org.apache.kafka.common.message.SaslHandshakeRequestData; @@ -1581,8 +1582,17 @@ public class SaslAuthenticatorTest { @Test public void testConvertListOffsetResponseToSaslHandshakeResponse() { - ListOffsetResponse response = new ListOffsetResponse(0, Collections.singletonMap(new TopicPartition("topic", 0), - new ListOffsetResponse.PartitionData(Errors.NONE, 0, 0, Optional.empty()))); + ListOffsetResponseData data = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Collections.singletonList(new ListOffsetTopicResponse() + .setName("topic") + .setPartitions(Collections.singletonList(new ListOffsetPartitionResponse() + .setErrorCode(Errors.NONE.code()) + .setLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH) + .setPartitionIndex(0) + .setOffset(0) + .setTimestamp(0))))); + ListOffsetResponse response = new ListOffsetResponse(data); ByteBuffer buffer = response.serialize(ApiKeys.LIST_OFFSETS, LIST_OFFSETS.latestVersion(), 0); final RequestHeader header0 = new RequestHeader(LIST_OFFSETS, LIST_OFFSETS.latestVersion(), "id", SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID); Assert.assertThrows(SchemaException.class, () -> NetworkClient.parseResponse(buffer.duplicate(), header0)); diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 1508690fdb1..3539bde6582 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -378,7 +378,7 @@ class Partition(val topicPartition: TopicPartition, getLocalLog(currentLeaderEpoch, requireLeader) match { case Left(localLog) => localLog case Right(error) => - throw error.exception(s"Failed to find ${if (requireLeader) "leader " else ""} log for " + + throw error.exception(s"Failed to find ${if (requireLeader) "leader" else ""} log for " + s"partition $topicPartition with leader epoch $currentLeaderEpoch. The current leader " + s"is $leaderReplicaIdOpt and the current epoch $leaderEpoch") } @@ -1071,7 +1071,7 @@ class Partition(val topicPartition: TopicPartition, case None => localLog.logEndOffset } - val epochLogString = if(currentLeaderEpoch.isPresent) { + val epochLogString = if (currentLeaderEpoch.isPresent) { s"epoch ${currentLeaderEpoch.get}" } else { "unknown epoch" diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 08f88456565..4ed462b8282 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -62,6 +62,9 @@ import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsP import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition +import org.apache.kafka.common.message.ListOffsetResponseData +import org.apache.kafka.common.message.ListOffsetResponseData.{ListOffsetPartitionResponse, ListOffsetTopicResponse} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ListenerName, Send} import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -934,136 +937,161 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion - val mergedResponseMap = if (version == 0) + val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) - sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val partitionTimestamps = offsetRequest.partitionTimestamps.asScala - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, + DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) - } + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)).asJava) + ) - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { - val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { - // NOTE: UnknownTopicOrPartitionException and NotLeaderOrFollowerException are special cased since these error messages - // are typically transient and there is no value in logging the entire stack trace for the same - case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderOrFollowerException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( - correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } - } - responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { - val correlationId = request.header.correlationId - val clientId = request.header.clientId - val offsetRequest = request.body[ListOffsetRequest] - - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty()) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { - debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + - s"failed because the partition is duplicated in the request.") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { - - def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( - e, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) try { - val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID - val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID - val isolationLevelOpt = if (isClientRequest) - Some(offsetRequest.isolationLevel) - else - None - - val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, - partitionData.timestamp, - isolationLevelOpt, - partitionData.currentLeaderEpoch, - fetchOnlyFromLeader) - - val response = foundOpt match { - case Some(found) => - new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset, found.leaderEpoch) - case None => - new ListOffsetResponse.PartitionData(Errors.NONE, ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty()) - } - (topicPartition, response) + val offsets = replicaManager.legacyFetchOffsetsForTimestamp( + topicPartition = topicPartition, + timestamp = partition.timestamp, + maxNumOffsets = partition.maxNumOffsets, + isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, + fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.NONE.code) + .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava) } catch { - // NOTE: These exceptions are special cased since these error messages are typically transient or the client - // would have received a clear exception and there is no value in logging the entire stack trace for the same + // NOTE: UnknownTopicOrPartitionException and NotLeaderOrFollowerException are special cases since these error messages + // are typically transient and there is no value in logging the entire stack trace for the same case e @ (_ : UnknownTopicOrPartitionException | _ : NotLeaderOrFollowerException | - _ : UnknownLeaderEpochException | - _ : FencedLeaderEpochException | - _ : KafkaStorageException | - _ : UnsupportedForMessageFormatException) => - debug(s"Offset request with correlation id $correlationId from client $clientId on " + - s"partition $topicPartition failed due to ${e.getMessage}") - buildErrorResponse(Errors.forException(e)) - - // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE - case e: OffsetNotAvailableException => - if(request.header.apiVersion >= 5) { - buildErrorResponse(Errors.forException(e)) - } else { - buildErrorResponse(Errors.LEADER_NOT_AVAILABLE) - } - + _ : KafkaStorageException) => + debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + correlationId, clientId, topicPartition, e.getMessage)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) case e: Throwable => error("Error while responding to offset request", e) - buildErrorResponse(Errors.forException(e)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) } } + new ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava) } - responseMap ++ unauthorizedResponseStatus + (responseTopics ++ unauthorizedResponseStatus).toList + } + + private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): List[ListOffsetTopicResponse] = { + val correlationId = request.header.correlationId + val clientId = request.header.clientId + val offsetRequest = request.body[ListOffsetRequest] + + def buildErrorResponse(e: Errors, partition: ListOffsetPartition): ListOffsetPartitionResponse = { + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(e.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET) + } + + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, + DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + buildErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED, partition)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) + if (offsetRequest.duplicatePartitions.contains(topicPartition)) { + debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + + s"failed because the partition is duplicated in the request.") + buildErrorResponse(Errors.INVALID_REQUEST, partition) + } else { + try { + val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID + val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID + val isolationLevelOpt = if (isClientRequest) + Some(offsetRequest.isolationLevel) + else + None + + val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, + partition.timestamp, + isolationLevelOpt, + if (partition.currentLeaderEpoch == ListOffsetResponse.UNKNOWN_EPOCH) Optional.empty() else Optional.of(partition.currentLeaderEpoch), + fetchOnlyFromLeader) + + val response = foundOpt match { + case Some(found) => + val partitionResponse = new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.NONE.code) + .setTimestamp(found.timestamp) + .setOffset(found.offset) + if (found.leaderEpoch.isPresent) + partitionResponse.setLeaderEpoch(found.leaderEpoch.get) + partitionResponse + case None => + buildErrorResponse(Errors.NONE, partition) + } + response + } catch { + // NOTE: These exceptions are special cases since these error messages are typically transient or the client + // would have received a clear exception and there is no value in logging the entire stack trace for the same + case e @ (_ : UnknownTopicOrPartitionException | + _ : NotLeaderForPartitionException | + _ : UnknownLeaderEpochException | + _ : FencedLeaderEpochException | + _ : KafkaStorageException | + _ : UnsupportedForMessageFormatException) => + e.printStackTrace() + debug(s"Offset request with correlation id $correlationId from client $clientId on " + + s"partition $topicPartition failed due to ${e.getMessage}") + buildErrorResponse(Errors.forException(e), partition) + + // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE + case e: OffsetNotAvailableException => + if (request.header.apiVersion >= 5) { + buildErrorResponse(Errors.forException(e), partition) + } else { + buildErrorResponse(Errors.LEADER_NOT_AVAILABLE, partition) + } + + case e: Throwable => + error("Error while responding to offset request", e) + buildErrorResponse(Errors.forException(e), partition) + } + } + } + new ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava) + } + (responseTopics ++ unauthorizedResponseStatus).toList } private def createTopic(topic: String, diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index d0432eae001..749755db510 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -17,6 +17,7 @@ package kafka.server +import java.util.Collections import java.util.Optional import kafka.api._ @@ -28,6 +29,7 @@ import kafka.utils.Implicits._ import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.message.ListOffsetRequestData.{ListOffsetPartition, ListOffsetTopic} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.{MemoryRecords, Records} @@ -229,22 +231,27 @@ class ReplicaFetcherThread(name: String, } private def fetchOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int, earliestOrLatest: Long): Long = { - val requestPartitionData = new ListOffsetRequest.PartitionData(earliestOrLatest, - Optional.of[Integer](currentLeaderEpoch)) - val requestPartitions = Map(topicPartition -> requestPartitionData) + val topic = new ListOffsetTopic() + .setName(topicPartition.topic) + .setPartitions(Collections.singletonList( + new ListOffsetPartition() + .setPartitionIndex(topicPartition.partition) + .setCurrentLeaderEpoch(currentLeaderEpoch) + .setTimestamp(earliestOrLatest))) val requestBuilder = ListOffsetRequest.Builder.forReplica(listOffsetRequestVersion, replicaId) - .setTargetTimes(requestPartitions.asJava) + .setTargetTimes(Collections.singletonList(topic)) val clientResponse = leaderEndpoint.sendRequest(requestBuilder) val response = clientResponse.responseBody.asInstanceOf[ListOffsetResponse] + val responsePartition = response.topics.asScala.find(_.name == topicPartition.topic).get + .partitions.asScala.find(_.partitionIndex == topicPartition.partition).get - val responsePartitionData = response.responseData.get(topicPartition) - responsePartitionData.error match { + Errors.forCode(responsePartition.errorCode) match { case Errors.NONE => if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) - responsePartitionData.offset + responsePartition.offset else - responsePartitionData.offsets.get(0) + responsePartition.oldStyleOffsets.get(0) case error => throw error.exception } } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 863fb43b530..c50c5cb776c 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -42,6 +42,7 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{Alter import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity +import org.apache.kafka.common.message.ListOffsetRequestData.{ListOffsetPartition, ListOffsetTopic} import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState} import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} import org.apache.kafka.common.message.{AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, OffsetCommitRequestData, SyncGroupRequestData} @@ -155,7 +156,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2), ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses.asScala.find(_._1 == tp).get._2.error), ApiKeys.FETCH -> ((resp: requests.FetchResponse[Records]) => resp.responseData.asScala.find(_._1 == tp).get._2.error), - ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData.asScala.find(_._1 == tp).get._2.error), + ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => { + Errors.forCode( + resp.data + .topics.asScala.find(_.name == topic).get + .partitions.asScala.find(_.partitionIndex == part).get + .errorCode + ) + }), ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => Errors.forCode( resp.data().topics().get(0).partitions().get(0).errorCode())), ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error), @@ -164,8 +172,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.error), ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => Errors.forCode(resp.data.errorCode())), ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => { - val errorCode = resp.data().groups().asScala.find(g => group.equals(g.groupId())).head.errorCode() - Errors.forCode(errorCode) + Errors.forCode(resp.data.groups.asScala.find(g => group == g.groupId).head.errorCode) }), ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.error), ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error), @@ -212,9 +219,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.OFFSET_DELETE -> ((resp: OffsetDeleteResponse) => { Errors.forCode( resp.data - .topics().asScala.find(_.name() == topic).get - .partitions().asScala.find(_.partitionIndex() == part).get - .errorCode() + .topics.asScala.find(_.name == topic).get + .partitions.asScala.find(_.partitionIndex == part).get + .errorCode ) }) ) @@ -304,7 +311,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def createListOffsetsRequest = { requests.ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED).setTargetTimes( - Map(tp -> new ListOffsetRequest.PartitionData(0L, Optional.of[Integer](27))).asJava). + List(new ListOffsetTopic() + .setName(tp.topic) + .setPartitions(List(new ListOffsetPartition() + .setPartitionIndex(tp.partition) + .setTimestamp(0L) + .setCurrentLeaderEpoch(27)).asJava)).asJava + ). build() } diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index f0dda569878..01ebfd15cbf 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -64,7 +64,9 @@ class DelayedFetchTest extends EasyMockSupport { EasyMock.expect(replicaManager.getPartitionOrException(topicPartition)) .andReturn(partition) - EasyMock.expect(partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = true)) + EasyMock.expect(partition.fetchOffsetSnapshot( + currentLeaderEpoch, + fetchOnlyFromLeader = true)) .andThrow(new FencedLeaderEpochException("Requested epoch has been fenced")) EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 08bd1b98770..e989b960df6 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -176,7 +176,7 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(Some(4), partition.leaderLogIfLocal.map(_.logEndOffset)) assertEquals(None, log.latestEpoch) - val epochEndOffset = partition.lastOffsetForLeaderEpoch(currentLeaderEpoch = Optional.of[Integer](leaderEpoch), + val epochEndOffset = partition.lastOffsetForLeaderEpoch(currentLeaderEpoch = Optional.of(leaderEpoch), leaderEpoch = leaderEpoch, fetchOnlyFromLeader = true) assertEquals(EpochEndOffset.UNDEFINED_EPOCH_OFFSET, epochEndOffset.endOffset) assertEquals(EpochEndOffset.UNDEFINED_EPOCH, epochEndOffset.leaderEpoch) @@ -534,7 +534,7 @@ class PartitionTest extends AbstractPartitionTest { assertTrue(timestampAndOffsetOpt.isDefined) val timestampAndOffset = timestampAndOffsetOpt.get - assertEquals(Optional.of(leaderEpoch), timestampAndOffset.leaderEpoch) + assertEquals(leaderEpoch, timestampAndOffset.leaderEpoch.get) } /** diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index eec57a4295f..5ee74011994 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -50,6 +50,7 @@ import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity +import org.apache.kafka.common.message.ListOffsetRequestData.{ListOffsetPartition, ListOffsetTopic} import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection} import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState} import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} @@ -1210,19 +1211,25 @@ class KafkaApisTest { val capturedResponse = expectNoThrottling() EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel) - val targetTimes = Map(tp -> new ListOffsetRequest.PartitionData(ListOffsetRequest.EARLIEST_TIMESTAMP, - currentLeaderEpoch)) + val targetTimes = List(new ListOffsetTopic() + .setName(tp.topic) + .setPartitions(List(new ListOffsetPartition() + .setPartitionIndex(tp.partition) + .setTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP) + .setCurrentLeaderEpoch(currentLeaderEpoch.get)).asJava)).asJava val listOffsetRequest = ListOffsetRequest.Builder.forConsumer(true, isolationLevel) - .setTargetTimes(targetTimes.asJava).build() + .setTargetTimes(targetTimes).build() val request = buildRequest(listOffsetRequest) createKafkaApis().handleListOffsetRequest(request) val response = readResponse(ApiKeys.LIST_OFFSETS, listOffsetRequest, capturedResponse) .asInstanceOf[ListOffsetResponse] - assertTrue(response.responseData.containsKey(tp)) + val partitionDataOptional = response.topics.asScala.find(_.name == tp.topic).get + .partitions.asScala.find(_.partitionIndex == tp.partition) + assertTrue(partitionDataOptional.isDefined) - val partitionData = response.responseData.get(tp) - assertEquals(error, partitionData.error) + val partitionData = partitionDataOptional.get + assertEquals(error.code, partitionData.errorCode) assertEquals(ListOffsetResponse.UNKNOWN_OFFSET, partitionData.offset) assertEquals(ListOffsetResponse.UNKNOWN_TIMESTAMP, partitionData.timestamp) } @@ -2214,18 +2221,23 @@ class KafkaApisTest { val capturedResponse = expectNoThrottling() EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel) - val targetTimes = Map(tp -> new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, - currentLeaderEpoch)) + val targetTimes = List(new ListOffsetTopic() + .setName(tp.topic) + .setPartitions(List(new ListOffsetPartition() + .setPartitionIndex(tp.partition) + .setTimestamp(ListOffsetRequest.LATEST_TIMESTAMP)).asJava)).asJava val listOffsetRequest = ListOffsetRequest.Builder.forConsumer(true, isolationLevel) - .setTargetTimes(targetTimes.asJava).build() + .setTargetTimes(targetTimes).build() val request = buildRequest(listOffsetRequest) createKafkaApis().handleListOffsetRequest(request) val response = readResponse(ApiKeys.LIST_OFFSETS, listOffsetRequest, capturedResponse).asInstanceOf[ListOffsetResponse] - assertTrue(response.responseData.containsKey(tp)) + val partitionDataOptional = response.topics.asScala.find(_.name == tp.topic).get + .partitions.asScala.find(_.partitionIndex == tp.partition) + assertTrue(partitionDataOptional.isDefined) - val partitionData = response.responseData.get(tp) - assertEquals(Errors.NONE, partitionData.error) + val partitionData = partitionDataOptional.get + assertEquals(Errors.NONE.code, partitionData.errorCode) assertEquals(latestOffset, partitionData.offset) assertEquals(ListOffsetResponse.UNKNOWN_TIMESTAMP, partitionData.timestamp) } diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index 2600cceccfe..07ff804b82e 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -19,6 +19,7 @@ package kafka.server import java.util.Optional import kafka.utils.TestUtils +import org.apache.kafka.common.message.ListOffsetRequestData.{ListOffsetPartition, ListOffsetTopic} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse} import org.apache.kafka.common.{IsolationLevel, TopicPartition} @@ -33,8 +34,12 @@ class ListOffsetsRequestTest extends BaseRequestTest { def testListOffsetsErrorCodes(): Unit = { val topic = "topic" val partition = new TopicPartition(topic, 0) - val targetTimes = Map(partition -> new ListOffsetRequest.PartitionData( - ListOffsetRequest.EARLIEST_TIMESTAMP, Optional.of[Integer](0))).asJava + val targetTimes = List(new ListOffsetTopic() + .setName(topic) + .setPartitions(List(new ListOffsetPartition() + .setPartitionIndex(partition.partition) + .setTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP) + .setCurrentLeaderEpoch(0)).asJava)).asJava val consumerRequest = ListOffsetRequest.Builder .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) @@ -82,8 +87,14 @@ class ListOffsetsRequestTest extends BaseRequestTest { val firstLeaderId = partitionToLeader(topicPartition.partition) def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = { - val targetTimes = Map(topicPartition -> new ListOffsetRequest.PartitionData( - ListOffsetRequest.EARLIEST_TIMESTAMP, currentLeaderEpoch)).asJava + val partition = new ListOffsetPartition() + .setPartitionIndex(topicPartition.partition) + .setTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP) + if (currentLeaderEpoch.isPresent) + partition.setCurrentLeaderEpoch(currentLeaderEpoch.get) + val targetTimes = List(new ListOffsetTopic() + .setName(topic) + .setPartitions(List(partition).asJava)).asJava val request = ListOffsetRequest.Builder .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) .setTargetTimes(targetTimes) @@ -122,8 +133,11 @@ class ListOffsetsRequestTest extends BaseRequestTest { def fetchOffsetAndEpoch(serverId: Int, timestamp: Long): (Long, Int) = { - val targetTimes = Map(topicPartition -> new ListOffsetRequest.PartitionData( - timestamp, Optional.empty[Integer]())).asJava + val targetTimes = List(new ListOffsetTopic() + .setName(topic) + .setPartitions(List(new ListOffsetPartition() + .setPartitionIndex(topicPartition.partition) + .setTimestamp(timestamp)).asJava)).asJava val request = ListOffsetRequest.Builder .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) @@ -131,11 +145,10 @@ class ListOffsetsRequestTest extends BaseRequestTest { .build() val response = sendRequest(serverId, request) - val partitionData = response.responseData.get(topicPartition) - val epochOpt = partitionData.leaderEpoch - assertTrue(epochOpt.isPresent) + val partitionData = response.topics.asScala.find(_.name == topic).get + .partitions.asScala.find(_.partitionIndex == topicPartition.partition).get - (partitionData.offset, epochOpt.get) + (partitionData.offset, partitionData.leaderEpoch) } assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L)) @@ -157,9 +170,11 @@ class ListOffsetsRequestTest extends BaseRequestTest { private def assertResponseError(error: Errors, brokerId: Int, request: ListOffsetRequest): Unit = { val response = sendRequest(brokerId, request) - assertEquals(request.partitionTimestamps.size, response.responseData.size) - response.responseData.asScala.values.foreach { partitionData => - assertEquals(error, partitionData.error) + assertEquals(request.topics.size, response.topics.size) + response.topics.asScala.foreach { topic => + topic.partitions.asScala.foreach { partition => + assertEquals(error.code, partition.errorCode) + } } } diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 8e848ad1680..e89a69854ca 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -23,6 +23,10 @@ import java.util.{Optional, Properties, Random} import kafka.log.{ClientRecordDeletion, Log, LogSegment} import kafka.utils.{MockTime, TestUtils} +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse} @@ -32,6 +36,7 @@ import org.junit.Assert._ import org.junit.Test import scala.jdk.CollectionConverters._ +import scala.collection.mutable.Buffer class LogOffsetTest extends BaseRequestTest { @@ -54,10 +59,9 @@ class LogOffsetTest extends BaseRequestTest { def testGetOffsetsForUnknownTopic(): Unit = { val topicPartition = new TopicPartition("foo", 0) val request = ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED) - .setTargetTimes(Map(topicPartition -> - new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 10)).asJava).build(0) + .setTargetTimes(buildTargetTimes(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP, 10).asJava).build(0) val response = sendListOffsetsRequest(request) - assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.responseData.get(topicPartition).error) + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, findPartition(response.topics.asScala, topicPartition).errorCode) } @deprecated("ListOffsetsRequest V0", since = "") @@ -87,9 +91,8 @@ class LogOffsetTest extends BaseRequestTest { TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server), "Leader should be elected") val request = ListOffsetRequest.Builder.forReplica(0, 0) - .setTargetTimes(Map(topicPartition -> - new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 15)).asJava).build() - val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala + .setTargetTimes(buildTargetTimes(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP, 15).asJava).build() + val consumerOffsets = findPartition(sendListOffsetsRequest(request).topics.asScala, topicPartition).oldStyleOffsets.asScala assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets) } @@ -115,9 +118,8 @@ class LogOffsetTest extends BaseRequestTest { TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server), "Leader should be elected") val request = ListOffsetRequest.Builder.forReplica(0, 0) - .setTargetTimes(Map(topicPartition -> - new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 15)).asJava).build() - val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala + .setTargetTimes(buildTargetTimes(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP, 15).asJava).build() + val consumerOffsets = findPartition(sendListOffsetsRequest(request).topics.asScala, topicPartition).oldStyleOffsets.asScala assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), consumerOffsets) // try to fetch using latest offset @@ -143,9 +145,8 @@ class LogOffsetTest extends BaseRequestTest { for (_ <- 1 to 14) { val topicPartition = new TopicPartition(topic, 0) val request = ListOffsetRequest.Builder.forReplica(0, 0) - .setTargetTimes(Map(topicPartition -> - new ListOffsetRequest.PartitionData(ListOffsetRequest.EARLIEST_TIMESTAMP, 1)).asJava).build() - val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala + .setTargetTimes(buildTargetTimes(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP, 1).asJava).build() + val consumerOffsets = findPartition(sendListOffsetsRequest(request).topics.asScala, topicPartition).oldStyleOffsets.asScala if (consumerOffsets.head == 1) offsetChanged = true } @@ -176,9 +177,8 @@ class LogOffsetTest extends BaseRequestTest { TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server), "Leader should be elected") val request = ListOffsetRequest.Builder.forReplica(0, 0) - .setTargetTimes(Map(topicPartition -> - new ListOffsetRequest.PartitionData(now, 15)).asJava).build() - val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala + .setTargetTimes(buildTargetTimes(topicPartition, now, 15).asJava).build() + val consumerOffsets = findPartition(sendListOffsetsRequest(request).topics.asScala, topicPartition).oldStyleOffsets.asScala assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), consumerOffsets) } @@ -204,9 +204,8 @@ class LogOffsetTest extends BaseRequestTest { TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server), "Leader should be elected") val request = ListOffsetRequest.Builder.forReplica(0, 0) - .setTargetTimes(Map(topicPartition -> - new ListOffsetRequest.PartitionData(ListOffsetRequest.EARLIEST_TIMESTAMP, 10)).asJava).build() - val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala + .setTargetTimes(buildTargetTimes(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP, 10).asJava).build() + val consumerOffsets = findPartition(sendListOffsetsRequest(request).topics.asScala, topicPartition).oldStyleOffsets.asScala assertEquals(Seq(0L), consumerOffsets) } @@ -256,4 +255,19 @@ class LogOffsetTest extends BaseRequestTest { connectAndReceive[FetchResponse[MemoryRecords]](request) } + private def buildTargetTimes(tp: TopicPartition, timestamp: Long, maxNumOffsets: Int): List[ListOffsetTopic] = { + List(new ListOffsetTopic() + .setName(tp.topic) + .setPartitions(List(new ListOffsetPartition() + .setPartitionIndex(tp.partition) + .setTimestamp(timestamp) + .setMaxNumOffsets(maxNumOffsets)).asJava) + ) + } + + private def findPartition(topics: Buffer[ListOffsetTopicResponse], tp: TopicPartition): ListOffsetPartitionResponse = { + topics.find(_.name == tp.topic).get + .partitions.asScala.find(_.partitionIndex == tp.partition).get + } + } diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 8de651137dd..c6ec05430ee 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -31,6 +31,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity +import org.apache.kafka.common.message.ListOffsetRequestData.{ListOffsetPartition, ListOffsetTopic} import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState} import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} import org.apache.kafka.common.message._ @@ -221,9 +222,14 @@ class RequestQuotaTest extends BaseRequestTest { new MetadataRequest.Builder(List(topic).asJava, true) case ApiKeys.LIST_OFFSETS => + val topic = new ListOffsetTopic() + .setName(tp.topic) + .setPartitions(List(new ListOffsetPartition() + .setPartitionIndex(tp.partition) + .setTimestamp(0L) + .setCurrentLeaderEpoch(15)).asJava) ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED) - .setTargetTimes(Map(tp -> new ListOffsetRequest.PartitionData( - 0L, Optional.of[Integer](15))).asJava) + .setTargetTimes(List(topic).asJava) case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, brokerId, Int.MaxValue, Long.MaxValue,