From f5c2f608b05c8094a898c14a6c4a3238f6388df1 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Wed, 17 Feb 2021 12:14:08 +0800 Subject: [PATCH] MINOR: use 'mapKey' to avoid unnecessary grouped data (#10082) 1. add 'mapKey=true' to DescribeLogDirsRequest 2. rename PartitionIndex to Partitions for DescribeLogDirsRequest 3. add 'mapKey=true' to ElectLeadersRequest 4. rename PartitionId to Partitions for ElectLeadersRequest 5. add 'mapKey=true' to ConsumerProtocolAssignment Reviewers: David Jacot , Ismael Juma --- .../kafka/clients/admin/KafkaAdminClient.java | 10 +++--- .../consumer/internals/ConsumerProtocol.java | 32 +++++++++---------- .../common/requests/ElectLeadersRequest.java | 15 +++++---- .../message/ConsumerProtocolAssignment.json | 2 +- .../message/ConsumerProtocolSubscription.json | 2 +- .../message/DescribeLogDirsRequest.json | 2 +- .../common/message/ElectLeadersRequest.json | 4 +-- core/src/main/scala/kafka/api/package.scala | 2 +- .../main/scala/kafka/server/KafkaApis.scala | 2 +- .../kafka/api/AuthorizerIntegrationTest.scala | 2 +- .../unit/kafka/server/RequestQuotaTest.scala | 2 +- 11 files changed, 39 insertions(+), 36 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 3cad32800b7..3296fb8be5f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2578,13 +2578,13 @@ public class KafkaAdminClient extends AdminClient { brokerId -> new DescribeLogDirsRequestData()); DescribableLogDirTopic describableLogDirTopic = requestData.topics().find(replica.topic()); if (describableLogDirTopic == null) { - List partitionIndex = new ArrayList<>(); - partitionIndex.add(replica.partition()); + List partitions = new ArrayList<>(); + partitions.add(replica.partition()); describableLogDirTopic = new DescribableLogDirTopic().setTopic(replica.topic()) - .setPartitionIndex(partitionIndex); + .setPartitions(partitions); requestData.topics().add(describableLogDirTopic); } else { - describableLogDirTopic.partitionIndex().add(replica.partition()); + describableLogDirTopic.partitions().add(replica.partition()); } } @@ -2594,7 +2594,7 @@ public class KafkaAdminClient extends AdminClient { final DescribeLogDirsRequestData topicPartitions = entry.getValue(); final Map replicaDirInfoByPartition = new HashMap<>(); for (DescribableLogDirTopic topicPartition: topicPartitions.topics()) { - for (Integer partitionId : topicPartition.partitionIndex()) { + for (Integer partitionId : topicPartition.partitions()) { replicaDirInfoByPartition.put(new TopicPartition(topicPartition.topic(), partitionId), new ReplicaLogDirInfo()); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java index a05e8715eae..a9c74301429 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java @@ -25,12 +25,10 @@ import org.apache.kafka.common.message.ConsumerProtocolSubscription; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.common.protocol.types.SchemaException; -import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.Map; /** * ConsumerProtocol contains the schemas for consumer subscriptions and assignments for use with @@ -74,13 +72,14 @@ public class ConsumerProtocol { ConsumerProtocolSubscription data = new ConsumerProtocolSubscription(); data.setTopics(subscription.topics()); data.setUserData(subscription.userData() != null ? subscription.userData().duplicate() : null); - Map> partitionsByTopic = CollectionUtils.groupPartitionsByTopic(subscription.ownedPartitions()); - for (Map.Entry> topicEntry : partitionsByTopic.entrySet()) { - data.ownedPartitions().add(new ConsumerProtocolSubscription.TopicPartition() - .setTopic(topicEntry.getKey()) - .setPartitions(topicEntry.getValue())); - } - + subscription.ownedPartitions().forEach(tp -> { + ConsumerProtocolSubscription.TopicPartition partition = data.ownedPartitions().find(tp.topic()); + if (partition == null) { + partition = new ConsumerProtocolSubscription.TopicPartition().setTopic(tp.topic()); + data.ownedPartitions().add(partition); + } + partition.partitions().add(tp.partition()); + }); return MessageUtil.toVersionPrefixedByteBuffer(version, data); } @@ -120,13 +119,14 @@ public class ConsumerProtocol { ConsumerProtocolAssignment data = new ConsumerProtocolAssignment(); data.setUserData(assignment.userData() != null ? assignment.userData().duplicate() : null); - Map> partitionsByTopic = CollectionUtils.groupPartitionsByTopic(assignment.partitions()); - for (Map.Entry> topicEntry : partitionsByTopic.entrySet()) { - data.assignedPartitions().add(new ConsumerProtocolAssignment.TopicPartition() - .setTopic(topicEntry.getKey()) - .setPartitions(topicEntry.getValue())); - } - + assignment.partitions().forEach(tp -> { + ConsumerProtocolAssignment.TopicPartition partition = data.assignedPartitions().find(tp.topic()); + if (partition == null) { + partition = new ConsumerProtocolAssignment.TopicPartition().setTopic(tp.topic()); + data.assignedPartitions().add(partition); + } + partition.partitions().add(tp.partition()); + }); return MessageUtil.toVersionPrefixedByteBuffer(version, data); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java index 92f6b45eed5..1bc888af605 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java @@ -21,7 +21,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -32,7 +31,6 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionR import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.MessageUtil; -import org.apache.kafka.common.utils.CollectionUtils; public class ElectLeadersRequest extends AbstractRequest { public static class Builder extends AbstractRequest.Builder { @@ -70,9 +68,14 @@ public class ElectLeadersRequest extends AbstractRequest { .setTimeoutMs(timeoutMs); if (topicPartitions != null) { - for (Map.Entry> tp : CollectionUtils.groupPartitionsByTopic(topicPartitions).entrySet()) { - data.topicPartitions().add(new ElectLeadersRequestData.TopicPartitions().setTopic(tp.getKey()).setPartitionId(tp.getValue())); - } + topicPartitions.forEach(tp -> { + ElectLeadersRequestData.TopicPartitions tps = data.topicPartitions().find(tp.topic()); + if (tps == null) { + tps = new ElectLeadersRequestData.TopicPartitions().setTopic(tp.topic()); + data.topicPartitions().add(tps); + } + tps.partitions().add(tp.partition()); + }); } else { data.setTopicPartitions(null); } @@ -104,7 +107,7 @@ public class ElectLeadersRequest extends AbstractRequest { ReplicaElectionResult electionResult = new ReplicaElectionResult(); electionResult.setTopic(topic.topic()); - for (Integer partitionId : topic.partitionId()) { + for (Integer partitionId : topic.partitions()) { PartitionResult partitionResult = new PartitionResult(); partitionResult.setPartitionId(partitionId); partitionResult.setErrorCode(apiError.error().code()); diff --git a/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json b/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json index 2ad373a5659..544db20b47e 100644 --- a/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json +++ b/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json @@ -25,7 +25,7 @@ "fields": [ { "name": "AssignedPartitions", "type": "[]TopicPartition", "versions": "0+", "fields": [ - { "name": "Topic", "type": "string", "versions": "0+" }, + { "name": "Topic", "type": "string", "mapKey": true, "versions": "0+" }, { "name": "Partitions", "type": "[]int32", "versions": "0+" } ] }, diff --git a/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json b/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json index fa4c371d70d..207dac79fbc 100644 --- a/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json +++ b/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json @@ -28,7 +28,7 @@ "default": "null", "zeroCopy": true }, { "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+", "ignorable": true, "fields": [ - { "name": "Topic", "type": "string", "versions": "1+" }, + { "name": "Topic", "type": "string", "mapKey": true, "versions": "1+" }, { "name": "Partitions", "type": "[]int32", "versions": "1+"} ] } diff --git a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json index 577f2eb4cf6..c498e0f2223 100644 --- a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json +++ b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json @@ -26,7 +26,7 @@ "about": "Each topic that we want to describe log directories for, or null for all topics.", "fields": [ { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true, "about": "The topic name" }, - { "name": "PartitionIndex", "type": "[]int32", "versions": "0+", + { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partition indxes." } ]} ] diff --git a/clients/src/main/resources/common/message/ElectLeadersRequest.json b/clients/src/main/resources/common/message/ElectLeadersRequest.json index 5b86c96b04d..a2ba2bdca73 100644 --- a/clients/src/main/resources/common/message/ElectLeadersRequest.json +++ b/clients/src/main/resources/common/message/ElectLeadersRequest.json @@ -28,9 +28,9 @@ { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", "about": "The topic partitions to elect leaders.", "fields": [ - { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true, "about": "The name of a topic." }, - { "name": "PartitionId", "type": "[]int32", "versions": "0+", + { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions of this topic whose leader should be elected." } ] }, diff --git a/core/src/main/scala/kafka/api/package.scala b/core/src/main/scala/kafka/api/package.scala index 11a956d40b5..e0678f810ff 100644 --- a/core/src/main/scala/kafka/api/package.scala +++ b/core/src/main/scala/kafka/api/package.scala @@ -28,7 +28,7 @@ package object api { Set.empty } else { self.data.topicPartitions.asScala.iterator.flatMap { topicPartition => - topicPartition.partitionId.asScala.map { partitionId => + topicPartition.partitions.asScala.map { partitionId => new TopicPartition(topicPartition.topic, partitionId) } }.toSet diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 5d71c8e056b..ffb1b8e7445 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2772,7 +2772,7 @@ class KafkaApis(val requestChannel: RequestChannel, replicaManager.logManager.allLogs.map(_.topicPartition).toSet else describeLogDirsDirRequest.data.topics.asScala.flatMap( - logDirTopic => logDirTopic.partitionIndex.asScala.map(partitionIndex => + logDirTopic => logDirTopic.partitions.asScala.map(partitionIndex => new TopicPartition(logDirTopic.topic, partitionIndex))).toSet replicaManager.describeLogDirs(partitions) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index fec75eb5ef9..66f25eeaac1 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -609,7 +609,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } private def describeLogDirsRequest = new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(new DescribeLogDirsRequestData.DescribableLogDirTopicCollection(Collections.singleton( - new DescribeLogDirsRequestData.DescribableLogDirTopic().setTopic(tp.topic).setPartitionIndex(Collections.singletonList(tp.partition))).iterator()))).build() + new DescribeLogDirsRequestData.DescribableLogDirTopic().setTopic(tp.topic).setPartitions(Collections.singletonList(tp.partition))).iterator()))).build() private def addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(transactionalId, 1, 1, Collections.singletonList(tp)).build() diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 5b1363f6aec..1924034ffdf 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -507,7 +507,7 @@ class RequestQuotaTest extends BaseRequestTest { val data = new DescribeLogDirsRequestData() data.topics.add(new DescribeLogDirsRequestData.DescribableLogDirTopic() .setTopic(tp.topic) - .setPartitionIndex(Collections.singletonList(tp.partition))) + .setPartitions(Collections.singletonList(tp.partition))) new DescribeLogDirsRequest.Builder(data) case ApiKeys.CREATE_PARTITIONS =>