MINOR: use 'mapKey' to avoid unnecessary grouped data (#10082)

1. Add 'mapKey=true' to DescribeLogDirsRequest
2. Rename PartitionIndex to Partitions in DescribeLogDirsRequest
3. Add 'mapKey=true' to ElectLeadersRequest
4. Rename PartitionId to Partitions in ElectLeadersRequest
5. Add 'mapKey=true' to ConsumerProtocolAssignment and ConsumerProtocolSubscription

Reviewers: David Jacot <djacot@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Author: Chia-Ping Tsai, 2021-02-17 12:14:08 +08:00, committed by GitHub
parent da58ce4065
commit f5c2f608b0
11 changed files with 39 additions and 36 deletions
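
Background on 'mapKey': in Kafka's message schema compiler, marking a string field with "mapKey": true makes the generated collection keyed by that field, so callers can look an element up in place with find() instead of first grouping a List<TopicPartition> into a Map<String, List<Integer>> via CollectionUtils.groupPartitionsByTopic. A minimal sketch of the resulting find-or-add pattern, assuming the generated ElectLeadersRequestData classes as changed by this patch (the MapKeySketch wrapper class is illustrative only):

import java.util.Arrays;
import java.util.List;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ElectLeadersRequestData;

public class MapKeySketch {
    public static void main(String[] args) {
        List<TopicPartition> topicPartitions = Arrays.asList(
            new TopicPartition("foo", 0),
            new TopicPartition("bar", 0),
            new TopicPartition("foo", 1));

        ElectLeadersRequestData data = new ElectLeadersRequestData();
        topicPartitions.forEach(tp -> {
            // find() is the keyed lookup generated because Topic is a mapKey;
            // it returns null when no element with that topic exists yet.
            ElectLeadersRequestData.TopicPartitions tps = data.topicPartitions().find(tp.topic());
            if (tps == null) {
                tps = new ElectLeadersRequestData.TopicPartitions().setTopic(tp.topic());
                data.topicPartitions().add(tps);
            }
            tps.partitions().add(tp.partition());
        });
        // "foo" now carries partitions [0, 1] and "bar" carries [0],
        // without materializing an intermediate Map.
        System.out.println(data);
    }
}

This is the same accumulation pattern the patch applies in ElectLeadersRequest.Builder, ConsumerProtocol, and KafkaAdminClient below.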

KafkaAdminClient.java

@@ -2578,13 +2578,13 @@ public class KafkaAdminClient extends AdminClient {
brokerId -> new DescribeLogDirsRequestData());
DescribableLogDirTopic describableLogDirTopic = requestData.topics().find(replica.topic());
if (describableLogDirTopic == null) {
- List<Integer> partitionIndex = new ArrayList<>();
- partitionIndex.add(replica.partition());
+ List<Integer> partitions = new ArrayList<>();
+ partitions.add(replica.partition());
describableLogDirTopic = new DescribableLogDirTopic().setTopic(replica.topic())
-     .setPartitionIndex(partitionIndex);
+     .setPartitions(partitions);
requestData.topics().add(describableLogDirTopic);
} else {
- describableLogDirTopic.partitionIndex().add(replica.partition());
+ describableLogDirTopic.partitions().add(replica.partition());
}
}
@@ -2594,7 +2594,7 @@ public class KafkaAdminClient extends AdminClient {
final DescribeLogDirsRequestData topicPartitions = entry.getValue();
final Map<TopicPartition, ReplicaLogDirInfo> replicaDirInfoByPartition = new HashMap<>();
for (DescribableLogDirTopic topicPartition: topicPartitions.topics()) {
- for (Integer partitionId : topicPartition.partitionIndex()) {
+ for (Integer partitionId : topicPartition.partitions()) {
replicaDirInfoByPartition.put(new TopicPartition(topicPartition.topic(), partitionId), new ReplicaLogDirInfo());
}
}

ConsumerProtocol.java

@@ -25,12 +25,10 @@ import org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.protocol.types.SchemaException;
- import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
- import java.util.Map;
/**
* ConsumerProtocol contains the schemas for consumer subscriptions and assignments for use with
@@ -74,13 +72,14 @@ public class ConsumerProtocol {
ConsumerProtocolSubscription data = new ConsumerProtocolSubscription();
data.setTopics(subscription.topics());
data.setUserData(subscription.userData() != null ? subscription.userData().duplicate() : null);
- Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupPartitionsByTopic(subscription.ownedPartitions());
- for (Map.Entry<String, List<Integer>> topicEntry : partitionsByTopic.entrySet()) {
-     data.ownedPartitions().add(new ConsumerProtocolSubscription.TopicPartition()
-         .setTopic(topicEntry.getKey())
-         .setPartitions(topicEntry.getValue()));
- }
+ subscription.ownedPartitions().forEach(tp -> {
+     ConsumerProtocolSubscription.TopicPartition partition = data.ownedPartitions().find(tp.topic());
+     if (partition == null) {
+         partition = new ConsumerProtocolSubscription.TopicPartition().setTopic(tp.topic());
+         data.ownedPartitions().add(partition);
+     }
+     partition.partitions().add(tp.partition());
+ });
return MessageUtil.toVersionPrefixedByteBuffer(version, data);
}
@@ -120,13 +119,14 @@ public class ConsumerProtocol {
ConsumerProtocolAssignment data = new ConsumerProtocolAssignment();
data.setUserData(assignment.userData() != null ? assignment.userData().duplicate() : null);
- Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupPartitionsByTopic(assignment.partitions());
- for (Map.Entry<String, List<Integer>> topicEntry : partitionsByTopic.entrySet()) {
-     data.assignedPartitions().add(new ConsumerProtocolAssignment.TopicPartition()
-         .setTopic(topicEntry.getKey())
-         .setPartitions(topicEntry.getValue()));
- }
+ assignment.partitions().forEach(tp -> {
+     ConsumerProtocolAssignment.TopicPartition partition = data.assignedPartitions().find(tp.topic());
+     if (partition == null) {
+         partition = new ConsumerProtocolAssignment.TopicPartition().setTopic(tp.topic());
+         data.assignedPartitions().add(partition);
+     }
+     partition.partitions().add(tp.partition());
+ });
return MessageUtil.toVersionPrefixedByteBuffer(version, data);
}

ElectLeadersRequest.java

@@ -21,7 +21,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
- import java.util.Map;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -32,7 +31,6 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionR
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.MessageUtil;
- import org.apache.kafka.common.utils.CollectionUtils;
public class ElectLeadersRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ElectLeadersRequest> {
@@ -70,9 +68,14 @@ public class ElectLeadersRequest extends AbstractRequest {
.setTimeoutMs(timeoutMs);
if (topicPartitions != null) {
- for (Map.Entry<String, List<Integer>> tp : CollectionUtils.groupPartitionsByTopic(topicPartitions).entrySet()) {
-     data.topicPartitions().add(new ElectLeadersRequestData.TopicPartitions().setTopic(tp.getKey()).setPartitionId(tp.getValue()));
- }
+ topicPartitions.forEach(tp -> {
+     ElectLeadersRequestData.TopicPartitions tps = data.topicPartitions().find(tp.topic());
+     if (tps == null) {
+         tps = new ElectLeadersRequestData.TopicPartitions().setTopic(tp.topic());
+         data.topicPartitions().add(tps);
+     }
+     tps.partitions().add(tp.partition());
+ });
} else {
data.setTopicPartitions(null);
}
@@ -104,7 +107,7 @@
ReplicaElectionResult electionResult = new ReplicaElectionResult();
electionResult.setTopic(topic.topic());
- for (Integer partitionId : topic.partitionId()) {
+ for (Integer partitionId : topic.partitions()) {
PartitionResult partitionResult = new PartitionResult();
partitionResult.setPartitionId(partitionId);
partitionResult.setErrorCode(apiError.error().code());

ConsumerProtocolAssignment.json

@@ -25,7 +25,7 @@
"fields": [
{ "name": "AssignedPartitions", "type": "[]TopicPartition", "versions": "0+",
"fields": [
{ "name": "Topic", "type": "string", "versions": "0+" },
{ "name": "Topic", "type": "string", "mapKey": true, "versions": "0+" },
{ "name": "Partitions", "type": "[]int32", "versions": "0+" }
]
},

ConsumerProtocolSubscription.json

@@ -28,7 +28,7 @@
"default": "null", "zeroCopy": true },
{ "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+", "ignorable": true,
"fields": [
{ "name": "Topic", "type": "string", "versions": "1+" },
{ "name": "Topic", "type": "string", "mapKey": true, "versions": "1+" },
{ "name": "Partitions", "type": "[]int32", "versions": "1+"}
]
}

DescribeLogDirsRequest.json

@@ -26,7 +26,7 @@
"about": "Each topic that we want to describe log directories for, or null for all topics.", "fields": [
{ "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
"about": "The topic name" },
{ "name": "PartitionIndex", "type": "[]int32", "versions": "0+",
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partition indxes." }
]}
]

ElectLeadersRequest.json

@@ -28,9 +28,9 @@
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
"about": "The topic partitions to elect leaders.",
"fields": [
{ "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName",
{ "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
"about": "The name of a topic." },
{ "name": "PartitionId", "type": "[]int32", "versions": "0+",
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions of this topic whose leader should be elected." }
]
},

package.scala (kafka.api package object)

@@ -28,7 +28,7 @@ package object api {
Set.empty
} else {
self.data.topicPartitions.asScala.iterator.flatMap { topicPartition =>
- topicPartition.partitionId.asScala.map { partitionId =>
+ topicPartition.partitions.asScala.map { partitionId =>
new TopicPartition(topicPartition.topic, partitionId)
}
}.toSet

KafkaApis.scala

@@ -2772,7 +2772,7 @@ class KafkaApis(val requestChannel: RequestChannel,
replicaManager.logManager.allLogs.map(_.topicPartition).toSet
else
describeLogDirsDirRequest.data.topics.asScala.flatMap(
- logDirTopic => logDirTopic.partitionIndex.asScala.map(partitionIndex =>
+ logDirTopic => logDirTopic.partitions.asScala.map(partitionIndex =>
new TopicPartition(logDirTopic.topic, partitionIndex))).toSet
replicaManager.describeLogDirs(partitions)

AuthorizerIntegrationTest.scala

@@ -609,7 +609,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def describeLogDirsRequest = new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(new DescribeLogDirsRequestData.DescribableLogDirTopicCollection(Collections.singleton(
- new DescribeLogDirsRequestData.DescribableLogDirTopic().setTopic(tp.topic).setPartitionIndex(Collections.singletonList(tp.partition))).iterator()))).build()
+ new DescribeLogDirsRequestData.DescribableLogDirTopic().setTopic(tp.topic).setPartitions(Collections.singletonList(tp.partition))).iterator()))).build()
private def addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(transactionalId, 1, 1, Collections.singletonList(tp)).build()

RequestQuotaTest.scala

@@ -507,7 +507,7 @@ class RequestQuotaTest extends BaseRequestTest {
val data = new DescribeLogDirsRequestData()
data.topics.add(new DescribeLogDirsRequestData.DescribableLogDirTopic()
.setTopic(tp.topic)
-     .setPartitionIndex(Collections.singletonList(tp.partition)))
+     .setPartitions(Collections.singletonList(tp.partition)))
new DescribeLogDirsRequest.Builder(data)
case ApiKeys.CREATE_PARTITIONS =>