mirror of https://github.com/apache/kafka.git
MINOR: refactor FetchResponse#toMessage to avoid creating unnecessary collections (#9818)
Reviewers: Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
parent
e3ce4a6e11
commit
c9afd2db01
|
@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.Utils;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -144,30 +143,6 @@ public class FetchRequest extends AbstractRequest {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class TopicAndPartitionData<T> {
|
|
||||||
public final String topic;
|
|
||||||
public final LinkedHashMap<Integer, T> partitions;
|
|
||||||
|
|
||||||
public TopicAndPartitionData(String topic) {
|
|
||||||
this.topic = topic;
|
|
||||||
this.partitions = new LinkedHashMap<>();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <T> List<TopicAndPartitionData<T>> batchByTopic(Iterator<Map.Entry<TopicPartition, T>> iter) {
|
|
||||||
List<TopicAndPartitionData<T>> topics = new ArrayList<>();
|
|
||||||
while (iter.hasNext()) {
|
|
||||||
Map.Entry<TopicPartition, T> topicEntry = iter.next();
|
|
||||||
String topic = topicEntry.getKey().topic();
|
|
||||||
int partition = topicEntry.getKey().partition();
|
|
||||||
T partitionData = topicEntry.getValue();
|
|
||||||
if (topics.isEmpty() || !topics.get(topics.size() - 1).topic.equals(topic))
|
|
||||||
topics.add(new TopicAndPartitionData<T>(topic));
|
|
||||||
topics.get(topics.size() - 1).partitions.put(partition, partitionData);
|
|
||||||
}
|
|
||||||
return topics;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class Builder extends AbstractRequest.Builder<FetchRequest> {
|
public static class Builder extends AbstractRequest.Builder<FetchRequest> {
|
||||||
private final int maxWait;
|
private final int maxWait;
|
||||||
private final int minBytes;
|
private final int minBytes;
|
||||||
|
|
|
@ -331,28 +331,31 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
|
||||||
private static <T extends BaseRecords> FetchResponseData toMessage(int throttleTimeMs, Errors error,
|
private static <T extends BaseRecords> FetchResponseData toMessage(int throttleTimeMs, Errors error,
|
||||||
Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator,
|
Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator,
|
||||||
int sessionId) {
|
int sessionId) {
|
||||||
FetchResponseData message = new FetchResponseData();
|
|
||||||
message.setThrottleTimeMs(throttleTimeMs);
|
|
||||||
message.setErrorCode(error.code());
|
|
||||||
message.setSessionId(sessionId);
|
|
||||||
|
|
||||||
List<FetchResponseData.FetchableTopicResponse> topicResponseList = new ArrayList<>();
|
List<FetchResponseData.FetchableTopicResponse> topicResponseList = new ArrayList<>();
|
||||||
List<FetchRequest.TopicAndPartitionData<PartitionData<T>>> topicsData =
|
partIterator.forEachRemaining(entry -> {
|
||||||
FetchRequest.TopicAndPartitionData.batchByTopic(partIterator);
|
PartitionData<T> partitionData = entry.getValue();
|
||||||
topicsData.forEach(partitionDataTopicAndPartitionData -> {
|
// Since PartitionData alone doesn't know the partition ID, we set it here
|
||||||
List<FetchResponseData.FetchablePartitionResponse> partitionResponses = new ArrayList<>();
|
partitionData.partitionResponse.setPartition(entry.getKey().partition());
|
||||||
partitionDataTopicAndPartitionData.partitions.forEach((partitionId, partitionData) -> {
|
// We have to keep the order of input topic-partition. Hence, we batch the partitions only if the last
|
||||||
// Since PartitionData alone doesn't know the partition ID, we set it here
|
// batch is in the same topic group.
|
||||||
partitionData.partitionResponse.setPartition(partitionId);
|
FetchResponseData.FetchableTopicResponse previousTopic = topicResponseList.isEmpty() ? null
|
||||||
|
: topicResponseList.get(topicResponseList.size() - 1);
|
||||||
|
if (previousTopic != null && previousTopic.topic().equals(entry.getKey().topic()))
|
||||||
|
previousTopic.partitionResponses().add(partitionData.partitionResponse);
|
||||||
|
else {
|
||||||
|
List<FetchResponseData.FetchablePartitionResponse> partitionResponses = new ArrayList<>();
|
||||||
partitionResponses.add(partitionData.partitionResponse);
|
partitionResponses.add(partitionData.partitionResponse);
|
||||||
});
|
topicResponseList.add(new FetchResponseData.FetchableTopicResponse()
|
||||||
topicResponseList.add(new FetchResponseData.FetchableTopicResponse()
|
.setTopic(entry.getKey().topic())
|
||||||
.setTopic(partitionDataTopicAndPartitionData.topic)
|
.setPartitionResponses(partitionResponses));
|
||||||
.setPartitionResponses(partitionResponses));
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
message.setResponses(topicResponseList);
|
return new FetchResponseData()
|
||||||
return message;
|
.setThrottleTimeMs(throttleTimeMs)
|
||||||
|
.setErrorCode(error.code())
|
||||||
|
.setSessionId(sessionId)
|
||||||
|
.setResponses(topicResponseList);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue