From 3709901c9ee0f61277b122a4863fea46f039335f Mon Sep 17 00:00:00 2001 From: Ritika Reddy <98577846+rreddy-22@users.noreply.github.com> Date: Fri, 28 Jul 2023 10:30:04 -0700 Subject: [PATCH] KAFKA-14702: Extend server side assignor to support rack aware replica placement (#14099) This patch updates the `PartitionAssignor` interface to support rack-awareness. The change introduces the `SubscribedTopicDescriber` interface that can be used to retrieve topic metadata such as the number of partitions or the racks from within an assignor. We use an interface because it allows us to wrap internal data structures instead of having to copy them. It is more efficient. Reviewers: David Jacot --- .../group/GroupMetadataManager.java | 6 +- .../coordinator/group/RecordHelpers.java | 17 +- .../group/assignor/AssignmentSpec.java | 34 +-- .../assignor/AssignmentTopicMetadata.java | 59 ---- .../group/assignor/PartitionAssignor.java | 9 +- .../group/assignor/RangeAssignor.java | 36 +-- .../assignor/SubscribedTopicDescriber.java | 51 ++++ .../group/consumer/ConsumerGroup.java | 38 ++- .../consumer/SubscribedTopicMetadata.java | 98 +++++++ .../consumer/TargetAssignmentBuilder.java | 16 +- .../group/consumer/TopicMetadata.java | 41 ++- .../ConsumerGroupPartitionMetadataValue.json | 9 +- .../group/GroupMetadataManagerTest.java | 164 +++++------ .../coordinator/group/RecordHelpersTest.java | 247 ++++++++++++++++- .../group/assignor/RangeAssignorTest.java | 261 ++++++++++++++---- .../group/consumer/ConsumerGroupTest.java | 66 +++-- .../consumer/SubscribedTopicMetadataTest.java | 74 +++++ .../consumer/TargetAssignmentBuilderTest.java | 66 ++--- .../group/consumer/TopicMetadataTest.java | 28 +- 19 files changed, 959 insertions(+), 361 deletions(-) delete mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicMetadata.java create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SubscribedTopicDescriber.java create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadataTest.java diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 1b490c55e10..09857750c48 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -792,7 +792,8 @@ public class GroupMetadataManager { subscriptionMetadata = group.computeSubscriptionMetadata( member, updatedMember, - metadataImage.topics() + metadataImage.topics(), + metadataImage.cluster() ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { @@ -942,7 +943,8 @@ public class GroupMetadataManager { Map subscriptionMetadata = group.computeSubscriptionMetadata( member, null, - metadataImage.topics() + metadataImage.topics(), + metadataImage.cluster() ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java index 13fcabe023d..c9bae675c2e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java @@ -131,13 +131,24 @@ public class RecordHelpers { Map newSubscriptionMetadata ) { ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue(); - newSubscriptionMetadata.forEach((topicName, topicMetadata) -> + newSubscriptionMetadata.forEach((topicName, topicMetadata) -> { + List partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. + if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> + partitionMetadata.add(new ConsumerGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(new ArrayList<>(racks)) + ) + ); + } value.topics().add(new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(topicMetadata.id()) .setTopicName(topicMetadata.name()) .setNumPartitions(topicMetadata.numPartitions()) - ) - ); + .setPartitionMetadata(partitionMetadata) + ); + }); return new Record( new ApiMessageAndVersion( diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java index bf775439077..0518d9cbd2a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.coordinator.group.assignor; -import org.apache.kafka.common.Uuid; - import java.util.Map; import java.util.Objects; @@ -26,59 +24,39 @@ import java.util.Objects; */ public class AssignmentSpec { /** - * The members keyed by member id. + * The member metadata keyed by member Id. */ private final Map members; - /** - * The topics' metadata keyed by topic id. - */ - private final Map topics; - public AssignmentSpec( - Map members, - Map topics + Map members ) { Objects.requireNonNull(members); - Objects.requireNonNull(topics); this.members = members; - this.topics = topics; } /** - * @return Member metadata keyed by member Ids. + * @return Member metadata keyed by member Id. */ public Map members() { return members; } - /** - * @return Topic metadata keyed by topic Ids. - */ - public Map topics() { - return topics; - } - @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; AssignmentSpec that = (AssignmentSpec) o; - if (!members.equals(that.members)) return false; - return topics.equals(that.topics); + return members.equals(that.members); } @Override public int hashCode() { - int result = members.hashCode(); - result = 31 * result + topics.hashCode(); - return result; + return members.hashCode(); } @Override public String toString() { - return "AssignmentSpec(members=" + members + - ", topics=" + topics + - ')'; + return "AssignmentSpec(members=" + members + ')'; } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicMetadata.java deleted file mode 100644 index e5a96e79b2c..00000000000 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicMetadata.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.coordinator.group.assignor; - -/** - * Metadata of a topic. - */ -public class AssignmentTopicMetadata { - - /** - * The number of partitions. - */ - private final int numPartitions; - - public AssignmentTopicMetadata( - int numPartitions - ) { - this.numPartitions = numPartitions; - } - - /** - * @return The number of partitions present for the topic. - */ - public int numPartitions() { - return numPartitions; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - AssignmentTopicMetadata that = (AssignmentTopicMetadata) o; - return numPartitions == that.numPartitions; - } - - @Override - public int hashCode() { - return numPartitions; - } - - @Override - public String toString() { - return "AssignmentTopicMetadata(numPartitions=" + numPartitions + ')'; - } -} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java index 43410fc8aae..ddb5f9b2bc6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java @@ -26,18 +26,17 @@ import org.apache.kafka.common.annotation.InterfaceStability; */ @InterfaceStability.Unstable public interface PartitionAssignor { - /** * Unique name for this assignor. */ String name(); /** - * Perform the group assignment given the current members and - * topic metadata. + * Assigns partitions to group members based on the given assignment specification and topic metadata. * - * @param assignmentSpec The assignment spec. + * @param assignmentSpec The assignment spec which includes member metadata. + * @param subscribedTopicDescriber The topic and partition metadata describer. * @return The new assignment for the group. */ - GroupAssignment assign(AssignmentSpec assignmentSpec) throws PartitionAssignorException; + GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException; } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java index b5316ffbbdb..081211ca1c6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java @@ -33,8 +33,9 @@ import static java.lang.Math.min; * This Range Assignor inherits properties of both the range assignor and the sticky assignor. * The properties are as follows: *
    - *
  1. Each member must get at least one partition from every topic that it is subscribed to. The only exception is when - * the number of subscribed members is greater than the number of partitions for that topic. (Range)
  2. + *
  3. Each member must get at least one partition from every topic that it is subscribed to. + * The only exception is when the number of subscribed members is greater than the + * number of partitions for that topic. (Range)
  4. *
  5. Partitions should be assigned to members in a way that facilitates the join operation when required. (Range) * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. * Two streams are co-partitioned if the following conditions are met: @@ -76,25 +77,28 @@ public class RangeAssignor implements PartitionAssignor { } /** - * @return Map of topic ids to a list of members subscribed to them. + * Returns a map of topic Ids to a list of members subscribed to them, + * based on the given assignment specification and metadata. + * + * @param assignmentSpec The specification for member assignments. + * @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters. + * @return A map of topic Ids to a list of member Ids subscribed to them. + * + * @throws PartitionAssignorException If a member is subscribed to a non-existent topic. */ - private Map> membersPerTopic(final AssignmentSpec assignmentSpec) { + private Map> membersPerTopic(final AssignmentSpec assignmentSpec, final SubscribedTopicDescriber subscribedTopicDescriber) { Map> membersPerTopic = new HashMap<>(); Map membersData = assignmentSpec.members(); membersData.forEach((memberId, memberMetadata) -> { Collection topics = memberMetadata.subscribedTopicIds(); for (Uuid topicId : topics) { - // Only topics that are present in both the subscribed topics list and the topic metadata should be - // considered for assignment. - if (assignmentSpec.topics().containsKey(topicId)) { - membersPerTopic - .computeIfAbsent(topicId, k -> new ArrayList<>()) - .add(memberId); - } else { - throw new PartitionAssignorException("Member " + memberId + " subscribed to topic " + - topicId + " which doesn't exist in the topic metadata"); + if (subscribedTopicDescriber.numPartitions(topicId) == -1) { + throw new PartitionAssignorException("Member is subscribed to a non-existent topic"); } + membersPerTopic + .computeIfAbsent(topicId, k -> new ArrayList<>()) + .add(memberId); } }); @@ -118,14 +122,14 @@ public class RangeAssignor implements PartitionAssignor { *
*/ @Override - public GroupAssignment assign(final AssignmentSpec assignmentSpec) throws PartitionAssignorException { + public GroupAssignment assign(final AssignmentSpec assignmentSpec, final SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException { Map newAssignment = new HashMap<>(); // Step 1 - Map> membersPerTopic = membersPerTopic(assignmentSpec); + Map> membersPerTopic = membersPerTopic(assignmentSpec, subscribedTopicDescriber); membersPerTopic.forEach((topicId, membersForTopic) -> { - int numPartitionsForTopic = assignmentSpec.topics().get(topicId).numPartitions(); + int numPartitionsForTopic = subscribedTopicDescriber.numPartitions(topicId); int minRequiredQuota = numPartitionsForTopic / membersForTopic.size(); // Each member can get only ONE extra partition per topic after receiving the minimum quota. int numMembersWithExtraPartition = numPartitionsForTopic % membersForTopic.size(); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SubscribedTopicDescriber.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SubscribedTopicDescriber.java new file mode 100644 index 00000000000..28586648cbf --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SubscribedTopicDescriber.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Set; + +/** + * The subscribed topic describer is used by the {@link PartitionAssignor} + * to obtain topic and partition metadata of the subscribed topics. + * + * The interface is kept in an internal module until KIP-848 is fully + * implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface SubscribedTopicDescriber { + /** + * The number of partitions for the given topic Id. + * + * @param topicId Uuid corresponding to the topic. + * @return The number of partitions corresponding to the given topic Id, + * or -1 if the topic Id does not exist. + */ + int numPartitions(Uuid topicId); + + /** + * Returns all the available racks associated with the replicas of the given partition. + * + * @param topicId Uuid corresponding to the partition's topic. + * @param partition Partition Id within topic. + * @return The set of racks corresponding to the replicas of the topic's partition. + * If the topic Id does not exist, an empty set is returned. + */ + Set racksForPartition(Uuid topicId, int partition); +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index 533c590c36d..82f3f19659c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.image.ClusterImage; import org.apache.kafka.image.TopicImage; import org.apache.kafka.image.TopicsImage; import org.apache.kafka.timeline.SnapshotRegistry; @@ -30,6 +31,7 @@ import org.apache.kafka.timeline.TimelineObject; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -404,8 +406,8 @@ public class ConsumerGroup implements Group { } /** - * @return An immutable Map containing the subscription metadata for all the topics whose - * members are subscribed to. + * @return An immutable Map of subscription metadata for + * each topic that the consumer group is subscribed to. */ public Map subscriptionMetadata() { return Collections.unmodifiableMap(subscribedTopicMetadata); @@ -427,16 +429,18 @@ public class ConsumerGroup implements Group { * Computes the subscription metadata based on the current subscription and * an updated member. * - * @param oldMember The old member. - * @param newMember The new member. - * @param topicsImage The topic metadata. + * @param oldMember The old member of the consumer group. + * @param newMember The updated member of the consumer group. + * @param topicsImage The current metadata for all available topics. + * @param clusterImage The current metadata for the Kafka cluster. * - * @return The new subscription metadata as an immutable Map. + * @return An immutable map of subscription metadata for each topic that the consumer group is subscribed to. */ public Map computeSubscriptionMetadata( ConsumerGroupMember oldMember, ConsumerGroupMember newMember, - TopicsImage topicsImage + TopicsImage topicsImage, + ClusterImage clusterImage ) { // Copy and update the current subscriptions. Map subscribedTopicNames = new HashMap<>(this.subscribedTopicNames); @@ -444,14 +448,30 @@ public class ConsumerGroup implements Group { // Create the topic metadata for each subscribed topic. Map newSubscriptionMetadata = new HashMap<>(subscribedTopicNames.size()); + subscribedTopicNames.forEach((topicName, count) -> { TopicImage topicImage = topicsImage.getTopic(topicName); if (topicImage != null) { + Map> partitionRacks = new HashMap<>(); + topicImage.partitions().forEach((partition, partitionRegistration) -> { + Set racks = new HashSet<>(); + for (int replica : partitionRegistration.replicas) { + Optional rackOptional = clusterImage.broker(replica).rack(); + // Only add the rack if it is available for the broker/replica. + rackOptional.ifPresent(racks::add); + } + // If rack information is unavailable for all replicas of this partition, + // no corresponding entry will be stored for it in the map. + if (!racks.isEmpty()) + partitionRacks.put(partition, racks); + }); + newSubscriptionMetadata.put(topicName, new TopicMetadata( topicImage.id(), topicImage.name(), - topicImage.partitions().size() - )); + topicImage.partitions().size(), + partitionRacks) + ); } }); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java new file mode 100644 index 00000000000..c18dd24d316 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * The subscribed topic metadata class is used by the {@link PartitionAssignor} to obtain + * topic and partition metadata for the topics that the consumer group is subscribed to. + */ +public class SubscribedTopicMetadata implements SubscribedTopicDescriber { + /** + * The topic Ids mapped to their corresponding {@link TopicMetadata} + * object, which contains topic and partition metadata. + */ + private final Map topicMetadata; + + public SubscribedTopicMetadata(Map topicMetadata) { + this.topicMetadata = Objects.requireNonNull(topicMetadata); + } + + /** + * Map of topic Ids to topic metadata. + * + * @return The map of topic Ids to topic metadata. + */ + public Map topicMetadata() { + return this.topicMetadata; + } + + /** + * The number of partitions for the given topic Id. + * + * @param topicId Uuid corresponding to the topic. + * @return The number of partitions corresponding to the given topic Id, + * or -1 if the topic Id does not exist. + */ + @Override + public int numPartitions(Uuid topicId) { + TopicMetadata topic = this.topicMetadata.get(topicId); + return topic == null ? -1 : topic.numPartitions(); + } + + /** + * Returns all the available racks associated with the replicas of the given partition. + * + * @param topicId Uuid corresponding to the partition's topic. + * @param partition Partition Id within the topic. + * @return The set of racks corresponding to the replicas of the topics partition. + * If the topic Id does not exist, an empty set is returned + */ + @Override + public Set racksForPartition(Uuid topicId, int partition) { + TopicMetadata topic = this.topicMetadata.get(topicId); + return topic == null ? Collections.emptySet() : topic.partitionRacks().get(partition); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SubscribedTopicMetadata that = (SubscribedTopicMetadata) o; + return topicMetadata.equals(that.topicMetadata); + } + + @Override + public int hashCode() { + return topicMetadata.hashCode(); + } + + @Override + public String toString() { + return "SubscribedTopicMetadata(" + + "topicMetadata=" + topicMetadata + + ')'; + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java index 02b120db1ef..2026efaa82c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.Record; import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; -import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; @@ -244,16 +243,19 @@ public class TargetAssignmentBuilder { }); // Prepare the topic metadata. - Map topics = new HashMap<>(); + Map topicMetadataMap = new HashMap<>(); subscriptionMetadata.forEach((topicName, topicMetadata) -> - topics.put(topicMetadata.id(), new AssignmentTopicMetadata(topicMetadata.numPartitions())) + topicMetadataMap.put( + topicMetadata.id(), + topicMetadata + ) ); // Compute the assignment. - GroupAssignment newGroupAssignment = assignor.assign(new AssignmentSpec( - Collections.unmodifiableMap(memberSpecs), - Collections.unmodifiableMap(topics) - )); + GroupAssignment newGroupAssignment = assignor.assign( + new AssignmentSpec(Collections.unmodifiableMap(memberSpecs)), + new SubscribedTopicMetadata(topicMetadataMap) + ); // Compute delta from previous to new target assignment and create the // relevant records. diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java index 395d48d95df..f9b2d757671 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java @@ -19,7 +19,12 @@ package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.Objects; +import java.util.Set; /** * Immutable topic metadata. @@ -40,10 +45,17 @@ public class TopicMetadata { */ private final int numPartitions; + /** + * Map of every partition Id to a set of its rack Ids, if they exist. + * If rack information is unavailable for all partitions, this is an empty map. + */ + private final Map> partitionRacks; + public TopicMetadata( Uuid id, String name, - int numPartitions + int numPartitions, + Map> partitionRacks ) { this.id = Objects.requireNonNull(id); if (Uuid.ZERO_UUID.equals(id)) { @@ -57,6 +69,7 @@ public class TopicMetadata { if (numPartitions < 0) { throw new IllegalArgumentException("Number of partitions cannot be negative."); } + this.partitionRacks = Objects.requireNonNull(partitionRacks); } /** @@ -80,6 +93,14 @@ public class TopicMetadata { return this.numPartitions; } + /** + * @return Every partition mapped to the set of corresponding available rack Ids of its replicas. + * An empty map is returned if rack information is unavailable for all partitions. + */ + public Map> partitionRacks() { + return this.partitionRacks; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -89,7 +110,8 @@ public class TopicMetadata { if (!id.equals(that.id)) return false; if (!name.equals(that.name)) return false; - return numPartitions == that.numPartitions; + if (numPartitions != that.numPartitions) return false; + return partitionRacks.equals(that.partitionRacks); } @Override @@ -97,6 +119,7 @@ public class TopicMetadata { int result = id.hashCode(); result = 31 * result + name.hashCode(); result = 31 * result + numPartitions; + result = 31 * result + partitionRacks.hashCode(); return result; } @@ -106,16 +129,26 @@ public class TopicMetadata { "id=" + id + ", name=" + name + ", numPartitions=" + numPartitions + + ", partitionRacks=" + partitionRacks + ')'; } public static TopicMetadata fromRecord( ConsumerGroupPartitionMetadataValue.TopicMetadata record ) { + // Converting the data type from a list stored in the record to a map for the topic metadata. + Map> partitionRacks = new HashMap<>(); + for (ConsumerGroupPartitionMetadataValue.PartitionMetadata partitionMetadata : record.partitionMetadata()) { + partitionRacks.put( + partitionMetadata.partition(), + Collections.unmodifiableSet(new HashSet<>(partitionMetadata.racks())) + ); + } + return new TopicMetadata( record.topicId(), record.topicName(), - record.numPartitions() - ); + record.numPartitions(), + partitionRacks); } } diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json index 81b5f5225e7..19de783b49b 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json @@ -29,7 +29,14 @@ { "name": "TopicName", "versions": "0+", "type": "string", "about": "The topic name." }, { "name": "NumPartitions", "versions": "0+", "type": "int32", - "about": "The number of partitions of the topic." } + "about": "The number of partitions of the topic." }, + { "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata", + "about": "Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored", "fields": [ + { "name": "Partition", "versions": "0+", "type": "int32", + "about": "The partition number." }, + { "name": "Racks", "versions": "0+", "type": "[]string", + "about": "The set of racks that the partition is mapped to." } + ]} ]} ] } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 8f6ae07b37f..dd00d1f7ae8 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -42,6 +42,7 @@ import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.common.network.ClientInformation; @@ -62,6 +63,7 @@ import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; import org.apache.kafka.coordinator.group.consumer.Assignment; import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; @@ -95,7 +97,6 @@ import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.opentest4j.AssertionFailedError; import java.net.InetAddress; import java.util.ArrayList; @@ -126,6 +127,9 @@ import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGr import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT; import static org.apache.kafka.coordinator.group.GroupMetadataManager.genericGroupHeartbeatKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.genericGroupSyncKey; +import static org.apache.kafka.coordinator.group.RecordHelpersTest.assertRecordEquals; +import static org.apache.kafka.coordinator.group.RecordHelpersTest.assertRecordsEquals; +import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; @@ -163,7 +167,7 @@ public class GroupMetadataManagerTest { } @Override - public GroupAssignment assign(AssignmentSpec assignmentSpec) throws PartitionAssignorException { + public GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException { return prepareGroupAssignment; } } @@ -176,11 +180,28 @@ public class GroupMetadataManagerTest { String topicName, int numPartitions ) { + // For testing purposes, the following criteria are used: + // - Number of replicas for each partition: 2 + // - Number of brokers available in the cluster: 4 delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName)); for (int i = 0; i < numPartitions; i++) { delta.replay(new PartitionRecord() .setTopicId(topicId) - .setPartitionId(i)); + .setPartitionId(i) + .setReplicas(Arrays.asList(i % 4, (i + 1) % 4))); + } + return this; + } + + /** + * Add rack Ids for 4 broker Ids. + * + * For testing purposes, each broker is mapped + * to a rack Id with the same broker Id as a suffix. + */ + public MetadataImageBuilder addRacks() { + for (int i = 0; i < 4; i++) { + delta.replay(new RegisterBrokerRecord().setBrokerId(i).setRack("rack" + i)); } return this; } @@ -242,8 +263,8 @@ public class GroupMetadataManagerTest { subscriptionMetadata.put(topicName, new TopicMetadata( topicImage.id(), topicImage.name(), - topicImage.partitions().size() - )); + topicImage.partitions().size(), + Collections.emptyMap())); } }); }); @@ -1462,6 +1483,7 @@ public class GroupMetadataManagerTest { .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) + .addRacks() .build()) .build(); @@ -1519,8 +1541,8 @@ public class GroupMetadataManagerTest { List expectedRecords = Arrays.asList( RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() {{ - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); }}), RecordHelpers.newGroupEpochRecord(groupId, 1), RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment( @@ -1551,6 +1573,7 @@ public class GroupMetadataManagerTest { .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) + .addRacks() .build()) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) @@ -1559,7 +1582,7 @@ public class GroupMetadataManagerTest { .setTargetMemberEpoch(10) .setClientId("client") .setClientHost("localhost/127.0.0.1") - .setSubscribedTopicNames(Arrays.asList("foo")) + .setSubscribedTopicNames(Collections.singletonList("foo")) .setServerAssignorName("range") .setAssignedPartitions(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) @@ -1617,8 +1640,8 @@ public class GroupMetadataManagerTest { RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); } }), RecordHelpers.newGroupEpochRecord(groupId, 11), @@ -1652,6 +1675,7 @@ public class GroupMetadataManagerTest { .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) + .addRacks() .build()) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) @@ -1731,7 +1755,7 @@ public class GroupMetadataManagerTest { .setPartitions(Arrays.asList(4, 5)), new ConsumerGroupHeartbeatResponseData.TopicPartitions() .setTopicId(barTopicId) - .setPartitions(Arrays.asList(2)) + .setPartitions(Collections.singletonList(2)) ))), result.response() ); @@ -1752,6 +1776,12 @@ public class GroupMetadataManagerTest { List expectedRecords = Arrays.asList( RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember3), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + } + }), RecordHelpers.newGroupEpochRecord(groupId, 11), RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1), @@ -1769,9 +1799,9 @@ public class GroupMetadataManagerTest { RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3) ); - assertRecordsEquals(expectedRecords.subList(0, 2), result.records().subList(0, 2)); - assertUnorderedListEquals(expectedRecords.subList(2, 5), result.records().subList(2, 5)); - assertRecordsEquals(expectedRecords.subList(5, 7), result.records().subList(5, 7)); + assertRecordsEquals(expectedRecords.subList(0, 3), result.records().subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 6), result.records().subList(3, 6)); + assertRecordsEquals(expectedRecords.subList(6, 8), result.records().subList(6, 8)); } @Test @@ -1797,6 +1827,7 @@ public class GroupMetadataManagerTest { .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() .build()) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) @@ -1857,8 +1888,8 @@ public class GroupMetadataManagerTest { // Subscription metadata is recomputed because zar is no longer there. RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); } }), RecordHelpers.newGroupEpochRecord(groupId, 11) @@ -1887,6 +1918,7 @@ public class GroupMetadataManagerTest { .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) + .addRacks() .build()) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) @@ -2332,6 +2364,7 @@ public class GroupMetadataManagerTest { .withAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() .build()) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) @@ -2692,13 +2725,14 @@ public class GroupMetadataManagerTest { PartitionAssignor assignor = mock(PartitionAssignor.class); when(assignor.name()).thenReturn("range"); - when(assignor.assign(any())).thenThrow(new PartitionAssignorException("Assignment failed.")); + when(assignor.assign(any(), any())).thenThrow(new PartitionAssignorException("Assignment failed.")); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) + .addRacks() .build()) .build(); @@ -2732,6 +2766,7 @@ public class GroupMetadataManagerTest { .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() .build()) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) @@ -2753,7 +2788,7 @@ public class GroupMetadataManagerTest { { // foo only has 3 partitions stored in the metadata but foo has // 6 partitions the metadata image. - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3)); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3, mkMapOfPartitionRacks(3))); } })) .build(); @@ -2807,7 +2842,7 @@ public class GroupMetadataManagerTest { List expectedRecords = Arrays.asList( RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); } }), RecordHelpers.newGroupEpochRecord(groupId, 11), @@ -2842,6 +2877,7 @@ public class GroupMetadataManagerTest { .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() .build()) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) @@ -2863,7 +2899,7 @@ public class GroupMetadataManagerTest { { // foo only has 3 partitions stored in the metadata but foo has // 6 partitions the metadata image. - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3)); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3, mkMapOfPartitionRacks(3))); } })) .build(); @@ -2935,7 +2971,7 @@ public class GroupMetadataManagerTest { List expectedRecords = Arrays.asList( RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); } }), RecordHelpers.newGroupEpochRecord(groupId, 11), @@ -3172,6 +3208,7 @@ public class GroupMetadataManagerTest { .withAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() .build()) .build(); @@ -3246,6 +3283,7 @@ public class GroupMetadataManagerTest { .withAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() .build()) .build(); @@ -3311,6 +3349,7 @@ public class GroupMetadataManagerTest { .withAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 3) + .addRacks() .build()) .build(); @@ -3550,6 +3589,7 @@ public class GroupMetadataManagerTest { .withAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 3) + .addRacks() .build()) .build(); @@ -5737,7 +5777,7 @@ public class GroupMetadataManagerTest { assertEquals(Errors.LEADER_NOT_AVAILABLE, Errors.LEADER_NOT_AVAILABLE); } - private void assertUnorderedListEquals( + public static void assertUnorderedListEquals( List expected, List actual ) { @@ -5797,86 +5837,6 @@ public class GroupMetadataManagerTest { return assignmentMap; } - private void assertRecordsEquals( - List expectedRecords, - List actualRecords - ) { - try { - assertEquals(expectedRecords.size(), actualRecords.size()); - - for (int i = 0; i < expectedRecords.size(); i++) { - Record expectedRecord = expectedRecords.get(i); - Record actualRecord = actualRecords.get(i); - assertRecordEquals(expectedRecord, actualRecord); - } - } catch (AssertionFailedError e) { - assertionFailure() - .expected(expectedRecords) - .actual(actualRecords) - .buildAndThrow(); - } - } - - private void assertRecordEquals( - Record expected, - Record actual - ) { - try { - assertApiMessageAndVersionEquals(expected.key(), actual.key()); - assertApiMessageAndVersionEquals(expected.value(), actual.value()); - } catch (AssertionFailedError e) { - assertionFailure() - .expected(expected) - .actual(actual) - .buildAndThrow(); - } - } - - private void assertApiMessageAndVersionEquals( - ApiMessageAndVersion expected, - ApiMessageAndVersion actual - ) { - if (expected == actual) return; - - assertEquals(expected.version(), actual.version()); - - if (actual.message() instanceof ConsumerGroupCurrentMemberAssignmentValue) { - // The order of the topics stored in ConsumerGroupCurrentMemberAssignmentValue is not - // always guaranteed. Therefore, we need a special comparator. - ConsumerGroupCurrentMemberAssignmentValue expectedValue = - (ConsumerGroupCurrentMemberAssignmentValue) expected.message(); - ConsumerGroupCurrentMemberAssignmentValue actualValue = - (ConsumerGroupCurrentMemberAssignmentValue) actual.message(); - - assertEquals(expectedValue.memberEpoch(), actualValue.memberEpoch()); - assertEquals(expectedValue.previousMemberEpoch(), actualValue.previousMemberEpoch()); - assertEquals(expectedValue.targetMemberEpoch(), actualValue.targetMemberEpoch()); - assertEquals(expectedValue.error(), actualValue.error()); - assertEquals(expectedValue.metadataVersion(), actualValue.metadataVersion()); - assertEquals(expectedValue.metadataBytes(), actualValue.metadataBytes()); - - // We transform those to Maps before comparing them. - assertEquals(fromTopicPartitions(expectedValue.assignedPartitions()), - fromTopicPartitions(actualValue.assignedPartitions())); - assertEquals(fromTopicPartitions(expectedValue.partitionsPendingRevocation()), - fromTopicPartitions(actualValue.partitionsPendingRevocation())); - assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()), - fromTopicPartitions(actualValue.partitionsPendingAssignment())); - } else { - assertEquals(expected.message(), actual.message()); - } - } - - private Map> fromTopicPartitions( - List assignment - ) { - Map> assignmentMap = new HashMap<>(); - assignment.forEach(topicPartitions -> { - assignmentMap.put(topicPartitions.topicId(), new HashSet<>(topicPartitions.partitions())); - }); - return assignmentMap; - } - private List verifyGenericGroupJoinResponses( List> responseFutures, int expectedSuccessCount, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java index bdea8b79529..03b11cbed6f 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java @@ -52,6 +52,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; +import org.opentest4j.AssertionFailedError; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -59,6 +60,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; + import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -71,6 +74,7 @@ import java.util.stream.Stream; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedTopicAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.GroupMetadataManagerTest.assertUnorderedListEquals; import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord; import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord; import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord; @@ -83,8 +87,10 @@ import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignme import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochTombstoneRecord; import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; public class RecordHelpersTest { @@ -161,15 +167,18 @@ public class RecordHelpersTest { Uuid fooTopicId = Uuid.randomUuid(); Uuid barTopicId = Uuid.randomUuid(); Map subscriptionMetadata = new LinkedHashMap<>(); + subscriptionMetadata.put("foo", new TopicMetadata( fooTopicId, "foo", - 10 + 10, + mkMapOfPartitionRacks(10) )); subscriptionMetadata.put("bar", new TopicMetadata( barTopicId, "bar", - 20 + 20, + mkMapOfPartitionRacks(20) )); Record expectedRecord = new Record( @@ -184,14 +193,16 @@ public class RecordHelpersTest { new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(fooTopicId) .setTopicName("foo") - .setNumPartitions(10), + .setNumPartitions(10) + .setPartitionMetadata(mkListOfPartitionRacks(10)), new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(barTopicId) .setTopicName("bar") - .setNumPartitions(20))), + .setNumPartitions(20) + .setPartitionMetadata(mkListOfPartitionRacks(20)))), (short) 0)); - assertEquals(expectedRecord, newGroupSubscriptionMetadataRecord( + assertRecordEquals(expectedRecord, newGroupSubscriptionMetadataRecord( "group-id", subscriptionMetadata )); @@ -212,6 +223,52 @@ public class RecordHelpersTest { )); } + @Test + public void testEmptyPartitionMetadataWhenRacksUnavailableGroupSubscriptionMetadataRecord() { + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + Map subscriptionMetadata = new LinkedHashMap<>(); + + subscriptionMetadata.put("foo", new TopicMetadata( + fooTopicId, + "foo", + 10, + Collections.emptyMap() + )); + subscriptionMetadata.put("bar", new TopicMetadata( + barTopicId, + "bar", + 20, + Collections.emptyMap() + )); + + Record expectedRecord = new Record( + new ApiMessageAndVersion( + new ConsumerGroupPartitionMetadataKey() + .setGroupId("group-id"), + (short) 4 + ), + new ApiMessageAndVersion( + new ConsumerGroupPartitionMetadataValue() + .setTopics(Arrays.asList( + new ConsumerGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(fooTopicId) + .setTopicName("foo") + .setNumPartitions(10) + .setPartitionMetadata(Collections.emptyList()), + new ConsumerGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(barTopicId) + .setTopicName("bar") + .setNumPartitions(20) + .setPartitionMetadata(Collections.emptyList()))), + (short) 0)); + + assertRecordEquals(expectedRecord, newGroupSubscriptionMetadataRecord( + "group-id", + subscriptionMetadata + )); + } + @Test public void testNewGroupEpochRecord() { Record expectedRecord = new Record( @@ -620,7 +677,7 @@ public class RecordHelpersTest { MetadataVersion.IBP_3_5_IV2 )); } - + @ParameterizedTest @MethodSource("metadataToExpectedGroupMetadataValue") public void testEmptyGroupMetadataRecord( @@ -764,4 +821,182 @@ public class RecordHelpersTest { Record record = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1); assertEquals(expectedRecord, record); } + + /** + * Creates a list of values to be added to the record and assigns partitions to racks for testing. + * + * @param numPartitions The number of partitions for the topic. + * + * For testing purposes, the following criteria are used: + * - Number of replicas for each partition: 2 + * - Number of racks available to the cluster: 4 + */ + public static List mkListOfPartitionRacks(int numPartitions) { + List partitionRacks = new ArrayList<>(numPartitions); + for (int i = 0; i < numPartitions; i++) { + List racks = new ArrayList<>(Arrays.asList("rack" + i % 4, "rack" + (i + 1) % 4)); + partitionRacks.add( + new ConsumerGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(i) + .setRacks(racks) + ); + } + return partitionRacks; + } + + /** + * Creates a map of partitions to racks for testing. + * + * @param numPartitions The number of partitions for the topic. + * + * For testing purposes, the following criteria are used: + * - Number of replicas for each partition: 2 + * - Number of racks available to the cluster: 4 + */ + public static Map> mkMapOfPartitionRacks(int numPartitions) { + Map> partitionRacks = new HashMap<>(numPartitions); + for (int i = 0; i < numPartitions; i++) { + partitionRacks.put(i, new HashSet<>(Arrays.asList("rack" + i % 4, "rack" + (i + 1) % 4))); + } + return partitionRacks; + } + + /** + * Asserts whether the two provided lists of records are equal. + * + * @param expectedRecords The expected list of records. + * @param actualRecords The actual list of records. + */ + public static void assertRecordsEquals( + List expectedRecords, + List actualRecords + ) { + try { + assertEquals(expectedRecords.size(), actualRecords.size()); + + for (int i = 0; i < expectedRecords.size(); i++) { + Record expectedRecord = expectedRecords.get(i); + Record actualRecord = actualRecords.get(i); + assertRecordEquals(expectedRecord, actualRecord); + } + } catch (AssertionFailedError e) { + assertionFailure() + .expected(expectedRecords) + .actual(actualRecords) + .buildAndThrow(); + } + } + + /** + * Asserts if the two provided records are equal. + * + * @param expectedRecord The expected record. + * @param actualRecord The actual record. + */ + public static void assertRecordEquals( + Record expectedRecord, + Record actualRecord + ) { + try { + assertApiMessageAndVersionEquals(expectedRecord.key(), actualRecord.key()); + assertApiMessageAndVersionEquals(expectedRecord.value(), actualRecord.value()); + } catch (AssertionFailedError e) { + assertionFailure() + .expected(expectedRecord) + .actual(actualRecord) + .buildAndThrow(); + } + } + + private static void assertApiMessageAndVersionEquals( + ApiMessageAndVersion expected, + ApiMessageAndVersion actual + ) { + if (expected == actual) return; + + assertEquals(expected.version(), actual.version()); + + if (actual.message() instanceof ConsumerGroupCurrentMemberAssignmentValue) { + // The order of the topics stored in ConsumerGroupCurrentMemberAssignmentValue is not + // always guaranteed. Therefore, we need a special comparator. + ConsumerGroupCurrentMemberAssignmentValue expectedValue = + (ConsumerGroupCurrentMemberAssignmentValue) expected.message(); + ConsumerGroupCurrentMemberAssignmentValue actualValue = + (ConsumerGroupCurrentMemberAssignmentValue) actual.message(); + + assertEquals(expectedValue.memberEpoch(), actualValue.memberEpoch()); + assertEquals(expectedValue.previousMemberEpoch(), actualValue.previousMemberEpoch()); + assertEquals(expectedValue.targetMemberEpoch(), actualValue.targetMemberEpoch()); + assertEquals(expectedValue.error(), actualValue.error()); + assertEquals(expectedValue.metadataVersion(), actualValue.metadataVersion()); + assertEquals(expectedValue.metadataBytes(), actualValue.metadataBytes()); + + // We transform those to Maps before comparing them. + assertEquals(fromTopicPartitions(expectedValue.assignedPartitions()), + fromTopicPartitions(actualValue.assignedPartitions())); + assertEquals(fromTopicPartitions(expectedValue.partitionsPendingRevocation()), + fromTopicPartitions(actualValue.partitionsPendingRevocation())); + assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()), + fromTopicPartitions(actualValue.partitionsPendingAssignment())); + } else if (actual.message() instanceof ConsumerGroupPartitionMetadataValue) { + // The order of the racks stored in the PartitionMetadata of the ConsumerGroupPartitionMetadataValue + // is not always guaranteed. Therefore, we need a special comparator. + ConsumerGroupPartitionMetadataValue expectedValue = + (ConsumerGroupPartitionMetadataValue) expected.message(); + ConsumerGroupPartitionMetadataValue actualValue = + (ConsumerGroupPartitionMetadataValue) actual.message(); + + List expectedTopicMetadataList = + expectedValue.topics(); + List actualTopicMetadataList = + actualValue.topics(); + + if (expectedTopicMetadataList.size() != actualTopicMetadataList.size()) { + fail("Topic metadata lists have different sizes"); + } + + for (int i = 0; i < expectedTopicMetadataList.size(); i++) { + ConsumerGroupPartitionMetadataValue.TopicMetadata expectedTopicMetadata = + expectedTopicMetadataList.get(i); + ConsumerGroupPartitionMetadataValue.TopicMetadata actualTopicMetadata = + actualTopicMetadataList.get(i); + + assertEquals(expectedTopicMetadata.topicId(), actualTopicMetadata.topicId()); + assertEquals(expectedTopicMetadata.topicName(), actualTopicMetadata.topicName()); + assertEquals(expectedTopicMetadata.numPartitions(), actualTopicMetadata.numPartitions()); + + List expectedPartitionMetadataList = + expectedTopicMetadata.partitionMetadata(); + List actualPartitionMetadataList = + actualTopicMetadata.partitionMetadata(); + + // If the list is empty, rack information wasn't available for any replica of + // the partition and hence, the entry wasn't added to the record. + if (expectedPartitionMetadataList.size() != actualPartitionMetadataList.size()) { + fail("Partition metadata lists have different sizes"); + } else if (!expectedPartitionMetadataList.isEmpty() && !actualPartitionMetadataList.isEmpty()) { + for (int j = 0; j < expectedTopicMetadataList.size(); j++) { + ConsumerGroupPartitionMetadataValue.PartitionMetadata expectedPartitionMetadata = + expectedPartitionMetadataList.get(j); + ConsumerGroupPartitionMetadataValue.PartitionMetadata actualPartitionMetadata = + actualPartitionMetadataList.get(j); + + assertEquals(expectedPartitionMetadata.partition(), actualPartitionMetadata.partition()); + assertUnorderedListEquals(expectedPartitionMetadata.racks(), actualPartitionMetadata.racks()); + } + } + } + } else { + assertEquals(expected.message(), actual.message()); + } + } + + private static Map> fromTopicPartitions( + List assignment + ) { + Map> assignmentMap = new HashMap<>(); + assignment.forEach(topicPartitions -> + assignmentMap.put(topicPartitions.topicId(), new HashSet<>(topicPartitions.partitions()))); + return assignmentMap; + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java index 91f6385f104..eb58c653b13 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java @@ -18,6 +18,8 @@ package org.apache.kafka.coordinator.group.assignor; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; import org.junit.jupiter.api.Test; import java.util.Arrays; @@ -37,15 +39,29 @@ import static org.junit.jupiter.api.Assertions.assertThrows; public class RangeAssignorTest { private final RangeAssignor assignor = new RangeAssignor(); private final Uuid topic1Uuid = Uuid.randomUuid(); + private final String topic1Name = "topic1"; private final Uuid topic2Uuid = Uuid.randomUuid(); + private final String topic2Name = "topic2"; private final Uuid topic3Uuid = Uuid.randomUuid(); + private final String topic3Name = "topic3"; private final String consumerA = "A"; private final String consumerB = "B"; private final String consumerC = "C"; @Test public void testOneConsumerNoTopic() { - Map topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3)); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata( + Collections.singletonMap( + topic1Uuid, + new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + createPartitionRacks(3) + ) + ) + ); + Map members = Collections.singletonMap( consumerA, new AssignmentMemberSpec( @@ -56,15 +72,26 @@ public class RangeAssignorTest { ) ); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); assertEquals(Collections.emptyMap(), groupAssignment.members()); } @Test public void testOneConsumerSubscribedToNonExistentTopic() { - Map topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3)); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata( + Collections.singletonMap( + topic1Uuid, + new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + createPartitionRacks(3) + ) + ) + ); + Map members = Collections.singletonMap( consumerA, new AssignmentMemberSpec( @@ -75,17 +102,29 @@ public class RangeAssignorTest { ) ); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); assertThrows(PartitionAssignorException.class, - () -> assignor.assign(assignmentSpec)); + () -> assignor.assign(assignmentSpec, subscribedTopicMetadata)); } @Test public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { - Map topics = new HashMap<>(); - topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); - topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + createPartitionRacks(3) + )); + topicMetadata.put(topic3Uuid, new TopicMetadata( + topic3Uuid, + topic3Name, + 2, + createPartitionRacks(2) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); @@ -103,8 +142,8 @@ public class RangeAssignorTest { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); @@ -123,10 +162,27 @@ public class RangeAssignorTest { @Test public void testFirstAssignmentThreeConsumersThreeTopicsDifferentSubscriptions() { - Map topics = new HashMap<>(); - topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); - topics.put(topic2Uuid, new AssignmentTopicMetadata(3)); - topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + createPartitionRacks(3) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 3, + createPartitionRacks(3) + )); + topicMetadata.put(topic3Uuid, new TopicMetadata( + topic3Uuid, + topic3Name, + 2, + createPartitionRacks(2) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); @@ -151,8 +207,8 @@ public class RangeAssignorTest { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); @@ -175,9 +231,21 @@ public class RangeAssignorTest { @Test public void testFirstAssignmentNumConsumersGreaterThanNumPartitions() { - Map topics = new HashMap<>(); - topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); - topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + createPartitionRacks(3) + )); + topicMetadata.put(topic3Uuid, new TopicMetadata( + topic3Uuid, + topic3Name, + 2, + createPartitionRacks(2) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); @@ -202,8 +270,8 @@ public class RangeAssignorTest { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); // Topic 3 has 2 partitions but three consumers subscribed to it - one of them will not get a partition. @@ -226,9 +294,21 @@ public class RangeAssignorTest { @Test public void testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerAdded() { - Map topics = new HashMap<>(); - topics.put(topic1Uuid, new AssignmentTopicMetadata(2)); - topics.put(topic2Uuid, new AssignmentTopicMetadata(2)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 2, + createPartitionRacks(2) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 2, + createPartitionRacks(2) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); @@ -264,8 +344,8 @@ public class RangeAssignorTest { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); @@ -287,9 +367,21 @@ public class RangeAssignorTest { @Test public void testReassignmentWhenOnePartitionAddedForTwoConsumersTwoTopics() { // Simulating adding a partition - originally T1 -> 3 Partitions and T2 -> 3 Partitions - Map topics = new HashMap<>(); - topics.put(topic1Uuid, new AssignmentTopicMetadata(4)); - topics.put(topic2Uuid, new AssignmentTopicMetadata(4)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 4, + createPartitionRacks(4) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 4, + createPartitionRacks(4) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); @@ -317,8 +409,8 @@ public class RangeAssignorTest { currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); @@ -337,9 +429,21 @@ public class RangeAssignorTest { @Test public void testReassignmentWhenOneConsumerAddedAfterInitialAssignmentWithTwoConsumersTwoTopics() { - Map topics = new HashMap<>(); - topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); - topics.put(topic2Uuid, new AssignmentTopicMetadata(3)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + createPartitionRacks(3) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 3, + createPartitionRacks(3) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); @@ -375,8 +479,8 @@ public class RangeAssignorTest { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); @@ -400,10 +504,22 @@ public class RangeAssignorTest { @Test public void testReassignmentWhenOneConsumerAddedAndOnePartitionAfterInitialAssignmentWithTwoConsumersTwoTopics() { - Map topics = new HashMap<>(); // Add a new partition to topic 1, initially T1 -> 3 partitions - topics.put(topic1Uuid, new AssignmentTopicMetadata(4)); - topics.put(topic2Uuid, new AssignmentTopicMetadata(3)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 4, + createPartitionRacks(4) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 3, + createPartitionRacks(3) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); @@ -439,8 +555,8 @@ public class RangeAssignorTest { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); @@ -463,9 +579,21 @@ public class RangeAssignorTest { @Test public void testReassignmentWhenOneConsumerRemovedAfterInitialAssignmentWithTwoConsumersTwoTopics() { - Map topics = new HashMap<>(); - topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); - topics.put(topic2Uuid, new AssignmentTopicMetadata(3)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + createPartitionRacks(3) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 3, + createPartitionRacks(3) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); // Consumer A was removed @@ -482,8 +610,8 @@ public class RangeAssignorTest { currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); @@ -497,10 +625,27 @@ public class RangeAssignorTest { @Test public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignmentWithThreeConsumersTwoTopics() { - Map topics = new HashMap<>(); - topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); - topics.put(topic2Uuid, new AssignmentTopicMetadata(3)); - topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + createPartitionRacks(3) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 3, + createPartitionRacks(3) + )); + topicMetadata.put(topic3Uuid, new TopicMetadata( + topic3Uuid, + topic3Name, + 2, + createPartitionRacks(2) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); @@ -542,8 +687,8 @@ public class RangeAssignorTest { currentAssignmentForC )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); @@ -571,4 +716,14 @@ public class RangeAssignorTest { assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember); } } + + // When rack awareness is enabled for this assignor, rack information can be updated in this method. + private static Map> createPartitionRacks(int numPartitions) { + Map> partitionRacks = new HashMap<>(numPartitions); + Set emptySet = Collections.emptySet(); + for (int i = 0; i < numPartitions; i++) { + partitionRacks.put(i, emptySet); + } + return partitionRacks; + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java index 7b4923c4b17..a715d4187bb 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java @@ -34,6 +34,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -398,16 +399,17 @@ public class ConsumerGroupTest { .addTopic(fooTopicId, "foo", 1) .addTopic(barTopicId, "bar", 2) .addTopic(zarTopicId, "zar", 3) + .addRacks() .build(); ConsumerGroupMember member1 = new ConsumerGroupMember.Builder("member1") - .setSubscribedTopicNames(Arrays.asList("foo")) + .setSubscribedTopicNames(Collections.singletonList("foo")) .build(); ConsumerGroupMember member2 = new ConsumerGroupMember.Builder("member2") - .setSubscribedTopicNames(Arrays.asList("bar")) + .setSubscribedTopicNames(Collections.singletonList("bar")) .build(); ConsumerGroupMember member3 = new ConsumerGroupMember.Builder("member3") - .setSubscribedTopicNames(Arrays.asList("zar")) + .setSubscribedTopicNames(Collections.singletonList("zar")) .build(); ConsumerGroup consumerGroup = createConsumerGroup("group-foo"); @@ -418,19 +420,21 @@ public class ConsumerGroupTest { consumerGroup.computeSubscriptionMetadata( null, null, - image.topics() + image.topics(), + image.cluster() ) ); // Compute while taking into account member 1. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) ), consumerGroup.computeSubscriptionMetadata( null, member1, - image.topics() + image.topics(), + image.cluster() ) ); @@ -440,12 +444,13 @@ public class ConsumerGroupTest { // It should return foo now. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) ), consumerGroup.computeSubscriptionMetadata( null, null, - image.topics() + image.topics(), + image.cluster() ) ); @@ -455,20 +460,22 @@ public class ConsumerGroupTest { consumerGroup.computeSubscriptionMetadata( member1, null, - image.topics() + image.topics(), + image.cluster() ) ); // Compute while taking into account member 2. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) ), consumerGroup.computeSubscriptionMetadata( null, member2, - image.topics() + image.topics(), + image.cluster() ) ); @@ -478,51 +485,55 @@ public class ConsumerGroupTest { // It should return foo and bar. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) ), consumerGroup.computeSubscriptionMetadata( null, null, - image.topics() + image.topics(), + image.cluster() ) ); // Compute while taking into account removal of member 2. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) ), consumerGroup.computeSubscriptionMetadata( member2, null, - image.topics() + image.topics(), + image.cluster() ) ); // Removing member1 results in returning bar. assertEquals( mkMap( - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)) + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) ), consumerGroup.computeSubscriptionMetadata( member1, null, - image.topics() + image.topics(), + image.cluster() ) ); // Compute while taking into account member 3. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)), - mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3)) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))) ), consumerGroup.computeSubscriptionMetadata( null, member3, - image.topics() + image.topics(), + image.cluster() ) ); @@ -532,14 +543,15 @@ public class ConsumerGroupTest { // It should return foo, bar and zar. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)), - mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3)) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))) ), consumerGroup.computeSubscriptionMetadata( null, null, - image.topics() + image.topics(), + image.cluster() ) ); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadataTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadataTest.java new file mode 100644 index 00000000000..632b701f44a --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadataTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class SubscribedTopicMetadataTest { + + @Test + public void testAttribute() { + Map topicMetadataMap = new HashMap<>(); + for (int i = 0; i < 5; i++) { + Uuid topicId = Uuid.randomUuid(); + String topicName = "topic" + i; + Map> partitionRacks = mkMapOfPartitionRacks(5); + topicMetadataMap.put( + topicId, + new TopicMetadata(topicId, topicName, 5, partitionRacks) + ); + } + assertEquals(topicMetadataMap, new SubscribedTopicMetadata(topicMetadataMap).topicMetadata()); + } + + @Test + public void testTopicMetadataCannotBeNull() { + assertThrows(NullPointerException.class, () -> new SubscribedTopicMetadata(null)); + } + + @Test + public void testEquals() { + Map topicMetadataMap = new HashMap<>(); + for (int i = 0; i < 5; i++) { + Uuid topicId = Uuid.randomUuid(); + String topicName = "topic" + i; + Map> partitionRacks = mkMapOfPartitionRacks(5); + topicMetadataMap.put( + topicId, + new TopicMetadata(topicId, topicName, 5, partitionRacks) + ); + } + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap); + assertEquals(new SubscribedTopicMetadata(topicMetadataMap), subscribedTopicMetadata); + + Map topicMetadataMap2 = new HashMap<>(); + Uuid topicId = Uuid.randomUuid(); + topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic", 5, Collections.emptyMap())); + assertNotEquals(new SubscribedTopicMetadata(topicMetadataMap2), subscribedTopicMetadata); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java index 7733f67b128..810b7617a23 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; -import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; @@ -38,6 +37,7 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; +import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; import static org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createAssignmentMemberSpec; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; @@ -84,13 +84,15 @@ public class TargetAssignmentBuilderTest { public Uuid addTopicMetadata( String topicName, - int numPartitions + int numPartitions, + Map> partitionRacks ) { Uuid topicId = Uuid.randomUuid(); subscriptionMetadata.put(topicName, new TopicMetadata( topicId, topicName, - numPartitions + numPartitions, + partitionRacks )); return topicId; } @@ -145,13 +147,13 @@ public class TargetAssignmentBuilderTest { Map memberSpecs = new HashMap<>(); // All the existing members are prepared. - members.forEach((memberId, member) -> { + members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec( member, targetAssignment.getOrDefault(memberId, Assignment.EMPTY), subscriptionMetadata - )); - }); + ) + )); // All the updated are added and all the deleted // members are removed. @@ -168,20 +170,21 @@ public class TargetAssignmentBuilderTest { }); // Prepare the expected topic metadata. - Map topicMetadata = new HashMap<>(); - subscriptionMetadata.forEach((topicName, metadata) -> { - topicMetadata.put(metadata.id(), new AssignmentTopicMetadata(metadata.numPartitions())); - }); + Map topicMetadataMap = new HashMap<>(); + subscriptionMetadata.forEach((topicName, topicMetadata) -> + topicMetadataMap.put(topicMetadata.id(), topicMetadata)); + + // Prepare the expected subscription topic metadata. + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap); // Prepare the expected assignment spec. - AssignmentSpec assignmentSpec = new AssignmentSpec( - memberSpecs, - topicMetadata - ); + AssignmentSpec assignmentSpec = new AssignmentSpec(memberSpecs); // We use `any` here to always return an assignment but use `verify` later on // to ensure that the input was correct. - when(assignor.assign(any())).thenReturn(new GroupAssignment(memberAssignments)); + when(assignor.assign(any(), any())) + .thenReturn(new GroupAssignment(memberAssignments)); + // Create and populate the assignment builder. TargetAssignmentBuilder builder = new TargetAssignmentBuilder(groupId, groupEpoch, assignor) @@ -203,7 +206,8 @@ public class TargetAssignmentBuilderTest { // Verify that the assignor was called once with the expected // assignment spec. - verify(assignor, times(1)).assign(assignmentSpec); + verify(assignor, times(1)) + .assign(assignmentSpec, subscribedTopicMetadata); return result; } @@ -222,8 +226,8 @@ public class TargetAssignmentBuilderTest { Map subscriptionMetadata = new HashMap() { { - put("foo", new TopicMetadata(fooTopicId, "foo", 5)); - put("bar", new TopicMetadata(barTopicId, "bar", 5)); + put("foo", new TopicMetadata(fooTopicId, "foo", 5, Collections.emptyMap())); + put("bar", new TopicMetadata(barTopicId, "bar", 5, Collections.emptyMap())); } }; @@ -268,8 +272,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6); - Uuid barTopicId = context.addTopicMetadata("bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); + Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), @@ -318,8 +322,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6); - Uuid barTopicId = context.addTopicMetadata("bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); + Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), @@ -381,8 +385,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6); - Uuid barTopicId = context.addTopicMetadata("bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); + Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), @@ -459,8 +463,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6); - Uuid barTopicId = context.addTopicMetadata("bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); + Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), @@ -546,8 +550,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6); - Uuid barTopicId = context.addTopicMetadata("bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6, mkMapOfPartitionRacks(6)); + Uuid barTopicId = context.addTopicMetadata("bar", 6, mkMapOfPartitionRacks(6)); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2), @@ -624,8 +628,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6); - Uuid barTopicId = context.addTopicMetadata("bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); + Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2), @@ -687,7 +691,7 @@ public class TargetAssignmentBuilderTest { assertEquals(expectedAssignment, result.targetAssignment()); } - public static void assertUnorderedList( + private static void assertUnorderedList( List expected, List actual ) { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TopicMetadataTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TopicMetadataTest.java index 07d790ec8ff..4129baae30b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TopicMetadataTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TopicMetadataTest.java @@ -20,6 +20,12 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; import org.junit.jupiter.api.Test; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkListOfPartitionRacks; +import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -28,24 +34,29 @@ public class TopicMetadataTest { @Test public void testAttributes() { Uuid topicId = Uuid.randomUuid(); - TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15); + Map> partitionRacks = mkMapOfPartitionRacks(15); + TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15, partitionRacks); + assertEquals(topicId, topicMetadata.id()); assertEquals("foo", topicMetadata.name()); assertEquals(15, topicMetadata.numPartitions()); + assertEquals(partitionRacks, topicMetadata.partitionRacks()); } @Test public void testTopicIdAndNameCannotBeNull() { - assertThrows(NullPointerException.class, () -> new TopicMetadata(Uuid.randomUuid(), null, 15)); - assertThrows(NullPointerException.class, () -> new TopicMetadata(null, "foo", 15)); + assertThrows(NullPointerException.class, () -> new TopicMetadata(Uuid.randomUuid(), null, 15, Collections.emptyMap())); + assertThrows(NullPointerException.class, () -> new TopicMetadata(null, "foo", 15, Collections.emptyMap())); } @Test public void testEquals() { Uuid topicId = Uuid.randomUuid(); - TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15); - assertEquals(new TopicMetadata(topicId, "foo", 15), topicMetadata); - assertNotEquals(new TopicMetadata(topicId, "foo", 5), topicMetadata); + Map> partitionRacks = mkMapOfPartitionRacks(15); + TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15, partitionRacks); + + assertEquals(new TopicMetadata(topicId, "foo", 15, partitionRacks), topicMetadata); + assertNotEquals(new TopicMetadata(topicId, "foo", 5, mkMapOfPartitionRacks(5)), topicMetadata); } @Test @@ -56,10 +67,11 @@ public class TopicMetadataTest { ConsumerGroupPartitionMetadataValue.TopicMetadata record = new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(topicId) .setTopicName(topicName) - .setNumPartitions(15); + .setNumPartitions(15) + .setPartitionMetadata(mkListOfPartitionRacks(15)); assertEquals( - new TopicMetadata(topicId, topicName, 15), + new TopicMetadata(topicId, topicName, 15, mkMapOfPartitionRacks(15)), TopicMetadata.fromRecord(record) ); }